Re: Does kafka write key to broker?

2015-07-02 Thread Mohit Kathuria
Guozhang,

Thanks for the reply. That's exactly what I was looking for.

-Thanks,
Mohit

On Tue, Jun 30, 2015 at 1:52 AM, Guozhang Wang wangg...@gmail.com wrote:

 Hi Mohit,

 The KeyedMessage has a key and a partKey as its fields, and if the partKey
 is not specified then the key will be used as the partKey by default.

 So for your case, if you do

 new KeyedMessage[String, String](topic, null /*key*/, partkey, value)

 then the partkey will be used to determine the partition but not written to
 the broker.
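 For reference, assuming the 0.8.x Scala producer API, the constructor forms
 discussed in this thread look roughly like this (the topic, partkey and
 payload values are placeholders):

   import kafka.producer.KeyedMessage

   // Four-argument form: key stays null, so nothing extra is stored on the
   // broker; partkey is only consulted by the partitioner.
   val routedOnly =
     new KeyedMessage[String, String]("my-topic", null, "partkey-1", "payload")

   // Two-argument form: key and partKey both end up null, so the producer
   // falls back to a sticky random partition instead of the partitioner.
   val noKey = new KeyedMessage[String, String]("my-topic", "payload")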

 Guozhang

 On Wed, Jun 24, 2015 at 9:45 AM, Mohit Kathuria mkathu...@sprinklr.com
 wrote:

  Thanks Jason.
 
  Hi Liquan,
 
  I was doing this initially using KeyedMessage without setting the key. But
  if the key is null, the custom partitioner won't be used and kafka will
  write to a random partition, which is cached for some time within the
  producer. I have a custom partitioner to ensure fairness between the
  partitions.
 
  -Thanks,
  Mohit
 
  On Tue, Jun 23, 2015 at 10:34 PM, Liquan Pei liquan...@gmail.com
 wrote:
 
   Hi Mohit,
  
   If you instantiate the keyed message with
   val topic = "topic"
   val value = "value"
   val message = new KeyedMessage[String, String](topic, value)
  
   Then the key in the KeyedMessage will be null.
  
   Hope this helps!
  
   Thanks,
   Liquan
  
   On Tue, Jun 23, 2015 at 8:18 AM, Mohit Kathuria 
 mkathu...@sprinklr.com
   wrote:
  
Hi,
   
We are using kafka 0.8.1.1 in our production cluster. I recently started
specifying the key as the message itself. I just realised that the key is
also written to the broker, which means that the data is duplicated within a
keyed message. I am going to change the key. Stupid mistake.

However, just out of anxiety, I want to know whether we can turn off
writing the key to the broker. Is there any configuration I can change to
achieve this?
   
-Thanks,
Mohit Kathuria
   
  
  
  
   --
   Liquan Pei
   Department of Physics
   University of Massachusetts Amherst
  
 



 --
 -- Guozhang



Re: Does kafka write key to broker?

2015-06-24 Thread Mohit Kathuria
Thanks Jason.

Hi Liquan,

I was doing this initially using KeyedMessage without setting the key. But
if the key is null, the custom partitioner won't be used and kafka will
write to a random partition, which is cached for some time within the
producer. I have a custom partitioner to ensure fairness between the
partitions.
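As an illustration of the fairness idea (the class name and round-robin
strategy below are made up for the example, not our actual partitioner), a
custom partitioner for the 0.8.x producer looks roughly like this, wired in
via the partitioner.class property:

    import java.util.concurrent.atomic.AtomicInteger
    import kafka.producer.Partitioner
    import kafka.utils.VerifiableProperties

    // Rotate over partitions regardless of the key so each partition gets an
    // even share. Note: the partitioner is only invoked when the message
    // carries a non-null key/partKey; otherwise the producer falls back to
    // its sticky random partition.
    class RoundRobinPartitioner(props: VerifiableProperties) extends Partitioner {
      private val counter = new AtomicInteger(0)

      def partition(key: Any, numPartitions: Int): Int =
        (counter.getAndIncrement() & Int.MaxValue) % numPartitions
    }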

-Thanks,
Mohit

On Tue, Jun 23, 2015 at 10:34 PM, Liquan Pei liquan...@gmail.com wrote:

 Hi Mohit,

 If you instantiate the keyed message with
 val topic = "topic"
 val value = "value"
 val message = new KeyedMessage[String, String](topic, value)

 Then the key in the KeyedMessage will be null.

 Hope this helps!

 Thanks,
 Liquan

 On Tue, Jun 23, 2015 at 8:18 AM, Mohit Kathuria mkathu...@sprinklr.com
 wrote:

  Hi,
 
  We are using kafka 0.8.1.1 in our production cluster. I recently started
  specifying the key as the message itself. I just realised that the key is
  also written to the broker, which means that the data is duplicated within
  a keyed message. I am going to change the key. Stupid mistake.

  However, just out of anxiety, I want to know whether we can turn off
  writing the key to the broker. Is there any configuration I can change to
  achieve this?
 
  -Thanks,
  Mohit Kathuria
 



 --
 Liquan Pei
 Department of Physics
 University of Massachusetts Amherst



Does kafka write key to broker?

2015-06-23 Thread Mohit Kathuria
Hi,

We are using kafka 0.8.1.1 in our production cluster. I recently started
specifying the key as the message itself. I just realised that the key is
also written to the broker, which means that the data is duplicated within a
keyed message. I am going to change the key. Stupid mistake.

However, just out of anxiety, I want to know whether we can turn off
writing the key to the broker. Is there any configuration I can change to
achieve this?

-Thanks,
Mohit Kathuria


Re: Rebalance not happening even after increasing max retries causing conflict in ZK

2015-04-20 Thread Mohit Kathuria
Mike,

The endless rebalance errors occur due to the error that Mayuresh just pasted.
The rebalance attempts fail because of the conflict in the zkNode.

Below is the exact trace.

*2014-12-09 13:22:11 k.u.ZkUtils$ [INFO] I wrote this conflicted ephemeral
node
[{version:1,subscription:{lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin:5},pattern:static,timestamp:1417964160024}]
at /consumers/lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin/ids/lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin_ip-10-0-8-222-1417963753598-b19de58d
a while back in a different session, hence I will backoff for this node to
be deleted by Zookeeper and retry*

Would be more than willing to help. Let me know if you need any more
information, or reach out to me directly and we can work on it.

-Thanks,
Mohit




On Fri, Mar 27, 2015 at 3:00 AM, Mike Axiak m...@axiak.net wrote:

 No, we don't normally see conflicts. We'll see endless attempts to
 rebalance.

 -Mike

 On Thu, Mar 26, 2015 at 5:15 PM, Mayuresh Gharat 
 gharatmayures...@gmail.com
  wrote:

  Did you see something like this in any of the consumer logs:

  "Conflict in ... data: ... stored data: ..."?
 
  Thanks,
 
  Mayuresh
 
  On Thu, Mar 26, 2015 at 1:50 PM, Mike Axiak m...@axiak.net wrote:
 
   Hi guys,
  
   At HubSpot we think the issue is related to slow consumers. During a
   rebalance, one of the first things the consumer does is signal a shutdown
   to the fetcher [1] [2], in order to relinquish ownership of the
   partitions.

   This waits for the shutdown of all fetcher threads, which can only
   happen once each thread's enqueue current chunk operation finishes. If
   you have a slow consumer or large chunk sizes, this could take a while,
   which would make it difficult for the rebalance to actually occur
   successfully.

   We're testing out different solutions now. Currently under review, we're
   thinking about making the enqueue into the blocking queue time out so we
   can check whether we're still running, to end the processing of the
   current chunk early.
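   Roughly, the timed-offer idea looks like the sketch below (illustrative
   only, not the actual patch under review; the method name and running flag
   are made up):

      import java.util.concurrent.{BlockingQueue, TimeUnit}

      // Offer the chunk with a timeout and re-check a running flag between
      // attempts, so a full queue plus a slow consumer cannot block the
      // fetcher shutdown indefinitely.
      def enqueueCurrentChunk[T](queue: BlockingQueue[T],
                                 chunk: T,
                                 isRunning: () => Boolean): Boolean = {
        var enqueued = false
        while (!enqueued && isRunning()) {
          enqueued = queue.offer(chunk, 200, TimeUnit.MILLISECONDS)
        }
        enqueued // false means shutdown was requested before the chunk fit
      }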
  
   Has anyone else noticed this? If so, are there any patches people have
   written? Once we have a clearer picture of the solutions we'll send a few
   patches in JIRAs.
  
   Best,
   Mike
  
   1:
   https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala#L655
   2:
   https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala#L712
  
  
   On Mon, Dec 22, 2014 at 6:51 PM, Neha Narkhede n...@confluent.io
  wrote:
  
Can you share a reproducible test case?
   
On Tue, Dec 9, 2014 at 7:11 AM, Mohit Kathuria 
 mkathu...@sprinklr.com
  
wrote:
   
 Neha,

  The same issue reoccurred with just 2 consumer processes. The exception
  was related to a conflict in writing the ephemeral node. Below was the
  exception.
  Topic name is
  lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin
  with 30 partitions. The 2 processes were running on 2 servers with ips
  10.0.8.222 and 10.0.8.225.

  *2014-12-09 13:22:11 k.u.ZkUtils$ [INFO] I wrote this conflicted ephemeral
  node
  [{version:1,subscription:{lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin:5},pattern:static,timestamp:1417964160024}]
  at
  /consumers/lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin/ids/lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin_ip-10-0-8-222-1417963753598-b19de58d
  a while back in a different session, hence I will backoff for this node to
  be deleted by Zookeeper and retry*
  Attached are the complete error logs. The exception occurred after the
  rebalance failed even after 40 retries. Rebalance failed as the process
  already owning some of the partitions did not give us ownership due to
  conflicting ephemeral nodes. As you suggested, we ran the wchp command on
  the 3 zookeeper nodes at this time and figured out that the watcher was
  registered for only one of the processes. I am copying the kafka consumer
  watcher registered on one of the zookeeper servers. (Attached are the wchp
  outputs of all 3 zk servers)

  *$ echo wchp | nc localhost 2181*

  */kafka/consumers/lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin/ids*

  * 0x34a175e1d5d0130*


  0x34a175e1d5d0130 was the ephemeral node session id. I went back to the
  zookeeper shell and checked the consumers registered for this topic and
  consumer group (same as topic name). Attaching the output in zkCommands.txt.
  This clearly shows that

 10.0.8.222 has ephemeralOwner = 0x34a175e1d5d0130

 10.0.8.225 has ephemeralOwner = 0x34a175e1d5d0127


  I think we have the issue here that both consumers have written to
  different ephemeral nodes. Watchers are registered for only one of the 2
  ephemeral node

Re: Rebalance not happening even after increasing max retries causing conflict in ZK

2014-12-16 Thread Mohit Kathuria
Any suggestions on what might be going on here? We are pretty much blind here
and our application is getting affected due to this.


-Mohit

On Tue, Dec 9, 2014 at 8:41 PM, Mohit Kathuria mkathu...@sprinklr.com
wrote:

 Neha,

 The same issue reoccurred with just 2 consumer processes. The exception was
 related to a conflict in writing the ephemeral node. Below was the exception.
 Topic name is
  lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin
 with 30 partitions. The 2 processes were running on 2 servers with ips
 10.0.8.222 and 10.0.8.225.

 *2014-12-09 13:22:11 k.u.ZkUtils$ [INFO] I wrote this conflicted ephemeral
 node
 [{version:1,subscription:{lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin:5},pattern:static,timestamp:1417964160024}]
 at
 /consumers/lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin/ids/lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin_ip-10-0-8-222-1417963753598-b19de58d
 a while back in a different session, hence I will backoff for this node to
 be deleted by Zookeeper and retry*
 Attached are the complete error logs. The exception occurred after the
 rebalance failed even after 40 retries. Rebalance failed as the process
 already owning some of the partitions did not give us ownership due to
 conflicting ephemeral nodes. As you suggested, we ran the wchp command on
 the 3 zookeeper nodes at this time and figured out that the watcher was
 registered for only one of the processes. I am copying the kafka consumer
 watcher registered on one of the zookeeper servers. (Attached are the wchp
 outputs of all 3 zk servers)

 *$ echo wchp | nc localhost 2181*


 */kafka/consumers/lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin/ids*

 * 0x34a175e1d5d0130*


 0x34a175e1d5d0130 was the ephemeral node session id. I went back to the
 zookeeper shell and checked the consumers registered for this topic and
 consumer group (same as topic name). Attaching the output in zkCommands.txt.
 This clearly shows that

 10.0.8.222 has ephemeralOwner = 0x34a175e1d5d0130

 10.0.8.225 has ephemeralOwner = 0x34a175e1d5d0127


 I think we have the issue here that both consumers have written to
 different ephemeral nodes. Watchers are registered for only one of the 2
 ephemeral nodes. The root cause seems to be the inconsistent state while
 writing the ephemeral nodes in ZK.

 Let me know if you need more details.

 -Thanks,

 Mohit




 On Mon, Nov 10, 2014 at 8:46 AM, Neha Narkhede neha.narkh...@gmail.com
 wrote:

 A rebalance should trigger on all consumers when you add a new consumer to
 the group. If you don't see the zookeeper watch fire, the consumer may
 have
 somehow lost the watch. We have seen this behavior on older zk versions, I
 wonder if that bug got reintroduced. To verify if this is the case, you
 can
 run the wchp zookeeper command on the zk leader and check if each consumer
 has a watch registered.

 Do you have a way to try this on zk 3.3.4? I would recommend you try the
 wchp suggestion as well.

 On Fri, Nov 7, 2014 at 6:07 AM, Mohit Kathuria mkathu...@sprinklr.com
 wrote:

  Hi all,
 
   Can someone help here? We are getting constant rebalance failures each
   time a consumer is added beyond a certain number. We did quite a lot of
   debugging on this and are still not able to figure out the pattern.
 
  -Thanks,
  Mohit
 
  On Mon, Nov 3, 2014 at 10:53 PM, Mohit Kathuria mkathu...@sprinklr.com
 
  wrote:
 
   Neha,
  
   Looks like an issue with the consumer rebalance not able to complete
    successfully. We were able to reproduce the issue on a topic with 30
    partitions, 3 consumer processes (p1, p2 and p3), and properties of 40
    rebalance.max.retries and 10000 ms (10s) rebalance.backoff.ms.
  
   Before the process p3 was started, partition ownership was as
 expected:
  
   partitions 0-14 owned by p1
   partitions 15-29 - owner p2
  
   As the process p3 started, rebalance was triggered. Process p3 was
   successfully able to acquire partition ownership for partitions 20-29
 as
   expected as per the rebalance algorithm. However, process p2 while
 trying
   to acquire ownership of partitions 10-19 saw rebalance failure after
 40
   retries.
  
   Attaching the logs from process p2 and process p1. It says that p2 was
   attempting to rebalance, it was trying to acquire ownership of
 partitions
   10-14 which were owned by process p1. However, at the same time
 process
  p1
   did not get any event for giving up the partition ownership for
  partitions
   1-14.
   We were expecting a rebalance to have triggered in p1 - but it didn't
 and
   hence not giving up ownership. Is our assumption correct/incorrect?
   And if the rebalance gets triggered in p1 - how to figure out apart
 from
   logs as the logs on p1 did not have anything.
  
   *2014-11-03 06:57:36 k.c.ZookeeperConsumerConnector [INFO]
   [topic_consumerIdString], waiting for the partition ownership to be
   deleted: 11

Re: Rebalance not happening even after increasing max retries causing conflict in ZK

2014-12-09 Thread Mohit Kathuria
Neha,

The same issue reoccurred with just 2 consumer processes. The exception was
related to a conflict in writing the ephemeral node. Below was the exception.
Topic name is
 lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin
with 30 partitions. The 2 processes were running on 2 servers with ips
10.0.8.222 and 10.0.8.225.

*2014-12-09 13:22:11 k.u.ZkUtils$ [INFO] I wrote this conflicted ephemeral
node
[{version:1,subscription:{lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin:5},pattern:static,timestamp:1417964160024}]
at
/consumers/lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin/ids/lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin_ip-10-0-8-222-1417963753598-b19de58d
a while back in a different session, hence I will backoff for this node to
be deleted by Zookeeper and retry*
Attached are the complete error logs. The exception occurred after the
rebalance failed even after 40 retries. Rebalance failed as the process
already owning some of the partitions did not give us ownership due to
conflicting ephemeral nodes. As you suggested, we ran the wchp command on the
3 zookeeper nodes at this time and figured out that the watcher was
registered for only one of the processes. I am copying the kafka consumer
watcher registered on one of the zookeeper servers. (Attached are the wchp
outputs of all 3 zk servers)

*$ echo wchp | nc localhost 2181*

*/kafka/consumers/lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin/ids*

* 0x34a175e1d5d0130*


0x34a175e1d5d0130 was the ephemeral node session id. I went back to the
zookeeper shell and checked the consumers registered for this topic and
consumer group (same as topic name). Attaching the output in zkCommands.txt.
This clearly shows that

10.0.8.222 has ephemeralOwner = 0x34a175e1d5d0130

10.0.8.225 has ephemeralOwner = 0x34a175e1d5d0127


I think we have the issue here that both consumers have written to
different ephemeral nodes. Watchers are registered for only one of the 2
ephemeral nodes. The root cause seems to be the inconsistent state while
writing the ephemeral nodes in ZK.
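For reference, the ephemeralOwner check above can also be scripted. A rough
sketch against the plain ZooKeeper Java client (the connect string is a
placeholder; the group path is the one from the wchp output):

    import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
    import scala.collection.JavaConverters._

    // List the consumer id znodes and print the session that owns each
    // ephemeral node - the same comparison done by hand in the zk shell.
    val zk = new ZooKeeper("localhost:2181", 30000, new Watcher {
      override def process(event: WatchedEvent): Unit = () // no-op watcher
    })
    val group = "lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin"
    val idsPath = s"/kafka/consumers/$group/ids"
    for (child <- zk.getChildren(idsPath, false).asScala) {
      val stat = zk.exists(s"$idsPath/$child", false)
      if (stat != null)
        println(f"$child%s ephemeralOwner=0x${stat.getEphemeralOwner}%x")
    }
    zk.close()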

Let me know if you need more details.

-Thanks,

Mohit




On Mon, Nov 10, 2014 at 8:46 AM, Neha Narkhede neha.narkh...@gmail.com
wrote:

 A rebalance should trigger on all consumers when you add a new consumer to
 the group. If you don't see the zookeeper watch fire, the consumer may have
 somehow lost the watch. We have seen this behavior on older zk versions, I
 wonder if that bug got reintroduced. To verify if this is the case, you can
 run the wchp zookeeper command on the zk leader and check if each consumer
 has a watch registered.

 Do you have a way to try this on zk 3.3.4? I would recommend you try the
 wchp suggestion as well.

 On Fri, Nov 7, 2014 at 6:07 AM, Mohit Kathuria mkathu...@sprinklr.com
 wrote:

  Hi all,
 
   Can someone help here? We are getting constant rebalance failures each
   time a consumer is added beyond a certain number. We did quite a lot of
   debugging on this and are still not able to figure out the pattern.
 
  -Thanks,
  Mohit
 
  On Mon, Nov 3, 2014 at 10:53 PM, Mohit Kathuria mkathu...@sprinklr.com
  wrote:
 
   Neha,
  
   Looks like an issue with the consumer rebalance not able to complete
    successfully. We were able to reproduce the issue on a topic with 30
    partitions, 3 consumer processes (p1, p2 and p3), and properties of 40
    rebalance.max.retries and 10000 ms (10s) rebalance.backoff.ms.
  
   Before the process p3 was started, partition ownership was as expected:
  
   partitions 0-14 owned by p1
   partitions 15-29 - owner p2
  
   As the process p3 started, rebalance was triggered. Process p3 was
   successfully able to acquire partition ownership for partitions 20-29
 as
   expected as per the rebalance algorithm. However, process p2 while
 trying
   to acquire ownership of partitions 10-19 saw rebalance failure after 40
   retries.
  
   Attaching the logs from process p2 and process p1. It says that p2 was
   attempting to rebalance, it was trying to acquire ownership of
 partitions
   10-14 which were owned by process p1. However, at the same time process
  p1
   did not get any event for giving up the partition ownership for
  partitions
   1-14.
   We were expecting a rebalance to have triggered in p1 - but it didn't
 and
   hence not giving up ownership. Is our assumption correct/incorrect?
   And if the rebalance gets triggered in p1 - how to figure out apart
 from
   logs as the logs on p1 did not have anything.
  
   *2014-11-03 06:57:36 k.c.ZookeeperConsumerConnector [INFO]
   [topic_consumerIdString], waiting for the partition ownership to be
   deleted: 11*
  
   During and after the rebalance failed on process p2, Partition
 Ownership
   was as below:
   0-14 - owner p1
   15-19 - none
   20-29 - owner p3
  
    This left the consumers in an inconsistent state as 5 partitions were
    never consumed from and neither

Re: Rebalance not happening even after increasing max retries causing conflict in ZK

2014-11-07 Thread Mohit Kathuria
Hi all,

Can someone help here? We are getting constant rebalance failures each time
a consumer is added beyond a certain number. We did quite a lot of debugging
on this and are still not able to figure out the pattern.

-Thanks,
Mohit

On Mon, Nov 3, 2014 at 10:53 PM, Mohit Kathuria mkathu...@sprinklr.com
wrote:

 Neha,

 Looks like an issue with the consumer rebalance not able to complete
 successfully. We were able to reproduce the issue on a topic with 30
 partitions, 3 consumer processes (p1, p2 and p3), and properties of 40
 rebalance.max.retries and 10000 ms (10s) rebalance.backoff.ms.

 Before the process p3 was started, partition ownership was as expected:

 partitions 0-14 owned by p1
 partitions 15-29 - owner p2

 As the process p3 started, rebalance was triggered. Process p3 was
 successfully able to acquire partition ownership for partitions 20-29 as
 expected as per the rebalance algorithm. However, process p2 while trying
 to acquire ownership of partitions 10-19 saw rebalance failure after 40
 retries.

 Attaching the logs from process p2 and process p1. They show that p2 was
 attempting to rebalance; it was trying to acquire ownership of partitions
 10-14, which were owned by process p1. However, at the same time process p1
 did not get any event for giving up the partition ownership for partitions
 1-14.
 We were expecting a rebalance to have been triggered in p1 - but it wasn't,
 and hence it was not giving up ownership. Is our assumption correct/incorrect?
 And if the rebalance did get triggered in p1 - how do we figure that out,
 apart from the logs, as the logs on p1 did not have anything?

 *2014-11-03 06:57:36 k.c.ZookeeperConsumerConnector [INFO]
 [topic_consumerIdString], waiting for the partition ownership to be
 deleted: 11*

 During and after the rebalance failed on process p2, Partition Ownership
 was as below:
 0-14 - owner p1
 15-19 - none
 20-29 - owner p3

 This left the consumers in an inconsistent state as 5 partitions were never
 consumed from and neither was the partition ownership balanced.

 However, there was no conflict in creating the ephemeral node which was
 the case last time. Just to note that the ephemeral node conflict which we
 were seeing earlier also appeared after rebalance failed. My hunch is that
 fixing the rebalance failure will fix that issue as well.

 -Thanks,
 Mohit



 On Mon, Oct 20, 2014 at 7:48 PM, Neha Narkhede neha.narkh...@gmail.com
 wrote:

 Mohit,

 I wonder if it is related to
 https://issues.apache.org/jira/browse/KAFKA-1585. When zookeeper expires
 a
 session, it doesn't delete the ephemeral nodes immediately. So if you end
 up trying to recreate ephemeral nodes quickly, it could either be in the
 valid latest session or from the previously expired session. If you hit
 this problem, then waiting would resolve it. But if not, then this may be
 a
 legitimate bug in ZK 3.4.6.

 Can you try shutting down all your consumers, waiting until session
 timeout
 and restarting them?

 Thanks,
 Neha

 On Mon, Oct 20, 2014 at 6:15 AM, Mohit Kathuria mkathu...@sprinklr.com
 wrote:

  Dear Experts,
 
  We recently updated to kafka v0.8.1.1 with zookeeper v3.4.5. I have a
  topic with 30 partitions and 2 replicas. We are using the High level
  consumer api.
  Each consumer process, which is a storm topology, has 5 streams which
  connect to 1 or more partitions. We are not using storm's inbuilt kafka
  spout. Everything runs fine till the 5th consumer process (25 streams) is
  added for this topic.
 
  As soon as the sixth consumer process is added, the newly added process
  does not get ownership of the partitions that it requests, as the
  already existing owners have not yet given up ownership.
 
  We changed certain properties on the consumer:

  1. Max rebalance attempts - 20 (so that rebalance.backoff.ms *
  rebalance.max.retries > zk connection timeout)
  2. Backoff ms between rebalances - 10000 (10 seconds)
  3. ZK connection timeout - 100,000 (100 seconds)
 
  Although when I am looking in the zookeeper shell when the rebalance is
  happening, the consumer is registered fine on the zookeeper. Just that
 the
  rebalance does not happen.
  After the 20th rebalance gets completed, we get
 
 
  *2014-10-11 11:10:08 k.c.ZookeeperConsumerConnector [INFO]
  [rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b], Committing all
  offsets after clearing the fetcher queues*
  *2014-10-11 11:10:10 c.s.m.k.i.c.KafkaFeedStreamer [WARN] Ignoring
  exception while trying to start streamer threads:
  rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b can't rebalance
 after
  20 retries*
  *kafka.common.ConsumerRebalanceFailedException:
  rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b can't rebalance
 after
  20 retries*
   *at
   kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432)
   ~[stormjar.jar:na]*
   *at
   kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer

Fwd: Rebalance not happening even after increasing max retries causing conflict in ZK

2014-11-03 Thread Mohit Kathuria
Neha,

Looks like an issue with the consumer rebalance not able to complete
successfully. We were able to reproduce the issue on a topic with 30
partitions, 3 consumer processes (p1, p2 and p3), and properties of 40
rebalance.max.retries and 10000 ms (10s) rebalance.backoff.ms.

Before the process p3 was started, partition ownership was as expected:

partitions 0-14 owned by p1
partitions 15-29 - owner p2

As the process p3 started, rebalance was triggered. Process p3 was
successfully able to acquire partition ownership for partitions 20-29 as
expected as per the rebalance algorithm. However, process p2 while trying
to acquire ownership of partitions 10-19 saw rebalance failure after 40
retries.

Attaching the logs from process p2 and process p1. They show that p2 was
attempting to rebalance; it was trying to acquire ownership of partitions
10-14, which were owned by process p1. However, at the same time process p1
did not get any event for giving up the partition ownership for partitions
1-14.
We were expecting a rebalance to have been triggered in p1 - but it wasn't,
and hence it was not giving up ownership. Is our assumption correct/incorrect?
And if the rebalance did get triggered in p1 - how do we figure that out,
apart from the logs, as the logs on p1 did not have anything?

*2014-11-03 06:57:36 k.c.ZookeeperConsumerConnector [INFO]
[topic_consumerIdString], waiting for the partition ownership to be
deleted: 11*

During and after the rebalance failed on process p2, Partition Ownership
was as below:
0-14 - owner p1
15-19 - none
20-29 - owner p3

This left the consumers in an inconsistent state as 5 partitions were never
consumed from and neither was the partition ownership balanced.

However, there was no conflict in creating the ephemeral node which was the
case last time. Just to note that the ephemeral node conflict which we were
seeing earlier also appeared after rebalance failed. My hunch is that
fixing the rebalance failure will fix that issue as well.

-Thanks,
Mohit


Re: Rebalance not happening even after increasing max retries causing conflict in ZK

2014-11-03 Thread Mohit Kathuria
Neha,

In my last reply, the subject got changed, which is why it got marked as a new
message on
http://mail-archives.apache.org/mod_mbox/kafka-users/201411.mbox/date.
Please ignore that. The text below is the reply in continuation of
http://mail-archives.apache.org/mod_mbox/kafka-users/201410.mbox/%3CCAAALehC3X2i7+n8aaEkXtxBduwUUp6Zk7=-jxn8yrdcjy1f...@mail.gmail.com%3E


Looks like an issue with the consumer rebalance not able to complete
successfully. We were able to reproduce the issue on a topic with 30
partitions, 3 consumer processes (p1, p2 and p3), and properties of 40
rebalance.max.retries and 10000 ms (10s) rebalance.backoff.ms.

Before the process p3 was started, partition ownership was as expected:

partitions 0-14 owned by p1
partitions 15-29 - owner p2

As the process p3 started, rebalance was triggered. Process p3 was
successfully able to acquire partition ownership for partitions 20-29 as
expected as per the rebalance algorithm. However, process p2 while trying
to acquire ownership of partitions 10-19 saw rebalance failure after 40
retries.

Attaching the logs from process p2 and process p1. They show that p2 was
attempting to rebalance; it was trying to acquire ownership of partitions
10-14, which were owned by process p1. However, at the same time process p1
did not get any event for giving up the partition ownership for partitions
1-14.
We were expecting a rebalance to have been triggered in p1 - but it wasn't,
and hence it was not giving up ownership. Is our assumption correct/incorrect?
And if the rebalance did get triggered in p1 - how do we figure that out,
apart from the logs, as the logs on p1 did not have anything?

*2014-11-03 06:57:36 k.c.ZookeeperConsumerConnector [INFO]
[topic_consumerIdString], waiting for the partition ownership to be
deleted: 11*

During and after the rebalance failed on process p2, Partition Ownership
was as below:
0-14 - owner p1
15-19 - none
20-29 - owner p3

This left the consumers in an inconsistent state as 5 partitions were never
consumed from and neither was the partition ownership balanced.

However, there was no conflict in creating the ephemeral node which was the
case last time. Just to note that the ephemeral node conflict which we were
seeing earlier also appeared after rebalance failed. My hunch is that
fixing the rebalance failure will fix that issue as well.

-Thanks,
Mohit



On Mon, Oct 20, 2014 at 7:48 PM, Neha Narkhede neha.narkh...@gmail.com
wrote:

 Mohit,

 I wonder if it is related to
 https://issues.apache.org/jira/browse/KAFKA-1585. When zookeeper expires a
 session, it doesn't delete the ephemeral nodes immediately. So if you end
 up trying to recreate ephemeral nodes quickly, it could either be in the
 valid latest session or from the previously expired session. If you hit
 this problem, then waiting would resolve it. But if not, then this may be a
 legitimate bug in ZK 3.4.6.

 Can you try shutting down all your consumers, waiting until session timeout
 and restarting them?

 Thanks,
 Neha

 On Mon, Oct 20, 2014 at 6:15 AM, Mohit Kathuria mkathu...@sprinklr.com
 wrote:

  Dear Experts,
 
  We recently updated to kafka v0.8.1.1 with zookeeper v3.4.5. I have a
  topic with 30 partitions and 2 replicas. We are using the High level
  consumer api.
  Each consumer process, which is a storm topology, has 5 streams which
  connect to 1 or more partitions. We are not using storm's inbuilt kafka
  spout. Everything runs fine till the 5th consumer process (25 streams) is
  added for this topic.
 
  As soon as the sixth consumer process is added, the newly added process
  does not get ownership of the partitions that it requests, as the
  already existing owners have not yet given up ownership.
 
  We changed certain properties on the consumer:

  1. Max rebalance attempts - 20 (so that rebalance.backoff.ms *
  rebalance.max.retries > zk connection timeout)
  2. Backoff ms between rebalances - 10000 (10 seconds)
  3. ZK connection timeout - 100,000 (100 seconds)
 
  Although when I am looking in the zookeeper shell when the rebalance is
  happening, the consumer is registered fine on the zookeeper. Just that
 the
  rebalance does not happen.
  After the 20th rebalance gets completed, we get
 
 
  *2014-10-11 11:10:08 k.c.ZookeeperConsumerConnector [INFO]
  [rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b], Committing all
  offsets after clearing the fetcher queues*
  *2014-10-11 11:10:10 c.s.m.k.i.c.KafkaFeedStreamer [WARN] Ignoring
  exception while trying to start streamer threads:
  rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b can't rebalance
 after
  20 retries*
  *kafka.common.ConsumerRebalanceFailedException:
  rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b can't rebalance
 after
  20 retries*
   *at
   kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432)
   ~[stormjar.jar:na]*
   *at
   kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector

Rebalance not happening even after increasing max retries causing conflict in ZK

2014-10-20 Thread Mohit Kathuria
Dear Experts,

We recently updated to kafka v0.8.1.1 with zookeeper v3.4.5. I have a topic
with 30 partitions and 2 replicas. We are using the High level consumer api.
Each consumer process, which is a storm topology, has 5 streams which
connect to 1 or more partitions. We are not using storm's inbuilt kafka
spout. Everything runs fine till the 5th consumer process (25 streams) is
added for this topic.

As soon as the sixth consumer process is added, the newly added process
does not get ownership of the partitions that it requests, as the
already existing owners have not yet given up ownership.

We changed certain properties on the consumer (see the sketch after this list):

1. Max rebalance attempts - 20 (so that rebalance.backoff.ms *
rebalance.max.retries > zk connection timeout)
2. Backoff ms between rebalances - 10000 (10 seconds)
3. ZK connection timeout - 100,000 (100 seconds)
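
For reference, the same settings in code against the high-level consumer (a
rough sketch; the zookeeper connect string and group id are placeholders, and
only the three values from the list are the ones we actually changed):

    import java.util.Properties
    import kafka.consumer.ConsumerConfig

    // High-level consumer settings corresponding to the list above.
    val props = new Properties()
    props.put("zookeeper.connect", "zk1:2181,zk2:2181,zk3:2181")  // placeholder
    props.put("group.id", "rule-engine-feed")  // placeholder (matches the logs below)
    props.put("rebalance.max.retries", "20")
    props.put("rebalance.backoff.ms", "10000")
    props.put("zookeeper.connection.timeout.ms", "100000")
    val config = new ConsumerConfig(props)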

Although, when I look in the zookeeper shell while the rebalance is
happening, the consumer is registered fine on zookeeper. It is just that the
rebalance does not happen.
After the 20th rebalance gets completed, we get


*2014-10-11 11:10:08 k.c.ZookeeperConsumerConnector [INFO]
[rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b], Committing all
offsets after clearing the fetcher queues*
*2014-10-11 11:10:10 c.s.m.k.i.c.KafkaFeedStreamer [WARN] Ignoring
exception while trying to start streamer threads:
rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b can't rebalance after
20 retries*
*kafka.common.ConsumerRebalanceFailedException:
rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b can't rebalance after
20 retries*
*at
kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432)
~[stormjar.jar:na]*
*at
kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:722)
~[stormjar.jar:na]*
*at
kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:212)
~[stormjar.jar:na]*
*at
kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:80)
~[stormjar.jar:na]*
*at
com.spr.messaging.kafka.impl.consumer.KafkaFeedStreamer.createAndStartThreads(KafkaFeedStreamer.java:79)
~[stormjar.jar:na]*
*at
com.spr.messaging.kafka.impl.consumer.KafkaFeedStreamer.startKafkaStreamThreadsIfNecessary(KafkaFeedStreamer.java:64)
~[stormjar.jar:na]*
*at
com.spr.messaging.kafka.impl.consumer.KafkaFeedConsumerFactoryImpl.startStreamerIfNotRunning(KafkaFeedConsumerFactoryImpl.java:71)
[stormjar.jar:na]*
*at
com.spr.messaging.kafka.impl.consumer.KafkaFeedPullConsumerImpl.startStreamerIfNotRunning(KafkaFeedPullConsumerImpl.java:48)
[stormjar.jar:na]*
*at
com.spr.messaging.kafka.impl.KafkaFeedServiceImpl.getKafkaFeedPullConsumer(KafkaFeedServiceImpl.java:63)
[stormjar.jar:na]*
*at
com.spr.storm.topology.spout.AbstractSprKafkaSpout.nextTuple(AbstractSprKafkaSpout.java:121)
[stormjar.jar:na]*
*at
backtype.storm.daemon.executor$eval3848$fn__3849$fn__3864$fn__3893.invoke(executor.clj:562)
[na:0.9.1-incubating]*
*at backtype.storm.util$async_loop$fn__384.invoke(util.clj:433)
[na:0.9.1-incubating]*
*at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]*
*at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]*
*2014-10-11 11:10:10 k.c.ZookeeperConsumerConnector [INFO]
[rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b], begin registering
consumer rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b in ZK*
*2014-10-11 11:10:10 k.u.ZkUtils$ [INFO] conflict in
/consumers/rule-engine-feed/ids/rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b
data:
{version:1,subscription:{rule-engine-feed:5},pattern:static,timestamp:1413025810635}
stored data:
{version:1,subscription:{rule-engine-feed:5},pattern:static,timestamp:1413025767483}*
*2014-10-11 11:10:10 k.u.ZkUtils$ [INFO] I wrote this conflicted ephemeral
node
[{version:1,subscription:{rule-engine-feed:5},pattern:static,timestamp:1413025810635}]
at
/consumers/rule-engine-feed/ids/rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b
a while back in a different session, hence I will backoff for this node to
be deleted by Zookeeper and retry*

Due to this error, none of the consumers consumes from the partitions in
contention, which creates a sort of skewed lag on the kafka side. When the 6th
consumer was added, the existing owner process of the partitions in
question did not get rebalanced.

Any help would be highly appreciated.

-Thanks,
Mohit