Hi guys,

At HubSpot we think the issue is related to slow consumers. During a
rebalance, one of the first things the consumer does is signal a shutdown
to the fetcher [1] [2], in order to relinquish ownership of the partitions.

This waits for the shutdown of all shutdown fetcher threads, which can only
happen until the thread's "enqueue current chunk" command finishes. If you
have a slow consumer or large chunk sizes, this could take a while which
would make it difficult for the rebalance to actually occur successfully.

We're testing out different solutions now. Currently under review, we're
thinking about making the enqueue into the blocking queue timeout so we can
check to see if we're running, to end the process of the current chunk
early.

Has anyone else noticed this? If so, are there any patches people have
written. Once we have a clearer picture of solutions we'll send a few
patches in JIRAs.

Best,
Mike

1:
https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala#L655
2:
https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala#L712


On Mon, Dec 22, 2014 at 6:51 PM, Neha Narkhede <n...@confluent.io> wrote:

> Can you share a reproducible test case?
>
> On Tue, Dec 9, 2014 at 7:11 AM, Mohit Kathuria <mkathu...@sprinklr.com>
> wrote:
>
> > Neha,
> >
> > The same issue reoccured with just 2 consumer processes. The exception
> was
> > related to conflict in writing the ephemeral node. Below was the
> exception.
> > Topic name is
> >
> "lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin"
> > with 30 partitions. The 2 processes were running on 2 servers with ips
> > 10.0.8.222 and 10.0.8.225.
> >
> > *2014-12-09 13:22:11 k.u.ZkUtils$ [INFO] I wrote this conflicted
> ephemeral
> > node
> >
> [{"version":1,"subscription":{"lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin":5},"pattern":"static","timestamp":"1417964160024"}]
> > at
> >
> /consumers/lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin/ids/lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin_ip-10-0-8-222-1417963753598-b19de58d
> > a while back in a different session, hence I will backoff for this node
> to
> > be deleted by Zookeeper and retry*
> > Attached the complete error logs. The exception occured after the
> > rebalance failed even after 40 retries. Rebalance failed as the process
> > already owning some of the partitions did not give us ownership due to
> > conflicting ephemeral nodes. As you suggested, we ran the wchp command
> on
> > the 3 zookeeper nodes at this time and figured out that the watcher was
> > registered for only one of the process. I am copying the kafka consumer
> > watcher registered on one of the zookeeper servers. (Attached are the
> wchp
> > outputs of all 3 zk servers)
> >
> > *$echo wchp | nc localhost 2181 *
> >
> >
> >
> */kafka/consumers/lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin/ids*
> >
> > * 0x34a175e1d5d0130*
> >
> >
> > "0x34a175e1d5d0130" was the ephemeral node session Id. I went back to the
> > zookeeper shell and checked the consumers registered for this topic and
> > consumer group(same as topic name). Attaching the output in
> zkCommands.txt.
> > This clearly shows that
> >
> > 10.0.8.222 has ephemeralOwner = 0x34a175e1d5d0130
> >
> > 10.0.8.225 has ephemeralOwner = 0x34a175e1d5d0127
> >
> >
> > I think we have the issue here that both consumers have written to
> > different ephemeral nodes. Watchers are registered for the one of the 2
> > ephemeral node. The root cause seems to be the inconsistent state while
> > writing the ephemeral nodes in ZK.
> >
> > Let me know if you need more details.
> >
> > -Thanks,
> >
> > Mohit
> >
> >
> >
> >
> > On Mon, Nov 10, 2014 at 8:46 AM, Neha Narkhede <neha.narkh...@gmail.com>
> > wrote:
> >
> >> A rebalance should trigger on all consumers when you add a new consumer
> to
> >> the group. If you don't see the zookeeper watch fire, the consumer may
> >> have
> >> somehow lost the watch. We have seen this behavior on older zk
> versions, I
> >> wonder it that bug got reintroduced. To verify if this is the case, you
> >> can
> >> run the wchp zookeeper command on the zk leader and check if each
> consumer
> >> has a watch registered.
> >>
> >> Do you have a way to try this on zk 3.3.4? I would recommend you try the
> >> wchp suggestion as well.
> >>
> >> On Fri, Nov 7, 2014 at 6:07 AM, Mohit Kathuria <mkathu...@sprinklr.com>
> >> wrote:
> >>
> >> > Hi all,
> >> >
> >> > Can someone help here. We are getting constant rebalance failure each
> >> time
> >> > a consumer is added beyond a certain number. Did quite a lot of
> >> debugging
> >> > on this and still not able to figure out the pattern.
> >> >
> >> > -Thanks,
> >> > Mohit
> >> >
> >> > On Mon, Nov 3, 2014 at 10:53 PM, Mohit Kathuria <
> mkathu...@sprinklr.com
> >> >
> >> > wrote:
> >> >
> >> > > Neha,
> >> > >
> >> > > Looks like an issue with the consumer rebalance not able to complete
> >> > > successfully. We were able to reproduce the issue on topic with 30
> >> > > partitions,  3 consumer processes(p1,p2 and p3), properties -  40
> >> > > rebalance.max.retries and 10000(10s) rebalance.backoff.ms.
> >> > >
> >> > > Before the process p3 was started, partition ownership was as
> >> expected:
> >> > >
> >> > > partitions 0-14 owned by p1
> >> > > partitions 15-29 -> owner p2
> >> > >
> >> > > As the process p3 started, rebalance was triggered. Process p3 was
> >> > > successfully able to acquire partition ownership for partitions
> 20-29
> >> as
> >> > > expected as per the rebalance algorithm. However, process p2 while
> >> trying
> >> > > to acquire ownership of partitions 10-19 saw rebalance failure after
> >> 40
> >> > > retries.
> >> > >
> >> > > Attaching the logs from process p2 and process p1. It says that p2
> was
> >> > > attempting to rebalance, it was trying to acquire ownership of
> >> partitions
> >> > > 10-14 which were owned by process p1. However, at the same time
> >> process
> >> > p1
> >> > > did not get any event for giving up the partition ownership for
> >> > partitions
> >> > > 1-14.
> >> > > We were expecting a rebalance to have triggered in p1 - but it
> didn't
> >> and
> >> > > hence not giving up ownership. Is our assumption correct/incorrect?
> >> > > And if the rebalance gets triggered in p1 - how to figure out apart
> >> from
> >> > > logs as the logs on p1 did not have anything.
> >> > >
> >> > > *2014-11-03 06:57:36 k.c.ZookeeperConsumerConnector [INFO]
> >> > > [topic_consumerIdString], waiting for the partition ownership to be
> >> > > deleted: 11*
> >> > >
> >> > > During and after the rebalance failed on process p2, Partition
> >> Ownership
> >> > > was as below:
> >> > > 0-14 -> owner p1
> >> > > 15-19 -> none
> >> > > 20-29 -> owner p3
> >> > >
> >> > > This left the consumers in inconsistent state as 5 partitions were
> >> never
> >> > > consumer from and neither was the partitions ownership balanced.
> >> > >
> >> > > However, there was no conflict in creating the ephemeral node which
> >> was
> >> > > the case last time. Just to note that the ephemeral node conflict
> >> which
> >> > we
> >> > > were seeing earlier also appeared after rebalance failed. My hunch
> is
> >> > that
> >> > > fixing the rebalance failure will fix that issue as well.
> >> > >
> >> > > -Thanks,
> >> > > Mohit
> >> > >
> >> > >
> >> > >
> >> > > On Mon, Oct 20, 2014 at 7:48 PM, Neha Narkhede <
> >> neha.narkh...@gmail.com>
> >> > > wrote:
> >> > >
> >> > >> Mohit,
> >> > >>
> >> > >> I wonder if it is related to
> >> > >> https://issues.apache.org/jira/browse/KAFKA-1585. When zookeeper
> >> > expires
> >> > >> a
> >> > >> session, it doesn't delete the ephemeral nodes immediately. So if
> you
> >> > end
> >> > >> up trying to recreate ephemeral nodes quickly, it could either be
> in
> >> the
> >> > >> valid latest session or from the previously expired session. If you
> >> hit
> >> > >> this problem, then waiting would resolve it. But if not, then this
> >> may
> >> > be
> >> > >> a
> >> > >> legitimate bug in ZK 3.4.6.
> >> > >>
> >> > >> Can you try shutting down all your consumers, waiting until session
> >> > >> timeout
> >> > >> and restarting them?
> >> > >>
> >> > >> Thanks,
> >> > >> Neha
> >> > >>
> >> > >> On Mon, Oct 20, 2014 at 6:15 AM, Mohit Kathuria <
> >> mkathu...@sprinklr.com
> >> > >
> >> > >> wrote:
> >> > >>
> >> > >> > Dear Experts,
> >> > >> >
> >> > >> > We recently updated to kafka v0.8.1.1 with zookeeper v3.4.5. I
> >> have of
> >> > >> > topic with 30 partitions and 2 replicas. We are using High level
> >> > >> consumer
> >> > >> > api.
> >> > >> > Each consumer process which is a storm topolofy has 5 streams
> which
> >> > >> > connects to 1 or more partitions. We are not using storm's
> inbuilt
> >> > kafka
> >> > >> > spout. Everything runs fine till the 5th consumer process(25
> >> streams)
> >> > is
> >> > >> > added for this topic.
> >> > >> >
> >> > >> > As soon as the sixth consumer process is added, the newly added
> >> > >> partition
> >> > >> > does not get the ownership of the partitions that it requests for
> >> as
> >> > the
> >> > >> > already existing owners have not yet given up the ownership.
> >> > >> >
> >> > >> > We changed certain properties on consumer :
> >> > >> >
> >> > >> > 1. Max Rebalance attempts - 20 ( rebalance.backoff.ms *
> >> > >> > rebalance.max.retries >> zk connection timeout)
> >> > >> > 2. Back off ms between rebalances - 10000 (10seconds)
> >> > >> > 3. ZK connection timeout - 100,000 (100 seconds)
> >> > >> >
> >> > >> > Although when I am looking in the zookeeper shell when the
> >> rebalance
> >> > is
> >> > >> > happening, the consumer is registered fine on the zookeeper. Just
> >> that
> >> > >> the
> >> > >> > rebalance does not happen.
> >> > >> > After the 20th rebalance gets completed, we get
> >> > >> >
> >> > >> >
> >> > >> > *2014-10-11 11:10:08 k.c.ZookeeperConsumerConnector [INFO]
> >> > >> > [rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b],
> Committing
> >> > all
> >> > >> > offsets after clearing the fetcher queues*
> >> > >> > *2014-10-11 11:10:10 c.s.m.k.i.c.KafkaFeedStreamer [WARN]
> Ignoring
> >> > >> > exception while trying to start streamer threads:
> >> > >> > rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b can't
> >> rebalance
> >> > >> after
> >> > >> > 20 retries*
> >> > >> > *kafka.common.ConsumerRebalanceFailedException:
> >> > >> > rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b can't
> >> rebalance
> >> > >> after
> >> > >> > 20 retries*
> >> > >> > *        at
> >> > >> >
> >> > >> >
> >> > >>
> >> >
> >>
> kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432)
> >> > >> > ~[stormjar.jar:na]*
> >> > >> > *        at
> >> > >> >
> >> > >> >
> >> > >>
> >> >
> >>
> kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:722)
> >> > >> > ~[stormjar.jar:na]*
> >> > >> > *        at
> >> > >> >
> >> > >> >
> >> > >>
> >> >
> >>
> kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:212)
> >> > >> > ~[stormjar.jar:na]*
> >> > >> > *        at
> >> > >> >
> >> > >> >
> >> > >>
> >> >
> >>
> kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:80)
> >> > >> > ~[stormjar.jar:na]*
> >> > >> > *        at
> >> > >> >
> >> > >> >
> >> > >>
> >> >
> >>
> com.spr.messaging.kafka.impl.consumer.KafkaFeedStreamer.createAndStartThreads(KafkaFeedStreamer.java:79)
> >> > >> > ~[stormjar.jar:na]*
> >> > >> > *        at
> >> > >> >
> >> > >> >
> >> > >>
> >> >
> >>
> com.spr.messaging.kafka.impl.consumer.KafkaFeedStreamer.startKafkaStreamThreadsIfNecessary(KafkaFeedStreamer.java:64)
> >> > >> > ~[stormjar.jar:na]*
> >> > >> > *        at
> >> > >> >
> >> > >> >
> >> > >>
> >> >
> >>
> com.spr.messaging.kafka.impl.consumer.KafkaFeedConsumerFactoryImpl.startStreamerIfNotRunning(KafkaFeedConsumerFactoryImpl.java:71)
> >> > >> > [stormjar.jar:na]*
> >> > >> > *        at
> >> > >> >
> >> > >> >
> >> > >>
> >> >
> >>
> com.spr.messaging.kafka.impl.consumer.KafkaFeedPullConsumerImpl.startStreamerIfNotRunning(KafkaFeedPullConsumerImpl.java:48)
> >> > >> > [stormjar.jar:na]*
> >> > >> > *        at
> >> > >> >
> >> > >> >
> >> > >>
> >> >
> >>
> com.spr.messaging.kafka.impl.KafkaFeedServiceImpl.getKafkaFeedPullConsumer(KafkaFeedServiceImpl.java:63)
> >> > >> > [stormjar.jar:na]*
> >> > >> > *        at
> >> > >> >
> >> > >> >
> >> > >>
> >> >
> >>
> com.spr.storm.topology.spout.AbstractSprKafkaSpout.nextTuple(AbstractSprKafkaSpout.java:121)
> >> > >> > [stormjar.jar:na]*
> >> > >> > *        at
> >> > >> >
> >> > >> >
> >> > >>
> >> >
> >>
> backtype.storm.daemon.executor$eval3848$fn__3849$fn__3864$fn__3893.invoke(executor.clj:562)
> >> > >> > [na:0.9.1-incubating]*
> >> > >> > *        at
> >> > backtype.storm.util$async_loop$fn__384.invoke(util.clj:433)
> >> > >> > [na:0.9.1-incubating]*
> >> > >> > *        at clojure.lang.AFn.run(AFn.java:24)
> >> [clojure-1.4.0.jar:na]*
> >> > >> > *        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]*
> >> > >> > *2014-10-11 11:10:10 k.c.ZookeeperConsumerConnector [INFO]
> >> > >> > [rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b], begin
> >> > >> registering
> >> > >> > consumer rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b in
> >> ZK*
> >> > >> > *2014-10-11 11:10:10 k.u.ZkUtils$ [INFO] conflict in
> >> > >> >
> >> > >> >
> >> > >>
> >> >
> >>
> /consumers/rule-engine-feed/ids/rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b
> >> > >> > data:
> >> > >> >
> >> > >> >
> >> > >>
> >> >
> >>
> {"version":1,"subscription":{"rule-engine-feed":5},"pattern":"static","timestamp":"1413025810635"}
> >> > >> > stored data:
> >> > >> >
> >> > >> >
> >> > >>
> >> >
> >>
> {"version":1,"subscription":{"rule-engine-feed":5},"pattern":"static","timestamp":"1413025767483"}*
> >> > >> > *2014-10-11 11:10:10 k.u.ZkUtils$ [INFO] I wrote this conflicted
> >> > >> ephemeral
> >> > >> > node
> >> > >> >
> >> > >> >
> >> > >>
> >> >
> >>
> [{"version":1,"subscription":{"rule-engine-feed":5},"pattern":"static","timestamp":"1413025810635"}]
> >> > >> > at
> >> > >> >
> >> > >> >
> >> > >>
> >> >
> >>
> /consumers/rule-engine-feed/ids/rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b
> >> > >> > a while back in a different session, hence I will backoff for
> this
> >> > node
> >> > >> to
> >> > >> > be deleted by Zookeeper and retry*
> >> > >> >
> >> > >> > Due to this error, none of the consumer consumes from these
> >> partitions
> >> > >> in
> >> > >> > contention which creates a sort of skewed lag on kafka side.
> When
> >> the
> >> > >> 6th
> >> > >> > consumer was added, the existing owner process of the partitions
> in
> >> > >> > question did not get rebalanced.
> >> > >> >
> >> > >> > Any help would be highly appreciated.
> >> > >> >
> >> > >> > -Thanks,
> >> > >> > Mohit
> >> > >> >
> >> > >>
> >> > >
> >> > >
> >> >
> >>
> >
> >
>
>
> --
> Thanks,
> Neha
>

Reply via email to