Kafka

Spring Kafka로 토픽 구독

ZzangHo 2022. 1. 26. 23:06
728x90

Spring Kafka란?

카프카를 스프링 프레임워크에서 효과적으로 사용할 수 있도록 만들어진 라이브러리다.

기존 카프카 클라이언트 라이브러리를 래핑하여 만든 스프링 카프카 라이브러리는 카프카 클라이언트에서 사용하는 여러가지 패턴을 미리 제공한다.

 

dependency 추가

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

 

@KafkaListener 어노테이션 사용(Record 단위로 읽기 default모드)

[ConsumerListener.java]

@Component
@Slf4j
public class ConsumerListener {
     
    @KafkaListener(topics = "topicName", groupId = "topicGroupName")
    public void receiveMessage(@Payload String message) {
        log.info("message :: " + message);
    }
 
}

topics : 어느 토픽을 컨슈밍할 것인지 지정

groupid : 토픽을 컨슈밍할때 scale up을 위해 topicGroup을 지정해 준다

 

테스트

테스트 결과

이렇게 간단하게 카프카의 토픽을 구독하여 메세지를 전달받을 수 있다. 그리고 빨간색 네모 안에 보면 기본적으로 1개 Thread로 동작하고 있는 것을 확인 할 수 있다.

 

Spring Kafka의 경우 기본 Record 단위로 읽는것이 default 설정이므로 따로 application.propertes나 application.yaml 등에서 설정을 해주지 않았으면 Record 단위로 생각하면 된다.

 

Batch형태로 메세지를 읽고 싶다면 아래 옵션을 위의 파일에 추가를 해주고 파라미터 수정을 해주면 된다.

 

@KafkaListener 어노테이션 사용(Batch 단위로 읽기)

[application.yml]

spring:
  profiles: local
  kafka:
    bootstrap-servers: {kafka서버정보}
    listener:
      type: batch

 

위와 같이 spring.kafka.listener.type : batch 를 선언해 준다.

 

[ConsumerListener.java]

@Component
@Slf4j
public class ConsumerListener {
     
    @KafkaListener(topics = "topicName", groupId = "topicGroupName")
    public void receiveMessage(ConsumerRecords<String, String> records) {
        records.forEach(record -> log.info(record.toString())); 
    }
 
}

'Kafka' 카테고리의 다른 글

Consumer Lag - burrow 설치  (0) 2022.01.27
Kafka Install 버전 설치  (0) 2022.01.27
카프카 토픽명 규칙  (0) 2022.01.26
토픽과 파티션, 레코드  (0) 2022.01.26
Kafka에서 데이터 순서 보장하려면?  (0) 2022.01.26