Amazon MSK와 DynamoDB에서 생성되는 스트리밍 데이터 수집 및 처리 플랫폼 구축. 실시간 이벤트 스트림 처리와 반정형 데이터 처리
기존 RDB 중심의 데이터 수집 방식으로는 클라이언트 로그와 같이 비정형적이고 실시간성이 강한 데이터를 처리하는 데 한계가 있었습니다. 또한, DynamoDB와 같은 반정형 데이터베이스의 이벤트 로그를 실시간으로 수집하고 분석하는 데 필요한 인프라가 부족했습니다.
본 프로젝트는 Amazon MSK를 중앙 데이터 허브로 구축하여 모든 이벤트 로그 데이터의 수집을 표준화하고, 이를 데이터 웨어하우스(DW) 및 다양한 타겟 시스템에 안정적으로 연동하는 대용량 스트리밍 데이터 수집 플랫폼을 구축하는 것을 목표로 했습니다.
-- 외부 스키마 생성 (MSK 브로커 연동)
CREATE EXTERNAL SCHEMA kafka_schema
FROM KAFKA
AUTHENTICATION NONE
URI 'b-1.msk-prod-cluster.xxxxxx.c4.kafka.ap-northeast-2.amazonaws.com:9094,b-2.msk-prod-cluster.xxxxxx.c4.kafka.ap-northeast-2.amazonaws.com:9094'
-- Materialized View 생성 (특정 토픽 구독)
CREATE MATERIALIZED VIEW my_schema.my_event_view AUTO REFRESH YES AS
SELECT *
FROM kafka_schema."dev.service.myevent";
-- SUPER 타입의 JSON 데이터 파싱 조회
SELECT
parsed_json.app.id::INT AS app_id,
parsed_json.app.version AS app_version,
parsed_json.device.id AS device_id
FROM (
SELECT json_parse(kafka_value) AS parsed_json
FROM my_schema.my_event_view
);
# Snowflake Sink Connector Properties
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=2
topics=dev.pp.neoevent,dev.ppserver.neoevent
snowflake.topic2table.map=dev.pp.neoevent:dev_pp_neoevent,dev.ppserver.neoevent:dev_ppserver_neoevent
# Snowflake Connection Info
snowflake.url.name=xxxxxxxx-xxxxxxxx.snowflakecomputing.com:443
snowflake.user.name=msk_user
snowflake.private.key=MIIFNTBf... (Secret)
snowflake.private.key.passphrase=************
snowflake.database.name=msk_db
snowflake.schema.name=msk_schema
# Buffer & Converter Settings
buffer.count.records=10000
buffer.flush.time=60
buffer.size.bytes=5000000
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter
batch_size
와 SQS의 maximum_batching_window_in_seconds
를 최적화하여 대규모 이벤트를 효율적으로 그룹화하고 처리할 수 있도록 구성했습니다. 이를 통해 피크 타임에도 안정적인 데이터 처리를 보장했습니다.INSERT
, MODIFY
, REMOVE
이벤트 형식으로 발생하는 JSON 데이터를 분석가가 활용할 수 있도록 가공하고 안정적인 테이블 구조를 가질 필요가 있었습니다.-- 예시 records
{
"Records":[
{
"eventID":"1",
"eventName":"INSERT",
"eventVersion":"1.0",
"eventSource":"aws:dynamodb",
"awsRegion":"us-east-1",
"dynamodb":{
"Keys":{
"Id":{
"N":"101"
}
},
"NewImage":{
"Message":{
"S":"New item!"
},
"Id":{
"N":"101"
}
},
"SequenceNumber":"111",
"SizeBytes":26,
"StreamViewType":"NEW_AND_OLD_IMAGES"
},
"eventSourceARN":"stream-ARN"
},
{
"eventID":"2",
"eventName":"MODIFY",
"eventVersion":"1.0",
"eventSource":"aws:dynamodb",
"awsRegion":"us-east-1",
"dynamodb":{
"Keys":{
"Id":{
"N":"101"
}
},
"NewImage":{
"Message":{
"S":"This item has changed"
},
"Id":{
"N":"101"
}
},
"OldImage":{
"Message":{
"S":"New item!"
},
"Id":{
"N":"101"
}
},
"SequenceNumber":"222",
"SizeBytes":59,
"StreamViewType":"NEW_AND_OLD_IMAGES"
},
"eventSourceARN":"stream-ARN"
},
{
"eventID":"3",
"eventName":"REMOVE",
"eventVersion":"1.0",
"eventSource":"aws:dynamodb",
"awsRegion":"us-east-1",
"dynamodb":{
"Keys":{
"Id":{
"N":"101"
}
},
"OldImage":{
"Message":{
"S":"This item has changed"
},
"Id":{
"N":"101"
}
},
"SequenceNumber":"333",
"SizeBytes":38,
"StreamViewType":"NEW_AND_OLD_IMAGES"
},
"eventSourceARN":"stream-ARN"
}
]
}
raw 테이블
과 state 테이블
로 나누어 테이블을 설계하고 데이터의 최종 상태를 정확하게 반영하기 위해 멱등성을 보장하는 UPSERT 쿼리를 설계했습니다. 임시 테이블(Stage)을 활용하여 특정 시간 범위 내의 이벤트 중 가장 최신 이벤트만 선별하고, updatedAt
과 sequence_number
를 복합적으로 사용하여 순서가 보장된 데이터 변경을 수행했습니다.-- DynamoDB Streams CDC 데이터를 Redshift State 테이블에 MERGE하는 쿼리
BEGIN;
-- 1단계: 임시 테이블에 최신 이벤트만 선별하여 저장
CREATE TEMP TABLE stage_events AS
SELECT pk, sk, event_name, approximate_creation_date_time, sequence_number, new_image
FROM (
SELECT *, ROW_NUMBER() OVER (
PARTITION BY pk, sk
ORDER BY
CASE WHEN event_name = 'REMOVE' THEN 1 ELSE 0 END DESC,
COALESCE(CAST(new_image.updatedAt.N AS BIGINT), 0) DESC,
approximate_creation_date_time DESC,
COALESCE(CAST(sequence_number AS DECIMAL(38,0)), 0) DESC
) AS rn
FROM my_schema.my_raw_table
WHERE creation_time BETWEEN '2025-08-17 14:40:41.135' AND '2025-08-17 14:44:54.010'
) t WHERE rn = 1;
-- 2단계: 'MODIFY' 이벤트 업데이트
UPDATE my_schema.my_state_table SET ... FROM stage_events WHERE ...;
-- 3단계: 'INSERT' 또는 새로운 'MODIFY' 이벤트 삽입
INSERT INTO my_schema.my_state_table (...) SELECT ... FROM stage_events WHERE ...;
-- 4단계: 'REMOVE' 이벤트 삭제
DELETE FROM my_schema.my_state_table USING stage_events WHERE ...;
DROP TABLE stage_events;
END;
S3 Bucket Policy
를 통해 접근 제어를 유지하며 효율적으로 데이터를 공유할 수 있었습니다.# IAM 역할 및 정책 생성 (Lambda가 DynamoDB Stream을 읽고 S3에 쓰도록 허용)
resource "aws_iam_role" "dynamodb_stream_role" { ... }
resource "aws_iam_policy" "dynamodb_stream_policy" { ... }
resource "aws_iam_role_policy_attachment" "dynamodb_stream_attachment" { ... }
# Lambda 함수 및 이벤트 소스 매핑
resource "aws_lambda_function" "dynamodb_stream_consumer" {
function_name = "dynamodb-stream-to-s3"
role = aws_iam_role.dynamodb_stream_role.arn
...
}
resource "aws_lambda_event_source_mapping" "dynamodb_stream_mapping" {
event_source_arn = "arn:aws:dynamodb:ap-northeast-2:123456789012:table/MyTable/stream/..."
function_name = aws_lambda_function.dynamodb_stream_consumer.arn
starting_position = "TRIM_HORIZON"
batch_size = 100000
maximum_batching_window_in_seconds = 5
}
INSERT
, UPDATE
, DELETE
이벤트의 순서가 보장되지 않거나 중복 발생 가능성이 항상 존재함을 인지했습니다. UPSERT
쿼리 설계 시 단순히 최신 데이터만 반영하는 것을 넘어, sequence_number
와 같은 보조 정렬 키를 활용하여 데이터의 최종 상태를 보장하는 견고한 멱등성 로직을 구현하는 것이 얼마나 중요한지 깊이 깨달았습니다.batch_size
나 SQS의 maximum_batching_window_in_seconds
같은 파라미터 하나가 전체 파이프라인의 안정성에 얼마나 큰 영향을 미치는지 체감했습니다. 스트레스 테스트를 통해 병목 지점을 파악하고, 시스템의 특성에 맞게 배치를 최적화하는 과정이 안정적인 스트리밍 플랫폼 구축의 핵심임을 배웠습니다.