Can you share a reproducible test case?

On Tue, Dec 9, 2014 at 7:11 AM, Mohit Kathuria <mkathu...@sprinklr.com>
wrote:

> Neha,
>
> The same issue recurred with just 2 consumer processes. The exception was
> related to a conflict in writing the ephemeral node. The exception is below.
> Topic name is
>  "lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin"
> with 30 partitions. The 2 processes were running on 2 servers with ips
> 10.0.8.222 and 10.0.8.225.
>
> *2014-12-09 13:22:11 k.u.ZkUtils$ [INFO] I wrote this conflicted ephemeral node [{"version":1,"subscription":{"lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin":5},"pattern":"static","timestamp":"1417964160024"}] at /consumers/lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin/ids/lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin_ip-10-0-8-222-1417963753598-b19de58d a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry*
> I have attached the complete error logs. The exception occurred after the
> rebalance failed even after 40 retries. The rebalance failed because the
> process already owning some of the partitions did not give us ownership due
> to conflicting ephemeral nodes. As you suggested, we ran the wchp command on
> the 3 zookeeper nodes at this time and found that the watcher was
> registered for only one of the processes. I am copying the kafka consumer
> watcher registered on one of the zookeeper servers. (Attached are the wchp
> outputs of all 3 zk servers.)
>
> *$echo wchp | nc localhost 2181 *
>
>
> */kafka/consumers/lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin/ids*
>
> * 0x34a175e1d5d0130*
>
>
> "0x34a175e1d5d0130" was the ephemeral node session ID. I went back to the
> zookeeper shell and checked the consumers registered for this topic and
> consumer group (same as the topic name). The output is attached in
> zkCommands.txt. It clearly shows that:
>
> 10.0.8.222 has ephemeralOwner = 0x34a175e1d5d0130
>
> 10.0.8.225 has ephemeralOwner = 0x34a175e1d5d0127
>
>
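The comparison described above (sessions that hold a watch versus the ephemeral owners of the registered consumers) can be scripted. This is a minimal sketch in Python, assuming wchp prints each watched path followed by indented session IDs, as in the output pasted above. The function names are illustrative and are not part of ZooKeeper or Kafka. Since wchp reports only the watches held by the server it is run against, the outputs of all 3 servers would need to be merged before drawing a conclusion; the sample below uses only the one output quoted in this message.

```python
def parse_wchp(text):
    """Map each watched path to the set of session IDs watching it."""
    watches = {}
    current_path = None
    for raw in text.splitlines():
        # Drop whitespace and the '*' emphasis markers added by the mail client.
        line = raw.strip().strip("*").strip()
        if not line:
            continue
        if line.startswith("/"):
            current_path = line
            watches.setdefault(current_path, set())
        elif line.lower().startswith("0x") and current_path is not None:
            watches[current_path].add(line.lower())
    return watches


def consumers_without_watch(watches, ids_path, owners):
    """Return the hosts whose session has no watch registered on ids_path."""
    watching = watches.get(ids_path, set())
    return sorted(host for host, session in owners.items()
                  if session.lower() not in watching)


IDS_PATH = ("/kafka/consumers/lst_plugin_com.spr.listening.plugin.impl."
            "plugins.SemantriaEnrichmentPlugin/ids")

# wchp output from one server, as quoted in this message.
sample = IDS_PATH + "\n\t0x34a175e1d5d0130\n"

# ephemeralOwner values reported by the zookeeper shell.
owners = {
    "10.0.8.222": "0x34a175e1d5d0130",
    "10.0.8.225": "0x34a175e1d5d0127",
}

missing = consumers_without_watch(parse_wchp(sample), IDS_PATH, owners)
print(missing)
```

With the values from this message, the only host left in the result is 10.0.8.225, which matches the observation that a watcher was registered for only one of the two processes.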
> I think the issue here is that both consumers have written to different
> ephemeral nodes, but watchers are registered for only one of the 2
> ephemeral nodes. The root cause seems to be an inconsistent state while
> writing the ephemeral nodes in ZK.
>
> Let me know if you need more details.
>
> -Thanks,
>
> Mohit
>
>
>
>
> On Mon, Nov 10, 2014 at 8:46 AM, Neha Narkhede <neha.narkh...@gmail.com>
> wrote:
>
>> A rebalance should trigger on all consumers when you add a new consumer to
>> the group. If you don't see the zookeeper watch fire, the consumer may have
>> somehow lost the watch. We have seen this behavior on older zk versions; I
>> wonder if that bug got reintroduced. To verify whether this is the case,
>> you can run the wchp zookeeper command on the zk leader and check if each
>> consumer has a watch registered.
>>
>> Do you have a way to try this on zk 3.3.4? I would recommend you try the
>> wchp suggestion as well.
>>
>> On Fri, Nov 7, 2014 at 6:07 AM, Mohit Kathuria <mkathu...@sprinklr.com>
>> wrote:
>>
>> > Hi all,
>> >
>> > Can someone help here? We are getting constant rebalance failures each
>> > time a consumer is added beyond a certain number. We did quite a lot of
>> > debugging on this and are still not able to figure out the pattern.
>> >
>> > -Thanks,
>> > Mohit
>> >
>> > On Mon, Nov 3, 2014 at 10:53 PM, Mohit Kathuria <mkathu...@sprinklr.com
>> >
>> > wrote:
>> >
>> > > Neha,
>> > >
>> > > This looks like an issue with the consumer rebalance not being able to
>> > > complete successfully. We were able to reproduce the issue on a topic
>> > > with 30 partitions and 3 consumer processes (p1, p2 and p3), with
>> > > rebalance.max.retries = 40 and rebalance.backoff.ms = 10000 (10s).
>> > >
>> > > Before the process p3 was started, partition ownership was as expected:
>> > >
>> > > 0-14 -> owner p1
>> > > 15-29 -> owner p2
>> > >
>> > > As the process p3 started, a rebalance was triggered. Process p3 was
>> > > successfully able to acquire ownership of partitions 20-29, as expected
>> > > per the rebalance algorithm. However, process p2, while trying to
>> > > acquire ownership of partitions 10-19, saw a rebalance failure after 40
>> > > retries.
>> > >
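The expected ownership before and after p3 joins can be reproduced with a small range-style split. This is a sketch in Python under stated assumptions: it splits at the process level, whereas the consumer assigns partitions per stream (consumer thread), so it only matches when every process runs the same number of streams. The function names are illustrative and not Kafka's.

```python
def range_assignment(num_partitions, consumers):
    """Split partitions 0..num_partitions-1 into contiguous ranges, one per
    consumer, in sorted consumer order. Earlier consumers absorb any remainder."""
    ordered = sorted(consumers)
    base, extra = divmod(num_partitions, len(ordered))
    assignment = {}
    for pos, consumer in enumerate(ordered):
        start = base * pos + min(pos, extra)
        count = base + (1 if pos < extra else 0)
        assignment[consumer] = range(start, start + count)
    return assignment


def to_release(before, after):
    """Partitions each existing owner has to give up for the new assignment."""
    return {
        consumer: sorted(set(before[consumer]) - set(after.get(consumer, ())))
        for consumer in before
    }


before = range_assignment(30, ["p1", "p2"])
after = range_assignment(30, ["p1", "p2", "p3"])
release = to_release(before, after)

for consumer in sorted(after):
    owned = after[consumer]
    print(consumer, "should own", owned[0], "to", owned[-1])
for consumer, parts in sorted(release.items()):
    print(consumer, "must release", parts[0], "to", parts[-1])
```

Under these assumptions p1 has to release partitions 10-14 and p2 has to release 20-29. A new owner can only claim a partition once the previous owner has released it, which is why a rebalance that never runs on p1 blocks p2.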
>> > > Attaching the logs from process p2 and process p1. They show that while
>> > > p2 was attempting to rebalance, it was trying to acquire ownership of
>> > > partitions 10-14, which were owned by process p1. However, at the same
>> > > time process p1 did not get any event for giving up the partition
>> > > ownership for partitions 1-14.
>> > > We were expecting a rebalance to have triggered in p1, but it didn't,
>> > > and hence p1 is not giving up ownership. Is our assumption correct?
>> > > And if the rebalance does get triggered in p1, how can we verify that
>> > > other than from the logs, as the logs on p1 did not have anything?
>> > >
>> > > *2014-11-03 06:57:36 k.c.ZookeeperConsumerConnector [INFO]
>> > > [topic_consumerIdString], waiting for the partition ownership to be
>> > > deleted: 11*
>> > >
>> > > During and after the rebalance failure on process p2, partition
>> > > ownership was as below:
>> > > 0-14 -> owner p1
>> > > 15-19 -> none
>> > > 20-29 -> owner p3
>> > >
>> > > This left the consumers in an inconsistent state, as 5 partitions were
>> > > never consumed from and the partition ownership was not balanced.
>> > >
>> > > However, this time there was no conflict in creating the ephemeral
>> > > node, unlike last time. Note that the ephemeral node conflict we were
>> > > seeing earlier also appeared after the rebalance failed. My hunch is
>> > > that fixing the rebalance failure will fix that issue as well.
>> > >
>> > > -Thanks,
>> > > Mohit
>> > >
>> > >
>> > >
>> > > On Mon, Oct 20, 2014 at 7:48 PM, Neha Narkhede <
>> neha.narkh...@gmail.com>
>> > > wrote:
>> > >
>> > >> Mohit,
>> > >>
>> > >> I wonder if it is related to
>> > >> https://issues.apache.org/jira/browse/KAFKA-1585. When zookeeper expires
>> > >> a session, it doesn't delete the ephemeral nodes immediately. So if you
>> > >> end up trying to recreate ephemeral nodes quickly, it could either be in
>> > >> the valid latest session or from the previously expired session. If you
>> > >> hit this problem, then waiting would resolve it. But if not, then this
>> > >> may be a legitimate bug in ZK 3.4.6.
>> > >>
>> > >> Can you try shutting down all your consumers, waiting until session
>> > >> timeout and restarting them?
>> > >>
>> > >> Thanks,
>> > >> Neha
>> > >>
>> > >> On Mon, Oct 20, 2014 at 6:15 AM, Mohit Kathuria <
>> mkathu...@sprinklr.com
>> > >
>> > >> wrote:
>> > >>
>> > >> > Dear Experts,
>> > >> >
>> > >> > We recently updated to kafka v0.8.1.1 with zookeeper v3.4.5. I have
>> > >> > a topic with 30 partitions and 2 replicas. We are using the
>> > >> > high-level consumer API.
>> > >> > Each consumer process, which is a storm topology, has 5 streams, each
>> > >> > of which connects to 1 or more partitions. We are not using storm's
>> > >> > inbuilt kafka spout. Everything runs fine until the 5th consumer
>> > >> > process (25 streams) is added for this topic.
>> > >> >
>> > >> > As soon as the sixth consumer process is added, the newly added
>> > >> > process does not get ownership of the partitions that it requests,
>> > >> > as the existing owners have not yet given up their ownership.
>> > >> >
>> > >> > We changed certain properties on the consumer:
>> > >> >
>> > >> > 1. Max rebalance attempts - 20 (rebalance.backoff.ms *
>> > >> > rebalance.max.retries >> zk connection timeout)
>> > >> > 2. Backoff ms between rebalances - 10000 (10 seconds)
>> > >> > 3. ZK connection timeout - 100,000 (100 seconds)
>> > >> >
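The relationship between the three settings listed above is simple arithmetic, sketched here in Python. The keys rebalance.max.retries and rebalance.backoff.ms are the names used in this thread; mapping "ZK connection timeout" to zookeeper.connection.timeout.ms is an assumption, since the message does not name the property. The helper name is illustrative.

```python
def rebalance_window_ms(props):
    """Total time the consumer keeps retrying a rebalance before giving up."""
    return int(props["rebalance.max.retries"]) * int(props["rebalance.backoff.ms"])


props = {
    "rebalance.max.retries": "20",
    "rebalance.backoff.ms": "10000",
    # Assumed key for the "ZK connection timeout" mentioned in the message.
    "zookeeper.connection.timeout.ms": "100000",
}

window = rebalance_window_ms(props)
timeout = int(props["zookeeper.connection.timeout.ms"])
print("retry window (ms):", window)
print("zk timeout (ms):  ", timeout)
print("window exceeds timeout:", window > timeout)
```

With these values the retry window is 200,000 ms, twice the 100,000 ms timeout, so the retries are not exhausted before the timeout elapses.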
>> > >> > When I look in the zookeeper shell while the rebalance is happening,
>> > >> > the consumer is registered fine in zookeeper. It is just that the
>> > >> > rebalance does not complete.
>> > >> > After the 20th rebalance attempt, we get:
>> > >> >
>> > >> >
>> > >> > *2014-10-11 11:10:08 k.c.ZookeeperConsumerConnector [INFO] [rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b], Committing all offsets after clearing the fetcher queues*
>> > >> > *2014-10-11 11:10:10 c.s.m.k.i.c.KafkaFeedStreamer [WARN] Ignoring exception while trying to start streamer threads: rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b can't rebalance after 20 retries*
>> > >> > *kafka.common.ConsumerRebalanceFailedException: rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b can't rebalance after 20 retries*
>> > >> > *        at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432) ~[stormjar.jar:na]*
>> > >> > *        at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:722) ~[stormjar.jar:na]*
>> > >> > *        at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:212) ~[stormjar.jar:na]*
>> > >> > *        at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:80) ~[stormjar.jar:na]*
>> > >> > *        at com.spr.messaging.kafka.impl.consumer.KafkaFeedStreamer.createAndStartThreads(KafkaFeedStreamer.java:79) ~[stormjar.jar:na]*
>> > >> > *        at com.spr.messaging.kafka.impl.consumer.KafkaFeedStreamer.startKafkaStreamThreadsIfNecessary(KafkaFeedStreamer.java:64) ~[stormjar.jar:na]*
>> > >> > *        at com.spr.messaging.kafka.impl.consumer.KafkaFeedConsumerFactoryImpl.startStreamerIfNotRunning(KafkaFeedConsumerFactoryImpl.java:71) [stormjar.jar:na]*
>> > >> > *        at com.spr.messaging.kafka.impl.consumer.KafkaFeedPullConsumerImpl.startStreamerIfNotRunning(KafkaFeedPullConsumerImpl.java:48) [stormjar.jar:na]*
>> > >> > *        at com.spr.messaging.kafka.impl.KafkaFeedServiceImpl.getKafkaFeedPullConsumer(KafkaFeedServiceImpl.java:63) [stormjar.jar:na]*
>> > >> > *        at com.spr.storm.topology.spout.AbstractSprKafkaSpout.nextTuple(AbstractSprKafkaSpout.java:121) [stormjar.jar:na]*
>> > >> > *        at backtype.storm.daemon.executor$eval3848$fn__3849$fn__3864$fn__3893.invoke(executor.clj:562) [na:0.9.1-incubating]*
>> > >> > *        at backtype.storm.util$async_loop$fn__384.invoke(util.clj:433) [na:0.9.1-incubating]*
>> > >> > *        at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]*
>> > >> > *        at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]*
>> > >> > *2014-10-11 11:10:10 k.c.ZookeeperConsumerConnector [INFO] [rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b], begin registering consumer rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b in ZK*
>> > >> > *2014-10-11 11:10:10 k.u.ZkUtils$ [INFO] conflict in /consumers/rule-engine-feed/ids/rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b data: {"version":1,"subscription":{"rule-engine-feed":5},"pattern":"static","timestamp":"1413025810635"} stored data: {"version":1,"subscription":{"rule-engine-feed":5},"pattern":"static","timestamp":"1413025767483"}*
>> > >> > *2014-10-11 11:10:10 k.u.ZkUtils$ [INFO] I wrote this conflicted ephemeral node [{"version":1,"subscription":{"rule-engine-feed":5},"pattern":"static","timestamp":"1413025810635"}] at /consumers/rule-engine-feed/ids/rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry*
>> > >> >
>> > >> > Due to this error, none of the consumers consume from the partitions
>> > >> > in contention, which creates a sort of skewed lag on the kafka side.
>> > >> > When the 6th consumer was added, the existing owner process of the
>> > >> > partitions in question did not get rebalanced.
>> > >> >
>> > >> > Any help would be highly appreciated.
>> > >> >
>> > >> > -Thanks,
>> > >> > Mohit
>> > >> >
>> > >>
>> > >
>> > >
>> >
>>
>
>


-- 
Thanks,
Neha
