사용목적
현재 이메일 분류 관련 프로젝트를 진행하고 있습니다. 이메일을 분류하는 AI 모델이 있는데, 문제는 분류에 1개의 메일 당 2초 이상 걸리게 되었습니다. 사용자가 요청했을 때 실시간으로 분류하기에 메일 당 2초도 너무 긴 시간이고, 실시간으로 서비스를 제공하지 않더라도, batch 작업에서 병목이 생기기엔 충분했습니다.
처음에는 그냥 lambda에 cron 작업을 걸어서 돌릴 생각이었지만, 유저가 1명 가입할 때마다 평균 3000개 이상의 이메일 분류가 필요한데, lambda에 최대 실행 시간이 15분으로 정해져있고, 여러 경로에서 이메일 분류를 요청하는 상황에서, 분류부터 DB 저장까지 안정적인 처리가 필요했습니다.
규칙적으로 분류가 안된 email_id를 찾아서 SQS에 넘겨주면, Consumer가 DB에서 email_id에 대한 내용을 조회하고, 분류를 한 뒤 그 결과를 RDS에 update 하는 방식이며, Spring 백엔드에서는 유저의 최초가입 등 특수한 상황에서 간헐적으로 SQS에 분류 요청을 보내게 됩니다.
실행환경
lambda : Python 3.12, arm64
SQS : standard
구성과정
1. Consumer Lambda 생성
- 이때 SQS 권한을 넣어줘야, 나중에 따로 IAM을 안만져도 됩니다. 그리고 새 역할을 만들어야 cloudwatch 등 lambda 필수 권한들도 default로 들어가게 됩니다.
기본적으로 SQS에서 주는 message는 아래와 같은 형식을 가집니다.
{
"Records": [
{
"messageId": "059f36b4-87a3-44ab-83d2-661975830a7d",
"receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...",
"body": "message123!@#",
"attributes": {
"ApproximateReceiveCount": "1",
"SentTimestamp": "1545082649183",
"SenderId": "AIDAIENQZJOLO23YVJ4VO",
"ApproximateFirstReceiveTimestamp": "1545082649185"
},
"messageAttributes": {},
"md5OfBody": "098f6bcd4621d373cade4e832627b4f6",
"eventSource": "aws:sqs",
"eventSourceARN": "arn:aws:sqs:us-east-1:111122223333:my-queue",
"awsRegion": "us-east-1"
}
]
}
아래와 같이 handler 함수를 만들면 body의 message123!@# 부분을 읽어올 수 있습니다.
def lambda_handler(event, context):
records = event['Records']
for record in records:
recordBody = json.loads(record['body'])
2. SQS 생성
새로운 SQS를 만들어줍니다. 본인은 선입선출이 보장될 필요는 없어서, Standard Queue를 사용했습니다. 요금이나 각각의 장단점이 존재하니 각자 도메인에 맞는 queue를 선택하면 됩니다.
3. SQS 트리거 추가
lambda로 돌아와 '트리거추가'를 해줍니다. SQS를 고르고 (2)에서 만든 SQS 대기열을 선택해줍니다.
잘 연결되었는지 확인해보기 위해 테스트 메시지를 송신해보겠습니다.
생성된 SQS 메뉴를 보면, 웹 콘솔에서 메시지를 생성할 수 있습니다.
아무거나 입력하고 메시지를 전송해봅니다.
4. Producer lambda 생성
메시지를 넣어줄 producer를 만들어줍니다. consumer를 만들때와 동일하게 lambda를 생성해주면됩니다.
def lambda_handler(event, context):
...
for idx in range(0, len(lst), 10):
result = {'email_id': lst[idx:idx+10]}
msg_body = json.dumps(result)
msg = send_sqs_message(os.environ['SQS_QUEUE'], msg_body)
def send_sqs_message(sqs_queue_url, msg_body):
sqs_client = boto3.client('sqs')
msg = sqs_client.send_message(QueueUrl=sqs_queue_url, MessageBody=msg_body)
return msg
파이썬 코드는 다음과 같습니다. boto3 라이브러리를 이용해서 쉽게 SQS에 메시지를 보낼 수 있습니다.
아까 생성한 SQS_QUEUE의 URL을 lambda 환경변수에 추가해줘야 합니다.
SQS Permission Error
임의의 테스트를 만들어서 Producer에서 SQS로 메시지를 보냈지만 permission 에러가 났습니다.
IAM에서 Producer Lambda를 만들 때 생성된 role을 찾아줍니다.
저는 SQSFullAccess 권한을 넣어주도록 하겠습니다. 엄격하게 Role을 관리하고 싶다면, 아마 Full까진 아니더라도 Send 권한 등 일부만 넣어줄 수 있습니다.
5. Amazon EventBridge 일정 생성
마지막으로 Producer를 일정한 주기마다 호출해주기 위해서 EventBridge 일정을 생성해줍니다.
일정 생성방법은 공식 사이트나 블로그에서 자세히 설명된 글이 많으니 생략하겠습니다. 각자 도메인 기호에 맞게 생성해주면 됩니다.
Cloudwatch에서 lambda의 output 값을 모니터링 해보면 정상적으로 동작하는 것을 알 수 있습니다.
참고 문서
- https://devblog.kakaostyle.com/ko/2017-05-13-1-aws-serverless-1/
- https://www.youtube.com/watch?v=xyHLX1dUwuA
- https://docs.aws.amazon.com/ko_kr/code-library/latest/ug/python_3_sqs_code_examples.html
- https://docs.aws.amazon.com/ko_kr/lambda/latest/dg/with-sqs.html