Mohit,

I wonder if this is related to
https://issues.apache.org/jira/browse/KAFKA-1585. When ZooKeeper expires a
session, it doesn't delete that session's ephemeral nodes immediately. So if
you try to recreate an ephemeral node quickly, the node you find could
belong either to the valid latest session or to the previously expired one.
If you are hitting this problem, then waiting should resolve it. If not,
then this may be a legitimate bug in ZK 3.4.6.

Can you try shutting down all your consumers, waiting until the session
timeout has elapsed, and then restarting them?
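
As a rough sanity check on the timings involved, here is a minimal Java
sketch. It assumes the settings you quoted map to the 0.8 consumer
properties rebalance.max.retries, rebalance.backoff.ms and
zookeeper.connection.timeout.ms, and it further assumes (you did not say)
that zookeeper.session.timeout.ms is also 100000. The class and method
names are made up for illustration.

```java
import java.util.Properties;

// Hypothetical helper, not part of Kafka: compares the total rebalance
// retry window with the ZooKeeper session timeout.
public class RebalanceWindow {

    // Total time the consumer keeps retrying a rebalance before giving up.
    public static long totalRetryWindowMs(int maxRetries, long backoffMs) {
        return (long) maxRetries * backoffMs;
    }

    // True when the retry window is longer than the session timeout, i.e.
    // an expired session has time to be cleaned up while retries continue.
    public static boolean outlastsSessionTimeout(Properties props) {
        int retries = Integer.parseInt(props.getProperty("rebalance.max.retries"));
        long backoff = Long.parseLong(props.getProperty("rebalance.backoff.ms"));
        long sessionTimeout =
                Long.parseLong(props.getProperty("zookeeper.session.timeout.ms"));
        return totalRetryWindowMs(retries, backoff) > sessionTimeout;
    }

    // The values quoted in the original mail.
    public static Properties quotedSettings() {
        Properties props = new Properties();
        props.setProperty("rebalance.max.retries", "20");
        props.setProperty("rebalance.backoff.ms", "10000");
        props.setProperty("zookeeper.connection.timeout.ms", "100000");
        // Assumption: the session timeout was raised to the same value.
        props.setProperty("zookeeper.session.timeout.ms", "100000");
        return props;
    }

    public static void main(String[] args) {
        Properties props = quotedSettings();
        System.out.println("retry window ms: " + totalRetryWindowMs(20, 10000L));
        System.out.println("outlasts session timeout: " + outlastsSessionTimeout(props));
    }
}
```

With the quoted values the retry window is 200000 ms, so under that
assumption the retries already run longer than the session timeout. If the
conflict still persists after that long, the stale node is probably not
just a matter of waiting.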

Thanks,
Neha

On Mon, Oct 20, 2014 at 6:15 AM, Mohit Kathuria <mkathu...@sprinklr.com>
wrote:

> Dear Experts,
>
> We recently updated to Kafka v0.8.1.1 with ZooKeeper v3.4.5. I have a
> topic with 30 partitions and 2 replicas. We are using the high-level
> consumer API.
> Each consumer process, which is a Storm topology, has 5 streams, each of
> which connects to 1 or more partitions. We are not using Storm's built-in
> Kafka spout. Everything runs fine until the 5th consumer process (25
> streams) is added for this topic.
>
> As soon as the sixth consumer process is added, the newly added consumer
> does not get ownership of the partitions that it requests, because the
> existing owners have not yet given up ownership.
>
> We changed certain properties on the consumer:
>
> 1. Max rebalance attempts - 20 (rebalance.backoff.ms *
> rebalance.max.retries >> ZK connection timeout)
> 2. Backoff ms between rebalances - 10000 (10 seconds)
> 3. ZK connection timeout - 100,000 (100 seconds)
>
> When I look in the ZooKeeper shell while the rebalance is happening, the
> consumer is registered fine in ZooKeeper. It is just that the rebalance
> does not happen.
> After the 20th rebalance attempt completes, we get:
>
>
> 2014-10-11 11:10:08 k.c.ZookeeperConsumerConnector [INFO] [rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b], Committing all offsets after clearing the fetcher queues
> 2014-10-11 11:10:10 c.s.m.k.i.c.KafkaFeedStreamer [WARN] Ignoring exception while trying to start streamer threads: rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b can't rebalance after 20 retries
> kafka.common.ConsumerRebalanceFailedException: rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b can't rebalance after 20 retries
>         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432) ~[stormjar.jar:na]
>         at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:722) ~[stormjar.jar:na]
>         at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:212) ~[stormjar.jar:na]
>         at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:80) ~[stormjar.jar:na]
>         at com.spr.messaging.kafka.impl.consumer.KafkaFeedStreamer.createAndStartThreads(KafkaFeedStreamer.java:79) ~[stormjar.jar:na]
>         at com.spr.messaging.kafka.impl.consumer.KafkaFeedStreamer.startKafkaStreamThreadsIfNecessary(KafkaFeedStreamer.java:64) ~[stormjar.jar:na]
>         at com.spr.messaging.kafka.impl.consumer.KafkaFeedConsumerFactoryImpl.startStreamerIfNotRunning(KafkaFeedConsumerFactoryImpl.java:71) [stormjar.jar:na]
>         at com.spr.messaging.kafka.impl.consumer.KafkaFeedPullConsumerImpl.startStreamerIfNotRunning(KafkaFeedPullConsumerImpl.java:48) [stormjar.jar:na]
>         at com.spr.messaging.kafka.impl.KafkaFeedServiceImpl.getKafkaFeedPullConsumer(KafkaFeedServiceImpl.java:63) [stormjar.jar:na]
>         at com.spr.storm.topology.spout.AbstractSprKafkaSpout.nextTuple(AbstractSprKafkaSpout.java:121) [stormjar.jar:na]
>         at backtype.storm.daemon.executor$eval3848$fn__3849$fn__3864$fn__3893.invoke(executor.clj:562) [na:0.9.1-incubating]
>         at backtype.storm.util$async_loop$fn__384.invoke(util.clj:433) [na:0.9.1-incubating]
>         at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
>         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
> 2014-10-11 11:10:10 k.c.ZookeeperConsumerConnector [INFO] [rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b], begin registering consumer rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b in ZK
> 2014-10-11 11:10:10 k.u.ZkUtils$ [INFO] conflict in /consumers/rule-engine-feed/ids/rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b data: {"version":1,"subscription":{"rule-engine-feed":5},"pattern":"static","timestamp":"1413025810635"} stored data: {"version":1,"subscription":{"rule-engine-feed":5},"pattern":"static","timestamp":"1413025767483"}
> 2014-10-11 11:10:10 k.u.ZkUtils$ [INFO] I wrote this conflicted ephemeral node [{"version":1,"subscription":{"rule-engine-feed":5},"pattern":"static","timestamp":"1413025810635"}] at /consumers/rule-engine-feed/ids/rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry
>
> Due to this error, none of the consumers consumes from the partitions in
> contention, which creates a skewed lag on the Kafka side. When the 6th
> consumer was added, the existing owner processes of the partitions in
> question did not get rebalanced.
>
> Any help would be highly appreciated.
>
> -Thanks,
> Mohit
>
