Neha,

In my last reply the subject got changed, which is why it was marked as a
new message on
http://mail-archives.apache.org/mod_mbox/kafka-users/201411.mbox/date.
Please ignore that. The text below is a reply in continuation of
http://mail-archives.apache.org/mod_mbox/kafka-users/201410.mbox/%3CCAAALehC3X2i7+n8aaEkXtxBduwUUp6Zk7=-jxn8yrdcjy1f...@mail.gmail.com%3E


This looks like an issue with the consumer rebalance not being able to
complete successfully. We were able to reproduce the issue on a topic with
30 partitions and 3 consumer processes (p1, p2 and p3), with
rebalance.max.retries set to 40 and rebalance.backoff.ms set to 10000 (10 s).
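
For reference, the time a consumer keeps retrying before it gives up is roughly the product of these two settings. A quick sketch of the arithmetic (the helper name is only for illustration, and it ignores the time each rebalance attempt itself takes):

```python
def rebalance_retry_window_s(max_retries: int, backoff_ms: int) -> float:
    """Approximate time in seconds spent backing off across all rebalance retries."""
    return max_retries * backoff_ms / 1000.0


# Values from the reproduction described above.
window = rebalance_retry_window_s(max_retries=40, backoff_ms=10000)
print(window)  # 400.0
```

So a consumer with these settings waits on the order of 400 seconds in total for the current owner to release the partitions before it reports a failure.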

Before the process p3 was started, partition ownership was as expected:

0-14 -> owner p1
15-29 -> owner p2

When process p3 started, a rebalance was triggered. Process p3 successfully
acquired ownership of partitions 20-29, as expected from the rebalance
algorithm. However, process p2 hit a rebalance failure after 40 retries
while trying to acquire ownership of partitions 10-19.
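
To make the expected ranges explicit, here is a minimal sketch of a range-style split of partitions over sorted consumers. It is an assumption-laden illustration, not the consumer's actual code: the real assignment works per consumer stream, whereas this sketch treats each process as a single unit, and `range_assign` is a hypothetical helper name.

```python
def range_assign(num_partitions, consumers):
    """Split partitions 0..num_partitions-1 into contiguous ranges over sorted consumers."""
    ordered = sorted(consumers)
    per_consumer, extra = divmod(num_partitions, len(ordered))
    assignment = {}
    for i, consumer in enumerate(ordered):
        # The first `extra` consumers each take one additional partition.
        start = per_consumer * i + min(i, extra)
        count = per_consumer + (1 if i < extra else 0)
        assignment[consumer] = list(range(start, start + count))
    return assignment


before = range_assign(30, ["p1", "p2"])       # two processes, before p3 joins
after = range_assign(30, ["p1", "p2", "p3"])  # three processes, after p3 joins
for name in ("p1", "p2", "p3"):
    parts = after[name]
    print(name, parts[0], "-", parts[-1])
```

Under these assumptions the split with two processes is 0-14 and 15-29, and with three processes it is 0-9, 10-19 and 20-29, which matches the ranges p2 and p3 were trying to acquire.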

Attaching the logs from process p2 and process p1. They show that while p2
was attempting to rebalance, it was trying to acquire ownership of partitions
10-14, which were owned by process p1. However, at the same time process p1
did not get any event telling it to give up ownership of partitions 10-14.
We were expecting a rebalance to be triggered in p1, but it wasn't, and
hence p1 did not give up ownership. Is our assumption correct?
And if a rebalance does get triggered in p1, how can we confirm that other
than from the logs? The logs on p1 did not have anything.

2014-11-03 06:57:36 k.c.ZookeeperConsumerConnector [INFO] [topic_consumerIdString], waiting for the partition ownership to be deleted: 11

During and after the failed rebalance on process p2, partition ownership
was as below:
0-14 -> owner p1
15-19 -> none
20-29 -> owner p3

This left the consumers in an inconsistent state: 5 partitions were never
consumed from, and partition ownership was not balanced either.
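
The inconsistency can be spelled out by comparing the observed owners with the expected ones. The sketch below is only an illustration: the expected assignment (0-9 for p1, 10-19 for p2, 20-29 for p3) is inferred from the ranges p2 and p3 were trying to acquire, the observed owners are entered by hand from the listing above, and `diagnose` is a hypothetical helper.

```python
def diagnose(num_partitions, expected, observed):
    """Compare observed partition owners against the expected assignment.

    Both dicts map partition -> owner name; a missing key means "no owner".
    Returns (unowned partitions, {partition: (observed owner, expected owner)}).
    """
    unowned, misplaced = [], {}
    for p in range(num_partitions):
        actual = observed.get(p)
        if actual is None:
            unowned.append(p)
        elif actual != expected[p]:
            misplaced[p] = (actual, expected[p])
    return unowned, misplaced


# Expected after p3 joins (inferred): 0-9 -> p1, 10-19 -> p2, 20-29 -> p3.
expected = {p: "p1" if p < 10 else "p2" if p < 20 else "p3" for p in range(30)}
# Observed after the rebalance failed on p2.
observed = {p: "p1" for p in range(0, 15)}
observed.update({p: "p3" for p in range(20, 30)})

unowned, misplaced = diagnose(30, expected, observed)
print(unowned)            # [15, 16, 17, 18, 19]
print(sorted(misplaced))  # [10, 11, 12, 13, 14]
```

That is, partitions 15-19 have no owner at all, and partitions 10-14 are still held by p1 although p2 is the one trying to acquire them.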

However, this time there was no conflict in creating the ephemeral node,
unlike last time. Note that the ephemeral node conflict we were seeing
earlier also appeared after the rebalance failed. My hunch is that
fixing the rebalance failure will fix that issue as well.

-Thanks,
Mohit



On Mon, Oct 20, 2014 at 7:48 PM, Neha Narkhede <neha.narkh...@gmail.com>
wrote:

> Mohit,
>
> I wonder if it is related to
> https://issues.apache.org/jira/browse/KAFKA-1585. When zookeeper expires a
> session, it doesn't delete the ephemeral nodes immediately. So if you end
> up trying to recreate ephemeral nodes quickly, it could either be in the
> valid latest session or from the previously expired session. If you hit
> this problem, then waiting would resolve it. But if not, then this may be a
> legitimate bug in ZK 3.4.6.
>
> Can you try shutting down all your consumers, waiting until session timeout
> and restarting them?
>
> Thanks,
> Neha
>
> On Mon, Oct 20, 2014 at 6:15 AM, Mohit Kathuria <mkathu...@sprinklr.com>
> wrote:
>
> > Dear Experts,
> >
> > We recently updated to kafka v0.8.1.1 with zookeeper v3.4.5. I have a
> > topic with 30 partitions and 2 replicas. We are using the high-level
> > consumer API.
> > Each consumer process, which is a storm topology, has 5 streams, each of
> > which connects to 1 or more partitions. We are not using storm's inbuilt
> > kafka spout. Everything runs fine until the 5th consumer process (25
> > streams) is added for this topic.
> >
> > As soon as the sixth consumer process is added, the newly added process
> > does not get ownership of the partitions that it requests, as the
> > existing owners have not yet given up ownership.
> >
> > We changed certain properties on consumer :
> >
> > 1. Max Rebalance attempts - 20 ( rebalance.backoff.ms *
> > rebalance.max.retries >> zk connection timeout)
> > 2. Back off ms between rebalances - 10000 (10seconds)
> > 3. ZK connection timeout - 100,000 (100 seconds)
> >
> > However, when I look in the zookeeper shell while the rebalance is
> > happening, the consumer is registered fine in zookeeper. It is just that
> > the rebalance does not happen.
> > After the 20th rebalance attempt completes, we get
> >
> >
> > 2014-10-11 11:10:08 k.c.ZookeeperConsumerConnector [INFO] [rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b], Committing all offsets after clearing the fetcher queues
> > 2014-10-11 11:10:10 c.s.m.k.i.c.KafkaFeedStreamer [WARN] Ignoring exception while trying to start streamer threads: rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b can't rebalance after 20 retries
> > kafka.common.ConsumerRebalanceFailedException: rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b can't rebalance after 20 retries
> >         at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432) ~[stormjar.jar:na]
> >         at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:722) ~[stormjar.jar:na]
> >         at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:212) ~[stormjar.jar:na]
> >         at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:80) ~[stormjar.jar:na]
> >         at com.spr.messaging.kafka.impl.consumer.KafkaFeedStreamer.createAndStartThreads(KafkaFeedStreamer.java:79) ~[stormjar.jar:na]
> >         at com.spr.messaging.kafka.impl.consumer.KafkaFeedStreamer.startKafkaStreamThreadsIfNecessary(KafkaFeedStreamer.java:64) ~[stormjar.jar:na]
> >         at com.spr.messaging.kafka.impl.consumer.KafkaFeedConsumerFactoryImpl.startStreamerIfNotRunning(KafkaFeedConsumerFactoryImpl.java:71) [stormjar.jar:na]
> >         at com.spr.messaging.kafka.impl.consumer.KafkaFeedPullConsumerImpl.startStreamerIfNotRunning(KafkaFeedPullConsumerImpl.java:48) [stormjar.jar:na]
> >         at com.spr.messaging.kafka.impl.KafkaFeedServiceImpl.getKafkaFeedPullConsumer(KafkaFeedServiceImpl.java:63) [stormjar.jar:na]
> >         at com.spr.storm.topology.spout.AbstractSprKafkaSpout.nextTuple(AbstractSprKafkaSpout.java:121) [stormjar.jar:na]
> >         at backtype.storm.daemon.executor$eval3848$fn__3849$fn__3864$fn__3893.invoke(executor.clj:562) [na:0.9.1-incubating]
> >         at backtype.storm.util$async_loop$fn__384.invoke(util.clj:433) [na:0.9.1-incubating]
> >         at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
> >         at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
> > 2014-10-11 11:10:10 k.c.ZookeeperConsumerConnector [INFO] [rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b], begin registering consumer rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b in ZK
> > 2014-10-11 11:10:10 k.u.ZkUtils$ [INFO] conflict in /consumers/rule-engine-feed/ids/rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b data: {"version":1,"subscription":{"rule-engine-feed":5},"pattern":"static","timestamp":"1413025810635"} stored data: {"version":1,"subscription":{"rule-engine-feed":5},"pattern":"static","timestamp":"1413025767483"}
> > 2014-10-11 11:10:10 k.u.ZkUtils$ [INFO] I wrote this conflicted ephemeral node [{"version":1,"subscription":{"rule-engine-feed":5},"pattern":"static","timestamp":"1413025810635"}] at /consumers/rule-engine-feed/ids/rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry
> >
> > Due to this error, none of the consumers consume from the partitions in
> > contention, which creates a skewed lag on the kafka side. When the 6th
> > consumer was added, the existing owner process of the partitions in
> > question did not get rebalanced.
> >
> > Any help would be highly appreciated.
> >
> > -Thanks,
> > Mohit
> >
>
