AWS Audit Log 알람 받기
클라우드를 잘 운영하기 위해서는 감사 로깅도 굉장히 중요한데요, 어드민 권한을 가지고 있지 않은 일반 사용자에게 많은 권한이 부여되어 있을수록 감사 로깅의 필요성은 더욱 증가하게 됩니다. 본 게시글에서는 AWS에서의 모든 활성화된 사용자들이 하는 액션에 대해서 슬랙으로 알람이 오도록 설정하려고 합니다.
아키텍처를 먼저 간단하게 살펴보자면 EventBridge 서비스의 규칙을 설정한 뒤에 이를 트리거로 하여 슬랙으로 알람이 발생하게 되는 구조입니다.
아키텍처
제가 만든 람다는 Python 3.12 런타임 환경을 사용하며 현재 모든 서비스는 ap-northeast-2 (서울)리전만을 사용하므로 람다도 서울 리전에 만들고 파이썬의 requests 라이브러리를 AWS의 계층(layer)에 추가하였어요. 트리거로는 EventBridge를 사용해 사용하는 서비스만을 추가했어요.
콘솔에서 보면 EventBridge 규칙만 25개네요.
규칙 중 하나의 이벤트 패턴은 json 형태로 작성하며 아래와 같이 특정 이벤트 없이 모든 이벤트에 대한 알람을 받고 싶을 경우 eventName 없이 작성하실 수 있어요. 하지만 이렇게 설정할 경우 빈번하게 알람이 발생할 수 있어 특정 이벤트에 대해서만 알람을 받기를 원하시는 경우 eventName에 특정 이벤트를 추가하면 됩니다.
{
"source": ["aws.acm"],
"detail-type": ["AWS API Call via CloudTrail"],
"detail": {
"eventSource": ["acm.amazonaws.com"]
}
}
아래는 ECS 서비스를 예로 들었는데, ECS 서비스 중에서도 클러스터를 생성 또는 서비스를 생성할 때 등 eventName에 나열된 이벤트가 수행될 경우 트리거되어 알람이 발생하게 됩니다.
{
"source": ["aws.ecs"],
"detail-type": ["AWS API Call via CloudTrail"],
"detail": {
"eventSource": ["ecs.amazonaws.com"],
"eventName": [
"CreateCluster",
"CreateService",
"CreateTaskSet",
"DeleteAccountSetting",
"DeleteAttributes",
"DeleteCapacityProvider",
"DeleteCluster",
"DeleteService",
"DeregisterContainerInstance",
"DeregisterTaskDefinition",
"DeleteTaskDefinitions",
"DeleteTaskSet",
"RunTask",
"StartTask",
"StopTask"
]
}
}
eventName은 AWS의 공식 문서 중 서비스별로 정리되어 있는 API Reference로 들어가 Actions에서 찾으실 수 있어요. EC2의 액션 리스트는 에서 보실 수 있어요.
- AWS의 각 서비스 Documentation으로 이동
- API Reference
- Actions에서 EventName 조회
람다 코드를 작성하기 전에 트리거될 때 발생하는 예시 이벤트를 추출하기 위해서 sam CLI 도구를 사용해서 s3가 삭제될 때 발생하는 이벤트를 json으로 추출한 뒤에 이 json을 event 파라미터에 넣어 코드가 잘 실행되는지 테스트합니다. 테스트 코드는 에서 확인하실 수 있어요. 람다에 사용한 코드는 아래와 같습니다.
import datetime
import os
import json
import boto3
import requests
from dateutil import relativedelta
SLACK_WEBHOOK_URL = os.environ["SLACK_WEBHOOK_URL"]
def lambda_handler(event, context):
data = event['detail']
user_identity = data['userIdentity']
event_time = data['eventTime']
group = user_identity['arn'].split("/")[1].split("_")[1]
user = user_identity['arn'].split("/")[2]
event_id = data['eventID']
event_source = data['eventSource']
event_name = data['eventName']
aws_region = data['awsRegion']
client_ip = data['sourceIPAddress']
request_parameters = data['requestParameters']
text = f"{user}님이 {client_ip}에서 {event_name}을(를) 수행했습니다."
message_blocks = [
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"*{text}*"
}
},
{
"type": "section",
"fields": [
{
"type": "mrkdwn",
"text": f"*Event ID:*\n```{event_id}```"
},
{
"type": "mrkdwn",
"text": f"*Event Source:*\n```{event_source}```"
},
{
"type": "mrkdwn",
"text": f"*Event Name:*\n```{event_name}```"
},
{
"type": "mrkdwn",
"text": f"*AWS Region:*\n```{aws_region}```"
},
{
"type": "mrkdwn",
"text": f"*Client IP:*\n```{client_ip}```"
},
{
"type": "mrkdwn",
"text": f"*Event Time:*\n```{event_time}```"
},
{
"type": "mrkdwn",
"text": f"*User Group:*\n```{group}```"
},
{
"type": "mrkdwn",
"text": f"*User:*\n```{user}```"
}
]
},
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": f"*Request Parameters:*\n```{json.dumps(request_parameters, indent=2)}```"
}
}
]
try:
payload = {
"blocks": message_blocks
}
headers = {'Content-type': 'application/json'}
response = requests.post(
SLACK_WEBHOOK_URL,
data=json.dumps(payload),
headers=headers)
# Logging response for debugging
print(f"Response status: {response.status_code}")
print(f"Response text: {response.text}")
return {
'statusCode': response.status_code,
'body': response.text
}
except Exception as e:
print(f"Error: {e}")
return {
'statusCode': 500,
'body': str(e)
}
람다를 배포하고 난뒤에 사용자의 액션에 따라서 슬랙으로 알람이 정상적으로 동작하는 모습입니다. 최대한 모든 이벤트 발생 결과를 하나의 형식으로 만들기 위해서 request parameters를 포맷에 포함하였고 변경 되는 부분이 있을경우 JSON 형식으로 세부적인 액션 내용에 대해서 알려주고 있어요.