Re: 0.9.0.1 Kafka assign partition to new Consumer error

2016-03-09 Thread Ken Cheng
Hi Jason, In my test case, I set enable.auto.commit=false and then I need to use commitSync() in the onPartitionsAssigned() to avoid consumer rebalanced. Actually, I place it in for(TopicPartition partition: partitions), so that commitSync() will not execute when partitions are first assigned.

Re: 0.9.0.1 Kafka assign partition to new Consumer error

2016-03-08 Thread Jason Gustafson
Hey Ken, Whether to use subscribe or assign depends mainly on whether you need to use consumer groups to distribute the topic load. If you use subscribe(), then the partitions for the subscribed topics will be divided among all consumers sharing the same groupId. With assign(), you have to

Re: 0.9.0.1 Kafka assign partition to new Consumer error

2016-03-08 Thread Ken Cheng
Hi Jason, Thanks for your detailed explain. Face to this situation, I wanna discuss more a little bit, I try two approach to avoid it in Kafka 0.9.0.1, and both work correctly. 1. Using subscribe(topics, listener) and implements onPartitionsAssigned(partitions) , it manually run

Re: 0.9.0.1 Kafka assign partition to new Consumer error

2016-03-07 Thread Jason Gustafson
Hi Ken, It looks like what happened is this: 1. First thread joins the group and is assigned partitions 0 and 1 2. First thread races through a bunch of messages from these partitions. 3. Second thread joins and is assigned partition 1 (leaving partition 0 for the first thread) 4. Both threads