import os
import json
import logging
import boto3
import pymysql
from email import policy
from email.parser import BytesParser
# Module-level setup, executed once when this module is imported.
logger = logging.getLogger()  # root logger
logger.setLevel(logging.INFO)

S3 = boto3.client("s3")  # shared client used to fetch raw inbound messages

# Database connection settings. The required variables raise KeyError at
# import time when missing; DB_PORT falls back to 3306.
DB_HOST = os.environ["DB_HOST"]
DB_PORT = int(os.getenv("DB_PORT", "3306"))
DB_USER = os.environ["DB_USER"]
DB_PASS = os.environ["DB_PASS"]
DB_NAME = os.environ["DB_NAME"]

# Cap on how many characters of each email body are written to the logs.
MAX_LOG_CHARS = int(os.environ.get("LOG_BODY_LIMIT", "2000"))
def _truncate(text: str | None, limit: int) -> str:
if not text:
return ""
return text if len(text) <= limit else (text[:limit] + "…(truncated)")
def _extract_bodies_and_attachments(msg):
"""text/plain, text/html 본문과 첨부파일 정보 추출"""
text_body, html_body = None, None
attachments = []
if msg.is_multipart():
for part in msg.walk():
disp = (part.get_content_disposition() or "").lower()
ctype = part.get_content_type().lower()
if disp == "attachment":
payload = part.get_payload(decode=True) or b''
attachments.append({
"filename": part.get_filename(),
"contentType": ctype,
"size": len(payload)
})
elif ctype == "text/plain" and text_body is None:
text_body = part.get_content()
elif ctype == "text/html" and html_body is None:
html_body = part.get_content()
else:
ctype = msg.get_content_type().lower()
body = msg.get_content()
if ctype == "text/html":
html_body = body
else:
text_body = body
return text_body, html_body, attachments
def insert_email_to_db(email_data, s3_bucket, s3_key):
    """Insert one parsed inbound email into the MariaDB ``email_inbound`` table.

    Opens a new PyMySQL connection for this call, runs a single parameterized
    INSERT, commits, and always closes the connection.

    Args:
        email_data: dict with keys ``message_id``, ``mail_from``, ``rcpt_to``,
            ``subject``, ``date_header``, ``text_body``, ``html_body`` and
            ``attachments`` (a JSON-serializable list, stored as JSON text).
        s3_bucket: bucket holding the raw message.
        s3_key: object key of the raw message.

    Raises:
        Exception: any connection, SQL or serialization error is logged with
            its traceback and re-raised unchanged.
    """
    sql = """
        INSERT INTO email_inbound
        (message_id, mail_from, rcpt_to, subject, date_header,
        text_body, html_body, attachments, s3_bucket, s3_object_key)
        VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)
    """
    # Stays None if anything fails before connect() returns, so the finally
    # clause never touches an unbound name.
    conn = None
    try:
        # Build the parameters first: a missing key or a non-serializable
        # attachment list then fails without opening a connection.
        params = (
            email_data['message_id'],
            email_data['mail_from'],
            email_data['rcpt_to'],
            email_data['subject'],
            email_data['date_header'],
            email_data['text_body'],
            email_data['html_body'],
            json.dumps(email_data['attachments'], ensure_ascii=False),
            s3_bucket,
            s3_key,
        )
        conn = pymysql.connect(
            host=DB_HOST,
            port=DB_PORT,
            user=DB_USER,
            password=DB_PASS,
            database=DB_NAME,
            charset='utf8mb4',
        )
        with conn.cursor() as cur:
            cur.execute(sql, params)
        conn.commit()
        logger.info("DB INSERT OK | message_id=%s", email_data['message_id'])
    except Exception as e:
        logger.exception("DB INSERT FAILED | error=%s", str(e))
        raise
    finally:
        # Skip close() when the driver already tore the socket down, so a
        # secondary "already closed" error cannot mask the original exception.
        if conn is not None and conn.open:
            conn.close()
def lambda_handler(event, context):
    """Handle an SES receipt-rule Lambda invocation for one inbound email.

    Reads the raw MIME message stored in S3 at
    ``s3://$INBOUND_BUCKET/$INBOUND_PREFIX<messageId>``, parses it, logs a
    summary (bodies truncated to ``MAX_LOG_CHARS`` characters), and inserts a
    row via ``insert_email_to_db``.

    Args:
        event: SES -> Lambda event; only ``Records[0]['ses']`` is used.
        context: Lambda context object (unused).

    Returns:
        dict: ``{"ok": True, ...}`` on success, or ``{"ok": False, "error": ...}``
        when the event is not an SES event, ``INBOUND_BUCKET`` is unset, or the
        database insert fails.

    Raises:
        Errors from ``S3.get_object`` (e.g. missing object, denied access)
        propagate and fail the invocation.
    """
    # Reject non-SES invocations (test events, empty Records lists, non-dict
    # payloads) with a clean response instead of KeyError/IndexError/TypeError.
    records = event.get('Records') if isinstance(event, dict) else None
    first = records[0] if isinstance(records, list) and records else None
    if not isinstance(first, dict) or 'ses' not in first:
        logger.error("Not SES->Lambda event")
        return {"ok": False, "error": "not_ses_lambda_event"}
    ses_rec = first['ses']

    bucket = os.getenv("INBOUND_BUCKET")
    prefix = os.getenv("INBOUND_PREFIX", "")
    # A non-empty prefix is forced to end with '/' before building the key.
    if prefix and not prefix.endswith('/'):
        prefix += '/'
    if not bucket:
        logger.error("INBOUND_BUCKET not set. Set Lambda env var INBOUND_BUCKET.")
        return {"ok": False, "error": "INBOUND_BUCKET not set"}

    message_id = ses_rec['mail']['messageId']
    key = f"{prefix}{message_id}"
    logger.info("S3 DEBUG | bucket=%s | key=%s | uri=s3://%s/%s", bucket, key, bucket, key)

    obj = S3.get_object(Bucket=bucket, Key=key)
    raw = obj['Body'].read()
    logger.info("S3 GET OK | size_bytes=%s | eTag=%s", obj.get('ContentLength'), obj.get('ETag'))

    msg = BytesParser(policy=policy.default).parsebytes(raw)
    # policy.default yields header objects (str subclasses); normalize them to
    # plain str before logging them and binding them as SQL parameters.
    subject = str(msg.get('subject', ''))
    mail_from = str(msg.get('from', ''))
    # NOTE(review): recipients come from the To: header, so Bcc'd recipients
    # are not captured; confirm whether envelope recipients are wanted instead.
    rcpt_to = [str(addr) for addr in msg.get_all('to', []) or []]
    cc = [str(addr) for addr in msg.get_all('cc', []) or []]
    date_hdr = str(msg.get('date', ''))
    text_body, html_body, attachments = _extract_bodies_and_attachments(msg)
    rcpt_to_joined = ", ".join(rcpt_to)

    logger.info("EMAIL META | subject=%r | from=%s | to=%s | cc=%s | date=%s",
                subject, mail_from, rcpt_to_joined, ", ".join(cc), date_hdr)
    logger.info("EMAIL TEXT (<=%d chars): %s", MAX_LOG_CHARS, _truncate(text_body, MAX_LOG_CHARS))
    logger.info("EMAIL HTML (<=%d chars): %s", MAX_LOG_CHARS, _truncate(html_body, MAX_LOG_CHARS))
    logger.info("ATTACHMENTS COUNT: %d", len(attachments))

    email_data = {
        "subject": subject,
        "message_id": message_id,
        "mail_from": mail_from,
        "rcpt_to": rcpt_to_joined,
        "date_header": date_hdr,
        "text_body": text_body,
        "html_body": html_body,
        "attachments": attachments,
    }
    try:
        insert_email_to_db(email_data, bucket, key)
    except Exception as e:
        # The failure is reported in the response rather than raised, so the
        # invocation itself still completes successfully.
        logger.error("Failed to insert email to database: %s", str(e))
        return {
            "ok": False,
            "error": "database_insert_failed",
            "bucket": bucket,
            "key": key,
            "messageId": message_id,
        }
    return {
        "ok": True,
        "bucket": bucket,
        "key": key,
        "messageId": message_id,
        "database_inserted": True,
    }
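# Deployment / troubleshooting notes:
# → If the S3 read fails, the Lambda execution role may lack S3 read permission
#   on the inbound bucket (apply the IAM policy JSON from the setup guide).
# → If the bucket is encrypted with KMS, also grant kms:Decrypt.
# → Include PyMySQL in the deployment ZIP or a Lambda Layer, built with the same
#   Python version as the Lambda runtime.
# → The S3 object key prefix in the SES receipt rule must exactly match the
#   INBOUND_PREFIX environment variable, including the trailing slash. The
#   handler appends '/' to a non-empty INBOUND_PREFIX, so the rule's prefix
#   should end with '/' as well.
# → LOG_BODY_LIMIT caps how many characters of each body are logged; masking
#   sensitive information in the logs is still recommended.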