TokyoAJ

도쿄아재

AWS 2025.09.22

AWS SES 수신 → S3 저장 → Lambda 파싱 → DB 적재: 실전 구축 가이드

SES로 수신한 메일을 S3(.eml) 에 저장하고, Lambda가 이를 파싱해 MariaDB에 넣는 엔드투엔드 파이프라인.

이 글에는 DB 스키마, IAM 권한, 환경변수 예시, 완전한 Lambda 코드가 포함돼 있어 바로 적용할 수 있습니다.


아키텍처 한눈에 보기

외부 발신자 ──▶ SES Inbound
Receipt Rule
├─ S3 Action → s3://<INBOUND_BUCKET>/<prefix>/<messageId>.eml
└─ Lambda Action ──▶ S3의 .eml 읽기 → MIME 파싱 → DB INSERT
  1. 키 포인트: Receipt Rule에서 S3 → Lambda 순으로 액션을 배치하면, Lambda는 이벤트의 ses.mail.messageId를 키로 사용해 S3에서 원문(.eml)을 읽어올 수 있습니다. 버킷명/프리픽스는 환경변수로 주입하세요.


준비 체크리스트

  1. 수신 도메인/주소 Verify 후 MX를 SES 인바운드로 설정
  2. Receipt Rule 생성: 조건(Recipient) → S3 Action(버킷/프리픽스) → Lambda Action
  3. S3 버킷에 SES PutObject 허용(버킷 정책)
  4. Lambda 실행 역할에 S3 GetObject 권한(아래 JSON)
  5. Lambda 환경변수 설정(INBOUND_BUCKET, INBOUND_PREFIX)
  6. MariaDB(또는 Aurora/RDS) 준비 및 DDL 적용


DB 스키마 (MariaDB)

CREATE TABLE email_inbound (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
message_id VARCHAR(255) NOT NULL,
mail_from VARCHAR(320) NOT NULL,
rcpt_to VARCHAR(1024) NOT NULL,
subject VARCHAR(512),
date_header VARCHAR(128), -- 원문 값 그대로 보관(다양한 포맷 대비)
text_body MEDIUMTEXT,
html_body MEDIUMTEXT,
attachments JSON NULL,
s3_bucket VARCHAR(255) NOT NULL,
s3_object_key VARCHAR(1024) NOT NULL,
received_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
INDEX idx_msgid (message_id),
INDEX idx_received (received_at)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

팁: 첨부 파일 자체는 DB에 넣지 말고 S3에 보관, DB에는 메타데이터만 저장하세요.


Lambda 역할(IAM) — S3 읽기 권한 추가

IAM 콘솔 → Rolesktw_email_receiving-role-ocxoktbgAdd permissionsCreate inline policy 에 아래 JSON을 저장하세요(버킷명 교체).

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:GetObjectVersion"
],
"Resource": "arn:aws:s3:::buzzit-mail/*"
},
{
"Effect": "Allow",
"Action": "s3:ListBucket",
"Resource": "arn:aws:s3:::buzzit-mail"
}
]
}

S3 객체가 SSE-KMS로 암호화되어 있다면, 동일 역할에 kms:Decrypt 권한도 추가하세요.


Lambda 환경변수 예시

INBOUND_BUCKET=your-ses-inbound-bucket
INBOUND_PREFIX=emails # (선택, 비워도 됨. S3 Action prefix와 동일하게)
DB_HOST=your-db.host.rds.amazonaws.com
DB_PORT=3306
DB_USER=app_user
DB_PASS=********
DB_NAME=maildb
LOG_BODY_LIMIT=2000


Lambda 코드 (Python 3.13)

의존성: PyMySQL 필요 → 배포 ZIP에 포함하거나 Lambda Layer로 제공하세요.

  1. ZIP 번들링 예: pip install PyMySQL -t .lambda_function.py와 함께 압축
  2. Layer 예: python/ 하위에 site-packages 생성해 게시 후 함수에 연결
import os
import json
import logging
import boto3
import pymysql
from email import policy
from email.parser import BytesParser

logger = logging.getLogger()
logger.setLevel(logging.INFO)

S3 = boto3.client('s3')

# DB 연결 정보
DB_HOST = os.environ['DB_HOST']
DB_PORT = int(os.environ.get('DB_PORT', '3306'))
DB_USER = os.environ['DB_USER']
DB_PASS = os.environ['DB_PASS']
DB_NAME = os.environ['DB_NAME']

# 본문 로그 최대 길이(과도한 로그/비용 방지). 필요시 환경변수로 조절
MAX_LOG_CHARS = int(os.getenv("LOG_BODY_LIMIT", "2000"))

def _truncate(text: str | None, limit: int) -> str:
if not text:
return ""
return text if len(text) <= limit else (text[:limit] + "…(truncated)")

def _extract_bodies_and_attachments(msg):
"""text/plain, text/html 본문과 첨부파일 정보 추출"""
text_body, html_body = None, None
attachments = []

if msg.is_multipart():
for part in msg.walk():
disp = (part.get_content_disposition() or "").lower()
ctype = part.get_content_type().lower()

if disp == "attachment":
payload = part.get_payload(decode=True) or b''
attachments.append({
"filename": part.get_filename(),
"contentType": ctype,
"size": len(payload)
})
elif ctype == "text/plain" and text_body is None:
text_body = part.get_content()
elif ctype == "text/html" and html_body is None:
html_body = part.get_content()
else:
ctype = msg.get_content_type().lower()
body = msg.get_content()
if ctype == "text/html":
html_body = body
else:
text_body = body

return text_body, html_body, attachments

def insert_email_to_db(email_data, s3_bucket, s3_key):
"""이메일 데이터를 MariaDB에 삽입"""
try:
conn = pymysql.connect(
host=DB_HOST,
port=DB_PORT,
user=DB_USER,
password=DB_PASS,
database=DB_NAME,
charset='utf8mb4'
)
with conn.cursor() as cur:
sql = """
INSERT INTO email_inbound
(message_id, mail_from, rcpt_to, subject, date_header,
text_body, html_body, attachments, s3_bucket, s3_object_key)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
"""
cur.execute(sql, (
email_data['message_id'],
email_data['mail_from'],
email_data['rcpt_to'],
email_data['subject'],
email_data['date_header'],
email_data['text_body'],
email_data['html_body'],
json.dumps(email_data['attachments'], ensure_ascii=False),
s3_bucket,
s3_key
))
conn.commit()
logger.info("DB INSERT OK | message_id=%s", email_data['message_id'])
except Exception as e:
logger.error("DB INSERT FAILED | error=%s", str(e))
raise
finally:
if 'conn' in locals():
conn.close()

def lambda_handler(event, context):
# SES Receipt Rule의 Lambda 액션 이벤트인지 확인
if not ('Records' in event and 'ses' in event['Records'][0]):
logger.error("Not SES->Lambda event")
return {"ok": False, "error": "not_ses_lambda_event"}

ses_rec = event['Records'][0]['ses']

# 버킷/프리픽스는 환경변수에서, 키는 messageId 사용 (S3 액션과 동일한 prefix로 맞추세요)
bucket = os.getenv("INBOUND_BUCKET")
prefix = os.getenv("INBOUND_PREFIX", "")
if prefix and not prefix.endswith('/'):
prefix += '/'

if not bucket:
logger.error("INBOUND_BUCKET not set. Set Lambda env var INBOUND_BUCKET.")
return {"ok": False, "error": "INBOUND_BUCKET not set"}

message_id = ses_rec['mail']['messageId']
key = f"{prefix}{message_id}"

# S3 정보 로그
logger.info("S3 DEBUG | bucket=%s | key=%s | uri=s3://%s/%s", bucket, key, bucket, key)

# .eml 로드
obj = S3.get_object(Bucket=bucket, Key=key)
raw = obj['Body'].read()
logger.info("S3 GET OK | size_bytes=%s | eTag=%s", obj.get('ContentLength'), obj.get('ETag'))

# MIME 파싱
msg = BytesParser(policy=policy.default).parsebytes(raw)

# 헤더/본문/첨부파일 추출
subject = msg.get('subject', '')
mail_from = msg.get('from', '')
rcpt_to = msg.get_all('to', []) or []
cc = msg.get_all('cc', []) or []
date_hdr = msg.get('date', '')

text_body, html_body, attachments = _extract_bodies_and_attachments(msg)

# 메타 로그
logger.info("EMAIL META | subject=%r | from=%s | to=%s | cc=%s | date=%s",
subject, mail_from, ", ".join(rcpt_to), ", ".join(cc), date_hdr)

# 본문(디버그용, 길이 제한)
logger.info("EMAIL TEXT (<=%d chars): %s", MAX_LOG_CHARS, _truncate(text_body, MAX_LOG_CHARS))
logger.info("EMAIL HTML (<=%d chars): %s", MAX_LOG_CHARS, _truncate(html_body, MAX_LOG_CHARS))
logger.info("ATTACHMENTS COUNT: %d", len(attachments))

# 이메일 데이터 준비
email_data = {
"subject": subject,
"message_id": message_id,
"mail_from": mail_from,
"rcpt_to": ", ".join(rcpt_to),
"date_header": date_hdr,
"text_body": text_body,
"html_body": html_body,
"attachments": attachments
}

# MariaDB에 삽입
try:
insert_email_to_db(email_data, bucket, key)
except Exception as e:
logger.error("Failed to insert email to database: %s", str(e))
return {
"ok": False,
"error": "database_insert_failed",
"bucket": bucket,
"key": key,
"messageId": message_id
}

return {
"ok": True,
"bucket": bucket,
"key": key,
"messageId": message_id,
"database_inserted": True
}


운영 팁 & 트러블슈팅

AccessDenied: s3:GetObject

→ Lambda 역할에 S3 읽기 권한이 없는 경우(위 IAM JSON 적용).

→ KMS 사용 시 kms:Decrypt도 추가.

No module named 'pymysql'

→ 배포 ZIP/Lambda Layer에 PyMySQL 포함. (런타임 버전과 동일한 파이썬으로 빌드)

키 불일치

→ Receipt Rule의 S3 Prefix 와 INBOUND_PREFIX 환경변수가 정확히 동일해야 합니다(슬래시 포함).

로그 과다 비용 방지

→ LOG_BODY_LIMIT로 본문 로그 길이 제한. 민감정보 마스킹 권장.


댓글