Mike,

The endless rebalance errors occur due to the error that Mayuresh just pasted. The rebalance attempts fail because of the conflict in the ZooKeeper node.
Below is the exact trace:

2014-12-09 13:22:11 k.u.ZkUtils$ [INFO] I wrote this conflicted ephemeral node [{"version":1,"subscription":{"______":5},"pattern":"static","timestamp":"1417964160024"}] at /consumers/______ids/______-1417963753598-b19de58d a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry

I would be more than willing to help. Let me know if you need any more information, or reach out to me directly and we can work on it.

-Thanks,
Mohit

On Fri, Mar 27, 2015 at 3:00 AM, Mike Axiak <m...@axiak.net> wrote:

No, we don't normally see conflicts. We'll see endless attempts to rebalance.

-Mike

On Thu, Mar 26, 2015 at 5:15 PM, Mayuresh Gharat <gharatmayures...@gmail.com> wrote:

Did you see something like this in any of the consumer logs: "Conflict in ….. data : ……. stored data :……"?

Thanks,
Mayuresh

On Thu, Mar 26, 2015 at 1:50 PM, Mike Axiak <m...@axiak.net> wrote:

Hi guys,

At HubSpot we think the issue is related to slow consumers. During a rebalance, one of the first things the consumer does is signal a shutdown to the fetcher [1] [2] in order to relinquish ownership of the partitions.

This waits for all fetcher threads to shut down, which can only happen once each thread's "enqueue current chunk" step finishes. If you have a slow consumer or large chunk sizes, this can take a while, which makes it difficult for the rebalance to actually complete successfully.

We're testing out different solutions now. Currently under review, we're considering making the enqueue into the blocking queue use a timeout, so we can check whether we're still running and end processing of the current chunk early.

Has anyone else noticed this? If so, are there any patches people have written? Once we have a clearer picture of solutions we'll send a few patches in JIRAs.

Best,
Mike

1: https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala#L655
2: https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala#L712
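For readers following along, here is a rough sketch of the bounded-enqueue idea Mike describes. It is illustrative only, not the actual Kafka code or HubSpot's patch; the names chunkQueue, isRunning and enqueueTimeoutMs are made up:

    // Illustrative only: not the actual Kafka code or HubSpot's patch.
    // chunkQueue, isRunning and enqueueTimeoutMs are made-up names.
    import java.util.concurrent.{BlockingQueue, TimeUnit}
    import java.util.concurrent.atomic.AtomicBoolean

    object BoundedEnqueueSketch {
      def enqueueChunk[T](chunkQueue: BlockingQueue[T],
                          chunk: T,
                          isRunning: AtomicBoolean,
                          enqueueTimeoutMs: Long = 200L): Boolean = {
        // Instead of blocking forever on put(), keep retrying a bounded offer()
        // so the fetcher thread can notice a shutdown request and abandon the
        // current chunk, letting the rebalance proceed past a slow consumer.
        var added = false
        while (!added && isRunning.get()) {
          added = chunkQueue.offer(chunk, enqueueTimeoutMs, TimeUnit.MILLISECONDS)
        }
        added
      }
    }

The trade-off is that a chunk that was already fetched may be dropped at shutdown and then presumably re-fetched after the rebalance from the last committed offset.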
On Mon, Dec 22, 2014 at 6:51 PM, Neha Narkhede <n...@confluent.io> wrote:

Can you share a reproducible test case?

On Tue, Dec 9, 2014 at 7:11 AM, Mohit Kathuria <mkathu...@sprinklr.com> wrote:

Neha,

The same issue reoccurred with just 2 consumer processes. The exception was related to a conflict in writing the ephemeral node. Below is the exception. The topic name is "lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin" with 30 partitions. The 2 processes were running on 2 servers with IPs 10.0.8.222 and 10.0.8.225.

2014-12-09 13:22:11 k.u.ZkUtils$ [INFO] I wrote this conflicted ephemeral node [{"version":1,"subscription":{"lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin":5},"pattern":"static","timestamp":"1417964160024"}] at /consumers/lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin/ids/lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin_ip-10-0-8-222-1417963753598-b19de58d a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry

Attached are the complete error logs. The exception occurred after the rebalance had failed even after 40 retries. The rebalance failed because the process already owning some of the partitions did not give up ownership, due to conflicting ephemeral nodes. As you suggested, we ran the wchp command on the 3 zookeeper nodes at this time and found that the watcher was registered for only one of the processes. I am copying the kafka consumer watcher registered on one of the zookeeper servers (attached are the wchp outputs of all 3 zk servers):

$ echo wchp | nc localhost 2181

/kafka/consumers/lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin/ids
    0x34a175e1d5d0130

"0x34a175e1d5d0130" is the ephemeral node's session id. I went back to the zookeeper shell and checked the consumers registered for this topic and consumer group (same as the topic name); the output is attached in zkCommands.txt. It clearly shows that:

10.0.8.222 has ephemeralOwner = 0x34a175e1d5d0130
10.0.8.225 has ephemeralOwner = 0x34a175e1d5d0127

I think the issue is that the two consumers have written different ephemeral nodes, and watchers are registered for only one of the 2 ephemeral nodes. The root cause seems to be inconsistent state while writing the ephemeral nodes in ZK.

Let me know if you need more details.

-Thanks,
Mohit
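For anyone repeating this check, a small hypothetical helper along the same lines, assuming the plain Apache ZooKeeper Java client is on the classpath; the connect string and the <group> path segment are placeholders to fill in:

    // Hypothetical helper, assuming the Apache ZooKeeper Java client.
    // The connect string and the <group> path segment are placeholders.
    import java.util.concurrent.CountDownLatch
    import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
    import org.apache.zookeeper.Watcher.Event.KeeperState
    import scala.collection.JavaConverters._

    object ConsumerIdSessionCheck {
      def main(args: Array[String]): Unit = {
        val connected = new CountDownLatch(1)
        val zk = new ZooKeeper("localhost:2181/kafka", 30000, new Watcher {
          override def process(event: WatchedEvent): Unit =
            if (event.getState == KeeperState.SyncConnected) connected.countDown()
        })
        connected.await()
        val idsPath = "/consumers/<group>/ids"   // substitute the real group name
        for (child <- zk.getChildren(idsPath, false).asScala) {
          val stat = zk.exists(s"$idsPath/$child", false)
          // ephemeralOwner is the session that created the node; comparing it
          // across consumers (and against the wchp output) shows which
          // registration is stale, i.e. left over from an expired session.
          println(f"$child ephemeralOwner=0x${stat.getEphemeralOwner}%x")
        }
        zk.close()
      }
    }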
On Mon, Nov 10, 2014 at 8:46 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

A rebalance should trigger on all consumers when you add a new consumer to the group. If you don't see the zookeeper watch fire, the consumer may have somehow lost the watch. We have seen this behavior on older zk versions; I wonder if that bug got reintroduced. To verify whether this is the case, you can run the wchp zookeeper command on the zk leader and check if each consumer has a watch registered.

Do you have a way to try this on zk 3.3.4? I would recommend you try the wchp suggestion as well.

On Fri, Nov 7, 2014 at 6:07 AM, Mohit Kathuria <mkathu...@sprinklr.com> wrote:

Hi all,

Can someone help here? We are getting constant rebalance failures each time a consumer is added beyond a certain number. We did quite a lot of debugging on this and are still not able to figure out the pattern.

-Thanks,
Mohit

On Mon, Nov 3, 2014 at 10:53 PM, Mohit Kathuria <mkathu...@sprinklr.com> wrote:

Neha,

Looks like an issue with the consumer rebalance not being able to complete successfully. We were able to reproduce the issue on a topic with 30 partitions, 3 consumer processes (p1, p2 and p3), and properties rebalance.max.retries=40 and rebalance.backoff.ms=10000 (10s).

Before process p3 was started, partition ownership was as expected:

partitions 0-14 owned by p1
partitions 15-29 owned by p2

As process p3 started, a rebalance was triggered. Process p3 successfully acquired ownership of partitions 20-29, as expected per the rebalance algorithm. However, process p2, while trying to acquire ownership of partitions 10-19, saw the rebalance fail after 40 retries.

Attaching the logs from process p2 and process p1. They show that p2 was attempting to rebalance and was trying to acquire ownership of partitions 10-14, which were owned by process p1. However, at the same time process p1 did not get any event telling it to give up partition ownership for partitions 10-14. We were expecting a rebalance to have been triggered in p1, but it wasn't, and hence p1 did not give up ownership. Is our assumption correct or incorrect? And if the rebalance does get triggered in p1, how do we verify that apart from the logs, since the logs on p1 did not show anything?

2014-11-03 06:57:36 k.c.ZookeeperConsumerConnector [INFO] [topic_consumerIdString], waiting for the partition ownership to be deleted: 11

During and after the failed rebalance on process p2, partition ownership was as below:

0-14 -> owner p1
15-19 -> none
20-29 -> owner p3

This left the consumers in an inconsistent state, as 5 partitions were never consumed from and the partition ownership was not balanced either.

However, there was no conflict in creating the ephemeral node, which was the case last time. Just to note, the ephemeral node conflict we were seeing earlier also appeared after the rebalance failed. My hunch is that fixing the rebalance failure will fix that issue as well.

-Thanks,
Mohit
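As background for the expected ownership described above, here is a simplified model of the range assignment the high-level consumer performs. It is a sketch: the real 0.8 algorithm works per consumer thread over sorted consumer ids, but the arithmetic has the same shape, which is why p3 taking 20-29 and p2 expecting 10-19 is the anticipated outcome once the third process joins:

    // Simplified model of the range assignment described above: sorted
    // consumers get contiguous partition ranges. The real 0.8 algorithm works
    // per consumer thread over sorted consumer ids, but the shape is the same.
    object RangeAssignmentSketch {
      def assign(numPartitions: Int, consumers: Seq[String]): Map[String, Range] = {
        val sorted = consumers.sorted
        val perConsumer = numPartitions / sorted.size
        val extra = numPartitions % sorted.size
        sorted.zipWithIndex.map { case (c, i) =>
          val start = i * perConsumer + math.min(i, extra)
          val count = perConsumer + (if (i < extra) 1 else 0)
          c -> (start until start + count)
        }.toMap
      }

      def main(args: Array[String]): Unit = {
        // 30 partitions over p1, p2, p3 -> p1: 0-9, p2: 10-19, p3: 20-29
        assign(30, Seq("p1", "p2", "p3")).toSeq.sortBy(_._2.head).foreach {
          case (c, range) => println(s"$c -> ${range.head}-${range.last}")
        }
      }
    }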
On Mon, Oct 20, 2014 at 7:48 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

Mohit,

I wonder if it is related to https://issues.apache.org/jira/browse/KAFKA-1585. When zookeeper expires a session, it doesn't delete the ephemeral nodes immediately. So if you end up trying to recreate ephemeral nodes quickly, the node could either be from the valid latest session or from the previously expired session. If you hit this problem, then waiting would resolve it. But if not, then this may be a legitimate bug in ZK 3.4.6.

Can you try shutting down all your consumers, waiting until the session timeout passes, and restarting them?

Thanks,
Neha

On Mon, Oct 20, 2014 at 6:15 AM, Mohit Kathuria <mkathu...@sprinklr.com> wrote:

Dear Experts,

We recently updated to kafka v0.8.1.1 with zookeeper v3.4.5. I have a topic with 30 partitions and 2 replicas. We are using the high-level consumer api. Each consumer process, which is a storm topology, has 5 streams that connect to 1 or more partitions. We are not using storm's built-in kafka spout. Everything runs fine until the 5th consumer process (25 streams) is added for this topic.

As soon as the sixth consumer process is added, the newly added consumer does not get ownership of the partitions that it requests, as the existing owners have not yet given up ownership.

We changed certain properties on the consumer:

1. Max rebalance attempts - 20 (rebalance.backoff.ms * rebalance.max.retries >> zk connection timeout)
2. Backoff ms between rebalances - 10000 (10 seconds)
3. ZK connection timeout - 100,000 (100 seconds)
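For concreteness, a minimal sketch of how these settings map onto the 0.8 high-level consumer configuration (Scala API). The zookeeper.connect value is a placeholder; the group name and the stream count of 5 are taken from the logs quoted below:

    // Minimal sketch of the 0.8 high-level consumer setup with these settings.
    // The zookeeper.connect value is a placeholder; the group name and the
    // stream count of 5 are taken from the logs quoted below.
    import java.util.Properties
    import kafka.consumer.{Consumer, ConsumerConfig}

    object ConsumerSetupSketch {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("zookeeper.connect", "zk1:2181,zk2:2181,zk3:2181/kafka")
        props.put("group.id", "rule-engine-feed")
        props.put("zookeeper.connection.timeout.ms", "100000") // 100 s
        props.put("rebalance.max.retries", "20")
        props.put("rebalance.backoff.ms", "10000")             // 10 s per retry
        // rebalance.max.retries * rebalance.backoff.ms should comfortably
        // exceed the zookeeper connection/session timeout, as noted above.

        val connector = Consumer.create(new ConsumerConfig(props))
        // createMessageStreams is the call that eventually throws
        // ConsumerRebalanceFailedException once the retries are exhausted,
        // as in the stack trace quoted below.
        val streams = connector.createMessageStreams(Map("rule-engine-feed" -> 5))
        println(s"created ${streams.values.map(_.size).sum} streams")
      }
    }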
Although when I look at the zookeeper shell while the rebalance is happening, the consumer is registered fine in zookeeper; it is just that the rebalance does not happen. After the 20th rebalance attempt completes, we get:

2014-10-11 11:10:08 k.c.ZookeeperConsumerConnector [INFO] [rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b], Committing all offsets after clearing the fetcher queues
2014-10-11 11:10:10 c.s.m.k.i.c.KafkaFeedStreamer [WARN] Ignoring exception while trying to start streamer threads: rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b can't rebalance after 20 retries
kafka.common.ConsumerRebalanceFailedException: rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b can't rebalance after 20 retries
        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432) ~[stormjar.jar:na]
        at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:722) ~[stormjar.jar:na]
        at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:212) ~[stormjar.jar:na]
        at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:80) ~[stormjar.jar:na]
        at com.spr.messaging.kafka.impl.consumer.KafkaFeedStreamer.createAndStartThreads(KafkaFeedStreamer.java:79) ~[stormjar.jar:na]
        at com.spr.messaging.kafka.impl.consumer.KafkaFeedStreamer.startKafkaStreamThreadsIfNecessary(KafkaFeedStreamer.java:64) ~[stormjar.jar:na]
        at com.spr.messaging.kafka.impl.consumer.KafkaFeedConsumerFactoryImpl.startStreamerIfNotRunning(KafkaFeedConsumerFactoryImpl.java:71) [stormjar.jar:na]
        at com.spr.messaging.kafka.impl.consumer.KafkaFeedPullConsumerImpl.startStreamerIfNotRunning(KafkaFeedPullConsumerImpl.java:48) [stormjar.jar:na]
        at com.spr.messaging.kafka.impl.KafkaFeedServiceImpl.getKafkaFeedPullConsumer(KafkaFeedServiceImpl.java:63) [stormjar.jar:na]
        at com.spr.storm.topology.spout.AbstractSprKafkaSpout.nextTuple(AbstractSprKafkaSpout.java:121)
        [stormjar.jar:na]
        at backtype.storm.daemon.executor$eval3848$fn__3849$fn__3864$fn__3893.invoke(executor.clj:562) [na:0.9.1-incubating]
        at backtype.storm.util$async_loop$fn__384.invoke(util.clj:433) [na:0.9.1-incubating]
        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
2014-10-11 11:10:10 k.c.ZookeeperConsumerConnector [INFO] [rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b], begin registering consumer rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b in ZK
2014-10-11 11:10:10 k.u.ZkUtils$ [INFO] conflict in /consumers/rule-engine-feed/ids/rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b data: {"version":1,"subscription":{"rule-engine-feed":5},"pattern":"static","timestamp":"1413025810635"} stored data: {"version":1,"subscription":{"rule-engine-feed":5},"pattern":"static","timestamp":"1413025767483"}
2014-10-11 11:10:10 k.u.ZkUtils$ [INFO] I wrote this conflicted ephemeral node [{"version":1,"subscription":{"rule-engine-feed":5},"pattern":"static","timestamp":"1413025810635"}] at /consumers/rule-engine-feed/ids/rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry

Due to this error, none of the consumers consumes from the partitions in contention, which creates a sort of skewed lag on the kafka side. When the 6th consumer was added, the existing owner process of the partitions in question did not get rebalanced.

Any help would be highly appreciated.

-Thanks,
Mohit

--
Thanks,
Neha

--
-Regards,
Mayuresh R. Gharat
(862) 250-7125