I am currently attempting to upgrade my software from Kafka 0.8.2 to 0.9. I
am trying to switch over to the new Consumer API so that partitions are
rebalanced as machines are added to or removed from our cluster. I am running
into an issue where the same partition of a topic is assigned to multiple
consumers for a short period of time when a machine is added to the group. As
a result, some messages are processed more than once, whereas I am aiming for
exactly-once processing. I followed the setup instructions in the Javadocs and
use an external data store to save the offsets both while consuming and when
rebalancing.
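
The external-offset setup from the Javadocs comes down to two callbacks on a
ConsumerRebalanceListener: onPartitionsRevoked saves the position of each
partition being taken away, and onPartitionsAssigned seeks each newly assigned
partition to its saved offset. The sketch below models only that bookkeeping
in plain Java so it runs without a broker. OffsetStore, Worker and the integer
partition ids are hypothetical stand-ins (in the real client the callbacks
receive Collection<TopicPartition>, and positions come from consumer.position()
and consumer.seek()); saving after every record mirrors the "Committing" lines
in the logs but is otherwise an assumption.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RebalanceModel {

    /** Hypothetical external offset store: partition id -> next offset to read. */
    static class OffsetStore {
        private final Map<Integer, Long> offsets = new HashMap<>();

        void save(int partition, long nextOffset) {
            offsets.put(partition, nextOffset);
        }

        long load(int partition) {
            // A partition with no stored offset starts from 0.
            return offsets.getOrDefault(partition, 0L);
        }
    }

    /** Hypothetical model of one consumer and its read position in each assigned partition. */
    static class Worker {
        private final OffsetStore store;
        private final Map<Integer, Long> positions = new HashMap<>();

        Worker(OffsetStore store) {
            this.store = store;
        }

        // Counterpart of ConsumerRebalanceListener.onPartitionsRevoked: persist positions first.
        void onPartitionsRevoked(List<Integer> partitions) {
            for (int p : partitions) {
                Long next = positions.remove(p);
                if (next != null) {
                    store.save(p, next);
                }
            }
        }

        // Counterpart of onPartitionsAssigned: resume from the stored offset (consumer.seek in Kafka).
        void onPartitionsAssigned(List<Integer> partitions) {
            for (int p : partitions) {
                positions.put(p, store.load(p));
            }
        }

        // "Processes" the record at the current position, then saves the next offset.
        long process(int partition) {
            long offset = positions.get(partition);
            positions.put(partition, offset + 1);
            store.save(partition, offset + 1);
            return offset;
        }
    }

    public static void main(String[] args) {
        // Clean hand-over: partition 14 is revoked from machine 1 before machine 3 gets it.
        OffsetStore store = new OffsetStore();
        Worker m1 = new Worker(store);
        Worker m3 = new Worker(store);
        m1.onPartitionsAssigned(List.of(14));
        for (int i = 0; i < 3; i++) {
            m1.process(14); // reads offsets 0, 1, 2
        }
        m1.onPartitionsRevoked(List.of(14));
        m3.onPartitionsAssigned(List.of(14));
        System.out.println("clean: machine 3 reads offset " + m3.process(14));

        // Overlap: machine 3 is assigned partition 14 while machine 1 still owns it.
        OffsetStore store2 = new OffsetStore();
        Worker a = new Worker(store2);
        Worker b = new Worker(store2);
        a.onPartitionsAssigned(List.of(14));
        for (int i = 0; i < 3; i++) {
            a.process(14);
        }
        b.onPartitionsAssigned(List.of(14)); // loads 3 from the store
        System.out.println("overlap: machine 1 reads offset " + a.process(14)
                + ", machine 3 reads offset " + b.process(14));
    }
}
```

In the clean hand-over machine 3 resumes at offset 3 and nothing is read twice.
In the overlapping case both workers load the same stored offset and each reads
offset 3, which is the duplicate visible in the logs below: the bookkeeping only
hands offsets over cleanly if each partition has a single owner at a time.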

In my test cluster I start with two consuming machines and a single producer.
Everything works fine at first, and each consumer gets half of the partitions.
When I add a third machine, it is assigned a portion of the partitions, but
those partitions are not revoked from the original machine that owns them until
some time later. Below are some log statements from my program; hopefully they
help illustrate my situation.

Partition 14 is initially assigned to machine 1, which reads a number of
messages before machine 3 is added. When machine 3 starts, it is assigned
partition 14, but partition 14 has not been revoked from machine 1. Both
machines then read the same message at offset 3 before the group rebalances
again and partition 14 is revoked from both of them. Machine 2 is assigned
partition 14 after it is revoked from machine 1, but while it is still assigned
to machine 3. Only after it is revoked from machine 3 is machine 2 the sole
owner of partition 14.

Machine 1 (turned on at start)
2016-08-24 14:17:08 DEBUG KafkaStreamReader:351 - ASSIGNED: Assigning partition 14 for topic assignments to worker with offset 0
2016-08-24 14:18:48 DEBUG KafkaStreamReader:312 - Committing topic assignments partition 14 offset 3
2016-08-24 14:19:38 DEBUG KafkaStreamReader:200 - partition = 14, offset = 3 (Message read from Kafka)
2016-08-24 14:19:38 DEBUG KafkaStreamReader:312 - Committing topic assignments partition 14 offset 4
2016-08-24 14:19:39 DEBUG KafkaStreamReader:338 - REVOKED: Committing for partition 14 of topic assignments offset 4

Machine 2 (turned on at start)
2016-08-24 14:19:51 DEBUG KafkaStreamReader:351 - ASSIGNED: Assigning partition 14 for topic assignments to worker with offset 4

Machine 3 (turned on a few minutes later)
2016-08-24 14:19:21 DEBUG KafkaStreamReader:351 - ASSIGNED: Assigning partition 14 for topic assignments to worker with offset 3
2016-08-24 14:19:48 DEBUG KafkaStreamReader:200 - partition = 14, offset = 3 (Message read from Kafka - already read by machine 1)
2016-08-24 14:20:00 DEBUG KafkaStreamReader:338 - REVOKED: Committing for partition 14 of topic assignments offset 4
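
To make the overlap concrete, the following sketch takes the ASSIGNED and
REVOKED timestamps for partition 14 from the logs above and computes every
window in which two machines held the partition at the same time. It is a
hypothetical helper for reading these logs, not part of my program or of the
Kafka client; OwnershipOverlap, Ownership and overlaps() are made-up names.

```java
import java.time.Duration;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

public class OwnershipOverlap {

    private static final DateTimeFormatter HMS = DateTimeFormatter.ofPattern("HH:mm:ss");

    /** One machine's hold on partition 14: its ASSIGNED time and its REVOKED time. */
    static final class Ownership {
        final String machine;
        final LocalTime assigned;
        final LocalTime revoked;

        Ownership(String machine, LocalTime assigned, LocalTime revoked) {
            this.machine = machine;
            this.assigned = assigned;
            this.revoked = revoked;
        }
    }

    /** Describes every pair of ownership intervals that overlap in time. */
    static List<String> overlaps(List<Ownership> owners) {
        List<String> result = new ArrayList<>();
        for (int i = 0; i < owners.size(); i++) {
            for (int j = i + 1; j < owners.size(); j++) {
                Ownership a = owners.get(i);
                Ownership b = owners.get(j);
                // The overlap runs from the later assignment to the earlier revocation.
                LocalTime start = a.assigned.isAfter(b.assigned) ? a.assigned : b.assigned;
                LocalTime end = a.revoked.isBefore(b.revoked) ? a.revoked : b.revoked;
                if (start.isBefore(end)) {
                    long seconds = Duration.between(start, end).getSeconds();
                    result.add(a.machine + " and " + b.machine + ": " + start.format(HMS)
                            + " to " + end.format(HMS) + " (" + seconds + " s)");
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Timestamps copied from the ASSIGNED/REVOKED log lines. Machine 2 has no REVOKED
        // line in the excerpt, so LocalTime.MAX stands in for "still assigned".
        List<Ownership> owners = List.of(
                new Ownership("machine 1", LocalTime.parse("14:17:08"), LocalTime.parse("14:19:39")),
                new Ownership("machine 3", LocalTime.parse("14:19:21"), LocalTime.parse("14:20:00")),
                new Ownership("machine 2", LocalTime.parse("14:19:51"), LocalTime.MAX));
        overlaps(owners).forEach(System.out::println);
    }
}
```

With these timestamps it reports two windows: machine 1 and machine 3 from
14:19:21 to 14:19:39 (18 s), and machine 3 and machine 2 from 14:19:51 to
14:20:00 (9 s). The timestamps come from three different hosts, so these
figures assume the machines' clocks are in sync.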

My cluster is running Cloudera 5.7.0 with Kafka package version
2.0.1-1.2.0.1.p0.5, which corresponds to Kafka version 0.9.0.0+kafka2.0.1+283
(https://www.cloudera.com/documentation/kafka/latest/topics/kafka_packaging.html).

Can anyone help explain what I'm doing wrong here? If there is any further
information that would help, please let me know and I will be happy to provide
it if I can.
