Dear Experts,

We recently updated to Kafka v0.8.1.1 with ZooKeeper v3.4.5. I have a topic
with 30 partitions and 2 replicas, and we are using the high-level consumer
API.
Each consumer process, which is a Storm topology, has 5 streams, each
connecting to 1 or more partitions. We are not using Storm's built-in Kafka
spout; the streams are created roughly as in the sketch below. Everything
runs fine through the 5th consumer process (25 streams) for this topic.
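
To illustrate, here is a minimal sketch (not our actual topology code; the
ZooKeeper address and class name are placeholders) of how each process asks
the high-level consumer API for its 5 streams:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class FiveStreamConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "zk-host:2181");   // placeholder
        props.put("group.id", "rule-engine-feed");

        ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // Ask for 5 streams of the topic; with 30 partitions and up to 6
        // such processes, each stream should own 1 or more partitions.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put("rule-engine-feed", 5);

        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
            connector.createMessageStreams(topicCountMap);
        // Each KafkaStream is then consumed by one stream/thread of the topology.
    }
}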

As soon as the sixth consumer process is added, it does not get ownership
of the partitions it requests, because the existing owners have not yet
given up their ownership.

We changed the following consumer properties (see the sketch after this
list):

1. Max rebalance attempts (rebalance.max.retries) - 20, so that
rebalance.backoff.ms * rebalance.max.retries >> ZK connection timeout
2. Backoff between rebalances (rebalance.backoff.ms) - 10000 (10 seconds)
3. ZK connection timeout (zookeeper.connection.timeout.ms) - 100,000 ms (100 seconds)
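
A minimal sketch of those settings, added to the same props object as in the
sketch above (values are the ones we listed):

// 20 retries * 10,000 ms backoff = 200,000 ms of rebalance attempts,
// which is well above the 100,000 ms ZooKeeper connection timeout.
props.put("rebalance.max.retries", "20");
props.put("rebalance.backoff.ms", "10000");
props.put("zookeeper.connection.timeout.ms", "100000");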

When I look in the ZooKeeper shell while the rebalance is happening, the
consumer is registered fine in ZooKeeper; the rebalance just does not
happen. After the 20th rebalance attempt completes, we get:


2014-10-11 11:10:08 k.c.ZookeeperConsumerConnector [INFO] [rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b], Committing all offsets after clearing the fetcher queues
2014-10-11 11:10:10 c.s.m.k.i.c.KafkaFeedStreamer [WARN] Ignoring exception while trying to start streamer threads: rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b can't rebalance after 20 retries
kafka.common.ConsumerRebalanceFailedException: rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b can't rebalance after 20 retries
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432) ~[stormjar.jar:na]
        at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:722) ~[stormjar.jar:na]
        at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:212) ~[stormjar.jar:na]
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:80) ~[stormjar.jar:na]
        at com.spr.messaging.kafka.impl.consumer.KafkaFeedStreamer.createAndStartThreads(KafkaFeedStreamer.java:79) ~[stormjar.jar:na]
        at com.spr.messaging.kafka.impl.consumer.KafkaFeedStreamer.startKafkaStreamThreadsIfNecessary(KafkaFeedStreamer.java:64) ~[stormjar.jar:na]
        at com.spr.messaging.kafka.impl.consumer.KafkaFeedConsumerFactoryImpl.startStreamerIfNotRunning(KafkaFeedConsumerFactoryImpl.java:71) [stormjar.jar:na]
        at com.spr.messaging.kafka.impl.consumer.KafkaFeedPullConsumerImpl.startStreamerIfNotRunning(KafkaFeedPullConsumerImpl.java:48) [stormjar.jar:na]
        at com.spr.messaging.kafka.impl.KafkaFeedServiceImpl.getKafkaFeedPullConsumer(KafkaFeedServiceImpl.java:63) [stormjar.jar:na]
        at com.spr.storm.topology.spout.AbstractSprKafkaSpout.nextTuple(AbstractSprKafkaSpout.java:121) [stormjar.jar:na]
        at backtype.storm.daemon.executor$eval3848$fn__3849$fn__3864$fn__3893.invoke(executor.clj:562) [na:0.9.1-incubating]
        at backtype.storm.util$async_loop$fn__384.invoke(util.clj:433) [na:0.9.1-incubating]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
2014-10-11 11:10:10 k.c.ZookeeperConsumerConnector [INFO] [rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b], begin registering consumer rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b in ZK
2014-10-11 11:10:10 k.u.ZkUtils$ [INFO] conflict in /consumers/rule-engine-feed/ids/rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b data: {"version":1,"subscription":{"rule-engine-feed":5},"pattern":"static","timestamp":"1413025810635"} stored data: {"version":1,"subscription":{"rule-engine-feed":5},"pattern":"static","timestamp":"1413025767483"}
2014-10-11 11:10:10 k.u.ZkUtils$ [INFO] I wrote this conflicted ephemeral node [{"version":1,"subscription":{"rule-engine-feed":5},"pattern":"static","timestamp":"1413025810635"}] at /consumers/rule-engine-feed/ids/rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry

Due to this error, none of the consumers consume from the partitions in
contention, which creates a sort of skewed lag on the Kafka side. When the
6th consumer was added, the existing owner processes of the partitions in
question did not get rebalanced.
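
In case it helps, this is roughly how I check registration and partition
ownership under the group's ZooKeeper path (a hypothetical standalone
sketch using ZkClient, separate from the topology; host, timeouts, and
class name are placeholders):

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.serialize.BytesPushThroughSerializer;

public class ZkOwnershipCheck {
    public static void main(String[] args) {
        ZkClient zk = new ZkClient("zk-host:2181", 10000, 10000,
                new BytesPushThroughSerializer());
        String group = "rule-engine-feed";
        String topic = "rule-engine-feed";

        // Registered consumer ids for the group.
        System.out.println("consumer ids: "
                + zk.getChildren("/consumers/" + group + "/ids"));

        // Current owner (consumer thread id) of each partition of the topic.
        String ownersPath = "/consumers/" + group + "/owners/" + topic;
        for (String partition : zk.getChildren(ownersPath)) {
            byte[] owner = zk.readData(ownersPath + "/" + partition);
            System.out.println("partition " + partition + " -> " + new String(owner));
        }
        zk.close();
    }
}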

Any help would be highly appreciated.

-Thanks,
Mohit
