[Kafka + Spring Boot Tip] Duplicate Topic Receive & Already rebalanced and assigned Error Solution
Kafka + Spring Boot 를 이용하다 보면 아래와 같은 2가지가 문제가 간혹 발생하곤 한다.
- 동일한 Topic 을 여러번 받는 문제
- Topic 을 받아 코드가 실행되고 있는데, 다시 Topic 을 Polling 하여 발생하는문제 (
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another membe
)
동일 Topic 을 여러번 받는 문제와, Topic 을 받아서 작업하고 있는데 Polling 을 다시 하는 문제는 대부분 Topic 을 받고 작업하는 시간이 오래 걸려서 나타나는경우가 많다.
이 문제는 아래와 같은 방법으로 해결이 가능하다.
1. auto commit 을 False 로 셋팅하고 Kafka Listener 에서 Ack 를 수행
아래 코드와 같이 kafka config, kafka listener 를 수정하면 된다.
#KafkaConsumerConfig.java
@ConditionalOnMissingBean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
...
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); ...}@Bean(name="kafkaListenerContainerFactory") public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory()
{ ...
factory.getContainerProperties()
.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); ...}
#KafkaConsumer.java
@KafkaListener(topics = "jjeaby-topic", containerFactory = "kafkaListenerContainerFactory")
public void receiveFromKafka(
@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String messageKey,
Acknowledgment acknowledgment) throws Exception
{ ... acknowledgment.acknowledge();
}
2. polling interval time, session timeout 을 수행하는 작업에 맞추어서 설정
아래 코드와 같이 kafka config 를 수정하면 된다. 여기서 중요한 부분은 MAX_POLL_INTERVAL_MS_CONFIG 을 작업에 걸리는 시간에 맞추어서 수정해야 한다는 것이다.(600000은 생각보다 큰 값임을 기억하자)
#KafkaConsumerConfig.java
@ConditionalOnMissingBean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
... props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,20);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000); ...}
1,2 번의 설정을 적용한 전체 코드는 아래와 같다. kafka + spring boot 를 사용하는데 도움이 되면 좋겠돠~~~:)