Dear Experts, We recently updated to kafka v0.8.1.1 with zookeeper v3.4.5. I have of topic with 30 partitions and 2 replicas. We are using High level consumer api. Each consumer process which is a storm topolofy has 5 streams which connects to 1 or more partitions. We are not using storm's inbuilt kafka spout. Everything runs fine till the 5th consumer process(25 streams) is added for this topic.
As soon as the sixth consumer process is added, the newly added partition does not get the ownership of the partitions that it requests for as the already existing owners have not yet given up the ownership. We changed certain properties on consumer : 1. Max Rebalance attempts - 20 ( rebalance.backoff.ms * rebalance.max.retries >> zk connection timeout) 2. Back off ms between rebalances - 10000 (10seconds) 3. ZK connection timeout - 100,000 (100 seconds) Although when I am looking in the zookeeper shell when the rebalance is happening, the consumer is registered fine on the zookeeper. Just that the rebalance does not happen. After the 20th rebalance gets completed, we get *2014-10-11 11:10:08 k.c.ZookeeperConsumerConnector [INFO] [rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b], Committing all offsets after clearing the fetcher queues* *2014-10-11 11:10:10 c.s.m.k.i.c.KafkaFeedStreamer [WARN] Ignoring exception while trying to start streamer threads: rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b can't rebalance after 20 retries* *kafka.common.ConsumerRebalanceFailedException: rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b can't rebalance after 20 retries* * at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432) ~[stormjar.jar:na]* * at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:722) ~[stormjar.jar:na]* * at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:212) ~[stormjar.jar:na]* * at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:80) ~[stormjar.jar:na]* * at com.spr.messaging.kafka.impl.consumer.KafkaFeedStreamer.createAndStartThreads(KafkaFeedStreamer.java:79) ~[stormjar.jar:na]* * at com.spr.messaging.kafka.impl.consumer.KafkaFeedStreamer.startKafkaStreamThreadsIfNecessary(KafkaFeedStreamer.java:64) ~[stormjar.jar:na]* * at com.spr.messaging.kafka.impl.consumer.KafkaFeedConsumerFactoryImpl.startStreamerIfNotRunning(KafkaFeedConsumerFactoryImpl.java:71) [stormjar.jar:na]* * at com.spr.messaging.kafka.impl.consumer.KafkaFeedPullConsumerImpl.startStreamerIfNotRunning(KafkaFeedPullConsumerImpl.java:48) [stormjar.jar:na]* * at com.spr.messaging.kafka.impl.KafkaFeedServiceImpl.getKafkaFeedPullConsumer(KafkaFeedServiceImpl.java:63) [stormjar.jar:na]* * at com.spr.storm.topology.spout.AbstractSprKafkaSpout.nextTuple(AbstractSprKafkaSpout.java:121) [stormjar.jar:na]* * at backtype.storm.daemon.executor$eval3848$fn__3849$fn__3864$fn__3893.invoke(executor.clj:562) [na:0.9.1-incubating]* * at backtype.storm.util$async_loop$fn__384.invoke(util.clj:433) [na:0.9.1-incubating]* * at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]* * at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]* *2014-10-11 11:10:10 k.c.ZookeeperConsumerConnector [INFO] [rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b], begin registering consumer rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b in ZK* *2014-10-11 11:10:10 k.u.ZkUtils$ [INFO] conflict in /consumers/rule-engine-feed/ids/rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b data: {"version":1,"subscription":{"rule-engine-feed":5},"pattern":"static","timestamp":"1413025810635"} stored data: {"version":1,"subscription":{"rule-engine-feed":5},"pattern":"static","timestamp":"1413025767483"}* *2014-10-11 11:10:10 k.u.ZkUtils$ [INFO] I wrote this conflicted ephemeral node [{"version":1,"subscription":{"rule-engine-feed":5},"pattern":"static","timestamp":"1413025810635"}] at /consumers/rule-engine-feed/ids/rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry* Due to this error, none of the consumer consumes from these partitions in contention which creates a sort of skewed lag on kafka side. When the 6th consumer was added, the existing owner process of the partitions in question did not get rebalanced. Any help would be highly appreciated. -Thanks, Mohit