카프카 프로듀서 애플리케이션

프로듀서의 역할

  • 토픽에 해당하는 메시지를 생성
    • 토픽에 전송할 메시지 생성
  • 특정 토픽으로 데이터를 publish
    • 특정 토픽으로 데이터를 전송
  • 처리 실패/재시도
    • 카프카 브로커로 데이터를 전송할 때, 전송 성공여부를 알 수 있고, 실패할 경우 재시도 할 수도 있음

JAVA로 작성하는 프로듀서

카프카 클라이언트인 컨슈머와 프로듀서를 사용하기 위해서는 아파치 카프카 라이브러리를 추가해야 함

라이브러리는 gradle이나 maven 같은 라이브러리 관리 도구를 사용하여 편리하게 가져올 수 있음

Untitled

❗ 카프카 클라이언트를 디펜던시로 잡을 때는 버전을 주의해야 함

카프카는 브로커 버전과 클라이언트 버전의 하위호환성이 모든 버전에 대해서 지원하지 않기 때문에 브로커와 클라이언트 버전의 하위호환성을 확인하고 알맞은 카프카 클라이언트 버전을 사용해야 함

예를들어, 특정 버전 카프카 브로커는 특정 버전 카프카 클라이언트를 지원하지 않을 수 있음

카프카 프로듀서를 작성한 코드

Untitled

01. 프로듀서 설정

// 자바 프로퍼티 객체를 통해 프로듀서의 설정을 정의
Properties configs = new Properties();
//부트스트랩 서버 옵션을 통해 카프카 브로커를 설정
//부트스트랩 서버 설정을 로컬 호스트의 카프카를 바라보도록 설정함
configs.put("bootstrap.servers", "localhost:9092");

카프카 브로커의 주소목록은 되도록이면 2개 이상의 ip와 port를 설정하도록 권장하는데, 둘 중 한 개 브로커가 비정상일 경우 다른 정상적인 브로커에 연결되어 사용 가능하기 때문

그러므로 실제로 애플리케이션을 카프카와 연동할 때는 반드시 2개 이상의 브로커 정보를 넣는 것을 추천

// key와 value에 대해 StringSerializer로 직렬화를 설정
configs.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Serializer는 key 혹은 value를 직렬화하기 위해 사용되는데, Byte array, String, Integer serializer를 사용할 수 있음

여기서 key는 메시지를 보내면, 토픽의 파티션이 지정될 때 쓰임

02. 프로듀서 인스턴스 생성

// 설정한 프로퍼티로 카프카 프로듀서 인스턴스를 만듦
KafkaProducer < String, String > producer = new KafkaProducer < String, String > (configs);

03. 전송할 객체 생성

// 전송할 객체 생성 - 카프카 클라이언트에서 ProducerRecord 클래스 제공
ProducerRecord record = new ProducerRecord < String, String > ("click_log", "login");

ProducerRecord 인스턴스를 생성할 때, 어느 토픽에 넣을 것인지 어떤 key와 value를 담을 것인지 선언할 수 있음

위 코드는 key 없이 click_log 토픽에 login이라는 value를 보냄

ProducerRecord record = new ProducerRecord < String, String > ("click_log", "1", "loging");

만약, key를 포함하여 보내고 싶다면 위의 코드와 같이 ProducerRecord를 선언하면 됨

파라미터 개수에 따라 자동으로 오버로딩되어 인스턴스가 생성되므로 이 점을 유의해서 ProducerRecord를 생성

여기까지 데이터가 도착할 토픽, 데이터, 카프카 브로커의 호스트와 포트까지 데이터를 전송할 모든 준비가 됨

04. 전송

producer.send(record);

이전에 생성된 프로듀서 인스턴스에 send() 메서드의 파라미터로 ProducerRecord를 넣으면 전송이 이루어지게 됨

click_log 토픽에 login value가 들어가게 됨

05. 프로듀서 종료

producer.close();

전송이 완료되면 close() 메서드를 통해 프로듀서를 종료

그림으로 보는 카프카 프로듀서의 메시지 전송

01. 1 partition & null key

Untitled

  • key가 null인 데이터를 파티션이 1개인 토픽에 보내면 다음과 같이 차례대로 쌓이게 됨

02. 2 partitions & null key

Untitled

  • 만약 파티션이 1개가 더 늘어나면 key가 null이므로 데이터가 round-robin으로 2개의 파티션에 차곡차곡 쌓이게 됨

03. 2 partitions & 2 keys

Untitled

  • 반면 buy라는 value의 key를 1, review라는 value의 key를 2라고 지정
  • 카프카는 key를 특정한 hash값으로 변경시켜 파티션과 1대1 매칭을 시킴
  • 그러므로 위 코드를 반복해서 실행시키면, 각 파티션에 동일 key의 value만 쌓이게 됨

04. 3+ partitions & 2 keys

Untitled

  • 이 때 파티션을 한 개 더 추가한다면 key와 파티션의 매칭이 깨지기 때문에 key와 파티션의 연결은 보장되지 않음
  • 그러므로 key를 사용할 경우 이 점을 유의해서 파티션 개수를 생성하고 추후에 생성하지 않는 것을 추천

참고 강의 : https://www.inflearn.com/course/아파치-카프카-입문/dashboard