[Kafka + Spring Boot Tip] Duplicate Topic Receive & Already rebalanced and assigned Error Solution

jelly
Oct 22, 2020

--

Kafka + Spring Boot 를 이용하다 보면 아래와 같은 2가지가 문제가 간혹 발생하곤 한다.

  1. 동일한 Topic 을 여러번 받는 문제
  2. Topic 을 받아 코드가 실행되고 있는데, 다시 Topic 을 Polling 하여 발생하는문제 (org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another membe )

동일 Topic 을 여러번 받는 문제와, Topic 을 받아서 작업하고 있는데 Polling 을 다시 하는 문제는 대부분 Topic 을 받고 작업하는 시간이 오래 걸려서 나타나는경우가 많다.

이 문제는 아래와 같은 방법으로 해결이 가능하다.

1. auto commit 을 False 로 셋팅하고 Kafka Listener 에서 Ack 를 수행

아래 코드와 같이 kafka config, kafka listener 를 수정하면 된다.

#KafkaConsumerConfig.java

@ConditionalOnMissingBean                         
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();

...

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
...}@Bean(name="kafkaListenerContainerFactory") public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory()
{
...

factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
...}

#KafkaConsumer.java

@KafkaListener(topics = "jjeaby-topic", containerFactory = "kafkaListenerContainerFactory")
public void receiveFromKafka(
@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String messageKey,
Acknowledgment acknowledgment) throws Exception
{
... acknowledgment.acknowledge();
}

2. polling interval time, session timeout 을 수행하는 작업에 맞추어서 설정

아래 코드와 같이 kafka config 를 수정하면 된다. 여기서 중요한 부분은 MAX_POLL_INTERVAL_MS_CONFIG 을 작업에 걸리는 시간에 맞추어서 수정해야 한다는 것이다.(600000은 생각보다 큰 값임을 기억하자)

#KafkaConsumerConfig.java

@ConditionalOnMissingBean                         
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();

...
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,20);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
...}

1,2 번의 설정을 적용한 전체 코드는 아래와 같다. kafka + spring boot 를 사용하는데 도움이 되면 좋겠돠~~~:)

--

--