Hi all,

Can someone help here? We get a rebalance failure every time a consumer
is added beyond a certain number. We have done quite a lot of debugging
on this and still cannot figure out the pattern.

-Thanks,
Mohit

On Mon, Nov 3, 2014 at 10:53 PM, Mohit Kathuria <mkathu...@sprinklr.com>
wrote:

> Neha,
>
> This looks like an issue with the consumer rebalance not being able to
> complete successfully. We were able to reproduce it on a topic with 30
> partitions and 3 consumer processes (p1, p2 and p3), with
> rebalance.max.retries set to 40 and rebalance.backoff.ms set to 10000 (10 s).
>
> Before the process p3 was started, partition ownership was as expected:
>
> 0-14 -> owner p1
> 15-29 -> owner p2
>
> When process p3 started, a rebalance was triggered. Process p3
> successfully acquired ownership of partitions 20-29, as expected from
> the rebalance algorithm. However, process p2, while trying to acquire
> ownership of partitions 10-19, hit a rebalance failure after 40
> retries.
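
A minimal sketch of the ownership arithmetic described above, assuming a
contiguous-range split over consumers sorted by id and treating each process
as a single consumer (the real assignment is done per stream). The name
range_assign is hypothetical and not part of Kafka; the sketch only shows
which partitions each existing owner has to release before the new owners
can claim them.

```python
def range_assign(num_partitions, consumers):
    """Split partitions 0..num_partitions-1 into contiguous ranges,
    one range per consumer, in sorted consumer order."""
    ordered = sorted(consumers)
    per_consumer, extra = divmod(num_partitions, len(ordered))
    assignment = {}
    for pos, consumer in enumerate(ordered):
        # The first `extra` consumers each take one additional partition.
        start = per_consumer * pos + min(pos, extra)
        count = per_consumer + (1 if pos < extra else 0)
        assignment[consumer] = list(range(start, start + count))
    return assignment


before = range_assign(30, ["p1", "p2"])        # ownership before p3 starts
after = range_assign(30, ["p1", "p2", "p3"])   # expected ownership afterwards

# Partitions each existing owner must give up during the rebalance.
released = {c: sorted(set(before[c]) - set(after[c])) for c in before}

for consumer in sorted(after):
    print(consumer, after[consumer][0], "-", after[consumer][-1])
print("p1 must release:", released["p1"])
print("p2 must release:", released["p2"])
```

With these inputs p1 has to release 10-14 and p2 has to release 20-29, so p2
cannot finish its own rebalance until p1 gives up 10-14.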
>
> Attaching the logs from processes p2 and p1. They show that p2 was
> attempting to rebalance and trying to acquire ownership of partitions
> 10-14, which were owned by process p1. At the same time, however, process
> p1 did not get any event telling it to give up ownership of partitions
> 10-14.
> We were expecting a rebalance to be triggered in p1, but it was not, so
> p1 never gave up ownership. Is our assumption correct?
> If a rebalance does get triggered in p1, how can we detect it other than
> from the logs? The logs on p1 did not show anything.
>
> 2014-11-03 06:57:36 k.c.ZookeeperConsumerConnector [INFO] [topic_consumerIdString], waiting for the partition ownership to be deleted: 11
>
> While the rebalance was failing on process p2, and afterwards, partition
> ownership was as follows:
> 0-14 -> owner p1
> 15-19 -> none
> 20-29 -> owner p3
>
> This left the consumers in an inconsistent state: 5 partitions were never
> consumed from, and partition ownership was never balanced.
>
> However, this time there was no conflict in creating the ephemeral node,
> unlike last time. Note that the ephemeral node conflict we saw earlier
> also appeared after the rebalance had failed. My hunch is that fixing the
> rebalance failure will fix that issue as well.
>
> -Thanks,
> Mohit
>
>
>
> On Mon, Oct 20, 2014 at 7:48 PM, Neha Narkhede <neha.narkh...@gmail.com>
> wrote:
>
>> Mohit,
>>
>> I wonder if it is related to
>> https://issues.apache.org/jira/browse/KAFKA-1585. When zookeeper expires a
>> session, it doesn't delete the ephemeral nodes immediately. So if you end
>> up trying to recreate ephemeral nodes quickly, it could either be in the
>> valid latest session or from the previously expired session. If you hit
>> this problem, then waiting would resolve it. But if not, then this may be a
>> legitimate bug in ZK 3.4.6.
>>
>> Can you try shutting down all your consumers, waiting until session timeout
>> and restarting them?
>>
>> Thanks,
>> Neha
>>
>> On Mon, Oct 20, 2014 at 6:15 AM, Mohit Kathuria <mkathu...@sprinklr.com>
>> wrote:
>>
>> > Dear Experts,
>> >
>> > We recently updated to Kafka v0.8.1.1 with ZooKeeper v3.4.5. I have a
>> > topic with 30 partitions and 2 replicas. We are using the high-level
>> > consumer API.
>> > Each consumer process, which is a Storm topology, has 5 streams, each of
>> > which connects to 1 or more partitions. We are not using Storm's built-in
>> > Kafka spout. Everything runs fine up to the 5th consumer process (25
>> > streams) added for this topic.
>> >
>> > As soon as the sixth consumer process is added, the newly added consumer
>> > does not get ownership of the partitions it requests, because the
>> > existing owners have not yet given up ownership.
>> >
>> > We changed certain properties on the consumer:
>> >
>> > 1. Max rebalance attempts - 20 (rebalance.backoff.ms *
>> >    rebalance.max.retries >> ZK connection timeout)
>> > 2. Backoff between rebalances - 10000 ms (10 seconds)
>> > 3. ZK connection timeout - 100,000 ms (100 seconds)
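
The arithmetic behind item 1 can be sketched as below. The helper name
retry_window_ms is made up for illustration; the values are the ones quoted
in this thread (20 retries here, and the 40 retries used in the later
reproduction on the 30-partition topic).

```python
def retry_window_ms(backoff_ms, max_retries):
    """Approximate total backoff time across all rebalance retries."""
    return backoff_ms * max_retries


ZK_TIMEOUT_MS = 100_000  # "ZK connection timeout" from the list above

window_20 = retry_window_ms(10_000, 20)  # settings in this message
window_40 = retry_window_ms(10_000, 40)  # settings in the later reproduction

for label, window in (("20 retries", window_20), ("40 retries", window_40)):
    ratio = window / ZK_TIMEOUT_MS
    print(f"{label}: {window} ms, {ratio:.1f}x the ZK timeout")
```

That gives 200 s and 400 s of retrying, i.e. 2x and 4x the 100 s timeout, and
the rebalance failed in both configurations.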
>> >
>> > However, when I look in the ZooKeeper shell while the rebalance is
>> > happening, the consumer is registered fine in ZooKeeper. It is just that
>> > the rebalance does not happen.
>> > After the 20th rebalance attempt completes, we get:
>> >
>> >
>> > 2014-10-11 11:10:08 k.c.ZookeeperConsumerConnector [INFO] [rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b], Committing all offsets after clearing the fetcher queues
>> > 2014-10-11 11:10:10 c.s.m.k.i.c.KafkaFeedStreamer [WARN] Ignoring exception while trying to start streamer threads: rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b can't rebalance after 20 retries
>> > kafka.common.ConsumerRebalanceFailedException: rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b can't rebalance after 20 retries
>> >         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432) ~[stormjar.jar:na]
>> >         at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:722) ~[stormjar.jar:na]
>> >         at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:212) ~[stormjar.jar:na]
>> >         at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:80) ~[stormjar.jar:na]
>> >         at com.spr.messaging.kafka.impl.consumer.KafkaFeedStreamer.createAndStartThreads(KafkaFeedStreamer.java:79) ~[stormjar.jar:na]
>> >         at com.spr.messaging.kafka.impl.consumer.KafkaFeedStreamer.startKafkaStreamThreadsIfNecessary(KafkaFeedStreamer.java:64) ~[stormjar.jar:na]
>> >         at com.spr.messaging.kafka.impl.consumer.KafkaFeedConsumerFactoryImpl.startStreamerIfNotRunning(KafkaFeedConsumerFactoryImpl.java:71) [stormjar.jar:na]
>> >         at com.spr.messaging.kafka.impl.consumer.KafkaFeedPullConsumerImpl.startStreamerIfNotRunning(KafkaFeedPullConsumerImpl.java:48) [stormjar.jar:na]
>> >         at com.spr.messaging.kafka.impl.KafkaFeedServiceImpl.getKafkaFeedPullConsumer(KafkaFeedServiceImpl.java:63) [stormjar.jar:na]
>> >         at com.spr.storm.topology.spout.AbstractSprKafkaSpout.nextTuple(AbstractSprKafkaSpout.java:121) [stormjar.jar:na]
>> >         at backtype.storm.daemon.executor$eval3848$fn__3849$fn__3864$fn__3893.invoke(executor.clj:562) [na:0.9.1-incubating]
>> >         at backtype.storm.util$async_loop$fn__384.invoke(util.clj:433) [na:0.9.1-incubating]
>> >         at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
>> >         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
>> > 2014-10-11 11:10:10 k.c.ZookeeperConsumerConnector [INFO] [rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b], begin registering consumer rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b in ZK
>> > 2014-10-11 11:10:10 k.u.ZkUtils$ [INFO] conflict in /consumers/rule-engine-feed/ids/rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b data: {"version":1,"subscription":{"rule-engine-feed":5},"pattern":"static","timestamp":"1413025810635"} stored data: {"version":1,"subscription":{"rule-engine-feed":5},"pattern":"static","timestamp":"1413025767483"}
>> > 2014-10-11 11:10:10 k.u.ZkUtils$ [INFO] I wrote this conflicted ephemeral node [{"version":1,"subscription":{"rule-engine-feed":5},"pattern":"static","timestamp":"1413025810635"}] at /consumers/rule-engine-feed/ids/rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry
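
As a rough aid for reading the conflict log line above, the two JSON payloads
can be compared directly. This is only a sketch: registration_gap_ms is a
hypothetical helper, and the payloads are copied verbatim from the log.

```python
import json

# "data" is what the consumer tried to write; "stored data" is what was
# already present at the same /consumers/.../ids/... path.
new_data = ('{"version":1,"subscription":{"rule-engine-feed":5},'
            '"pattern":"static","timestamp":"1413025810635"}')
stored_data = ('{"version":1,"subscription":{"rule-engine-feed":5},'
               '"pattern":"static","timestamp":"1413025767483"}')


def registration_gap_ms(new_json, stored_json):
    """Milliseconds between the stored registration and the new attempt."""
    new_ts = int(json.loads(new_json)["timestamp"])
    stored_ts = int(json.loads(stored_json)["timestamp"])
    return new_ts - stored_ts


gap_ms = registration_gap_ms(new_data, stored_data)
print(f"stored node is {gap_ms / 1000:.1f} s older than the new attempt")
```

The stored node was written about 43 seconds before the new registration
attempt, under the same consumer id, which matches the "a while back in a
different session" wording in the log.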
>> >
>> > Due to this error, none of the consumers consumes from the partitions in
>> > contention, which creates a skewed lag on the Kafka side. When the 6th
>> > consumer was added, the existing owner process of the partitions in
>> > question did not get rebalanced.
>> >
>> > Any help would be highly appreciated.
>> >
>> > -Thanks,
>> > Mohit
>> >
>>
>
>
