Kafka

카프카 스트림즈(Kafka Streams)

ZzangHo 2022. 12. 14. 00:14
728x90

기존에 카프카를 이용한 데이터 파이프라인 작업으로는 Producer -> Kafka -> Consumer 형태의 프로젝트들을 많이 하였는데 이번에 신규 프로젝트를 진행하게 되면서 카프카 스트림즈(Kafka Streams)를 이용한 경험을 정리 해본다.

 

카프카 스트림즈란?

토픽에 적재된 데이터를 실시간으로 변환하여 다른 토픽에 적재하는 라이브러리이다. 

카프카의 스트림 데이터를 처리하기 위해 아파치 스파크, 아파치 플링크 등 다양한 오픈소스 어플리케이션들이 존재하지만 카프카 스트림즈를 사용해야 하는 이유는 카프카에서 공식적으로 지원하는 라이브러이기 때문이다. 카프카 클러스터와 완벽하게 호환이 되면서 편리한 기능(신규 토픽 생성, 상태 저장, 데이터 조인 등)들을 제공해준다.

 

카프카 스트림즈의 구조와 사용 방법을 알기 위해서는 우선 토폴리지(topology)와 관련된 개념을 알아야 한다.

토폴리지란 2개 이상의 노드들과 선으로 이루어진 집합을 뜻하는데 종류로는 링형, 트리형, 성형 등이 있는데 스트림즈에서 사용 되는 토폴로지는 트리 형태와 유사하다.

 

토폴로지 형태들(http://www.terms.co.kr/topology.htm)

 

카프카 스트림즈에서는 토폴리지를 이루는 노드를 하나의 '프로세서(processor)'라고 부르고 노드와 노드를 이은 선을 '스트림(stream)'이라고 부른다.

 

데이터 조인

위의 편리한 기능 중 데이터 조인 부분에서 카프카 스트림즈를 활용해봐야 겠다는 생각이 들었다. 

먼저, 카프카 스트림즈의 조인 방법으로는 크게 아래와 같이 존재한다.

 

기본 유형      보조 유형  내부 조인 왼쪽 조인 외부 조인
KStream KStream 지원 지원 지원
KTable KTable 지원 지원 지원
KStream KTable 지원 지원 해당 없음
KStream Global KTable 지원 지원 해당 없음

 

KStream이란 레코드의 흐름을 표현한 것으로 키와 메세지값으로 구성 되어있다. KStream은 컨슈머로 토픽의 데이터를 읽는 것과 같다고 볼 수 있다. 

KTable이란 주어진 메세지 키에 대한 최신 값을 보유한다. 즉, 동일한 메세지 키가 들어오면 제일 최신값으로 대체 된다.

Global KTable이란 KTable과 동일하게 메세지 키를 기준으로 묶어서 사용. KStream-KTable 조인 시 코파티셔닝이 되어 있지 않은 경우 Global KTable을 이용하면 해결 가능하다.

 

 

신규 프로젝트에서 데이터는 메타 데이터와 통계 데이터를 전달 받고 있는데 실제 검색엔진에 색인 될 때는 flat하게 데이터가 펼쳐져 있어야 했다. 

메타 데이터와 통계 데이터는 1:1 관계였고 전달받은 데이터를 RDB에 저장하였다면 아주 쉬웠겠지만 우리팀의 경우 데이터 레이크로 MongoDB를 사용하고 있었다. MongoDB에서도 효율적인 Join 방법(Reference, Nested)들이 있었지만 카프카 스트림즈의 Join 기능을 찾아보게 되었다.

 

여기서 내가 선택한 Join 방법은 KTable-KTable 이였다.

KTable-KTable을 선택한 이유는 아래와 같은 제약 사항들이 있었다.

  • 메타 데이터는 실시간, 통계 데이터는 전체 데이터가 데일리 단위로 한번씩 들어온다.
  • 메타 데이터는 있지만 통계 데이터가 없는 경우 데이터는 검색 되어야 한다.
  • 통계 데이터는 있지만 메타 데이터가 없는 경우 데이터는 검색 되지 않아야 한다.
  • 메타 데이터 + 통계 데이터 조인이 된 상태에서 전체 통계 데이터가 매일 새로 들어오기 때문에 계속 최종 데이터를 갱신해주어야 한다.

 

맨 처음 KStream-KTable 로 진행을 해볼까 했었지만 위의 제약 사항 중 4번 통계 데이터가 매일 들어올때마다 최종 데이터를 갱신해주어야 하는 문제에서 KStream-KTable의 경우 KTable의 데이터가 변경이 되어도 KStream이 Stateless 형태이므로 흘러간 데이터를 조인하지 못해 최종 데이터가 갱신이 되지 않는 구조여서 제외 되었다.

 

최종적으로는 메타 데이터도 Stateful해야 했고 통계 데이터도 Stateful 해야 했다. 그렇기 때문에 KTable-KTable 형태의 조인을 선택하였다. 해당 조인은 Non-windowed 조인이다.

 

https://www.confluent.io/blog/crossing-streams-joins-apache-kafka/

 

Join co-partitioning

두 개의 토픽을 조인 하는 경우 코파티셔닝은 필수이다. 이는 같은 키를 가진 입력 레코드는 같은 태스크로 전달되는 것을 보장해주기 때문이다.

 

코파티셔닝이란 조인을 하는 2개 데이터의 파티션 개수가 동일하고 파티셔닝 전략을 동일하게 맞추는 작업

 

만약, 코파티셔닝이 되어 있지 않는 경우에는 Global KTable을 사용하자

 

 

향후 계획

추후에는 Click, View Log를 기반으로 KStream-KStream 형태의 Time Window 두어 인터파크 내의 Good Click, Bad Click 을 지표로 뽑는 작업을 해보면 좋을 것 같다.

 

참조

'Kafka' 카테고리의 다른 글

Kafka Streams 어플리케이션 초기화  (0) 2023.01.10
Kafka Consumer 멀티 쓰레드로 간단하게?  (0) 2022.01.28
Consumer Lag - Grafana 셋팅  (0) 2022.01.27
Consumer Lag - telegraf 설정  (0) 2022.01.27
Consumer Lag - burrow 설치  (0) 2022.01.27