Re: Does kafka write key to broker?
Guozhang,

Thanks for the reply. That's exactly what I was looking for.

-Thanks,
Mohit

On Tue, Jun 30, 2015 at 1:52 AM, Guozhang Wang wangg...@gmail.com wrote:

Hi Mohit,

The KeyedMessage has a key and a partKey as its fields, and if the partKey is not specified then the key is used as the partKey by default. So for your case, if you do

    new KeyedMessage[String, String](topic, null /*key*/, partKey, value)

then the partKey will be used to determine the partition but will not be written to the broker.

Guozhang

On Wed, Jun 24, 2015 at 9:45 AM, Mohit Kathuria mkathu...@sprinklr.com wrote:

Thanks Jason.

Hi Liquan,

I was doing this initially, using KeyedMessage without setting the key. But if the key is null, the custom partitioner won't be used and Kafka will write to a random partition, which is cached for some time within the producer. I have a custom partitioner to achieve fairness between the partitions.

-Thanks,
Mohit

On Tue, Jun 23, 2015 at 10:34 PM, Liquan Pei liquan...@gmail.com wrote:

Hi Mohit,

If you instantiate the keyed message with

    val topic = "topic"
    val value = "value"
    val message = new KeyedMessage[String, String](topic, value)

then the key in the KeyedMessage will be null. Hope this helps!

Thanks,
Liquan

On Tue, Jun 23, 2015 at 8:18 AM, Mohit Kathuria mkathu...@sprinklr.com wrote:

Hi,

We are using kafka 0.8.1.1 in our production cluster. I recently started specifying the key as the message itself. I just realised that the key is also written to the broker, which means that the data is duplicated within a keyed message. I am going to change the key. Stupid mistake. However, just out of curiosity, I want to know whether we can turn off writing the key to the broker. Is there any configuration I can change to achieve this?

-Thanks,
Mohit Kathuria

--
Liquan Pei
Department of Physics
University of Massachusetts Amherst

--
Guozhang
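[Editor's note: a minimal sketch of the pattern Guozhang describes, against the 0.8.x Scala producer API. The broker list, topic name, routing key, and payload are placeholders, not values from this thread.]

    import java.util.Properties
    import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

    object PartKeyOnlyProducer {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put("metadata.broker.list", "localhost:9092") // placeholder broker
        props.put("serializer.class", "kafka.serializer.StringEncoder")

        val producer = new Producer[String, String](new ProducerConfig(props))

        // key = null: nothing is stored on the broker for the key;
        // partKey is still handed to the partitioner to pick the partition,
        // so the payload is not duplicated in the stored message.
        producer.send(new KeyedMessage[String, String](
          "my-topic", null /*key*/, "routing-key" /*partKey*/, "payload"))
        producer.close()
      }
    }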
Re: Does kafka write key to broker?
Thanks Jason.

Hi Liquan,

I was doing this initially, using KeyedMessage without setting the key. But if the key is null, the custom partitioner won't be used and Kafka will write to a random partition, which is cached for some time within the producer. I have a custom partitioner to achieve fairness between the partitions.

-Thanks,
Mohit

On Tue, Jun 23, 2015 at 10:34 PM, Liquan Pei liquan...@gmail.com wrote:

Hi Mohit,

If you instantiate the keyed message with

    val topic = "topic"
    val value = "value"
    val message = new KeyedMessage[String, String](topic, value)

then the key in the KeyedMessage will be null. Hope this helps!

Thanks,
Liquan

On Tue, Jun 23, 2015 at 8:18 AM, Mohit Kathuria mkathu...@sprinklr.com wrote:

Hi,

We are using kafka 0.8.1.1 in our production cluster. I recently started specifying the key as the message itself. I just realised that the key is also written to the broker, which means that the data is duplicated within a keyed message. I am going to change the key. Stupid mistake. However, just out of curiosity, I want to know whether we can turn off writing the key to the broker. Is there any configuration I can change to achieve this?

-Thanks,
Mohit Kathuria

--
Liquan Pei
Department of Physics
University of Massachusetts Amherst
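[Editor's note: Mohit's actual partitioner is not shown in the thread; below is an illustrative round-robin sketch of a "fairness" partitioner for the 0.8.x producer, where Partitioner takes the key as Any and implementations are constructed with a VerifiableProperties argument. The class name and strategy are hypothetical.]

    import java.util.concurrent.atomic.AtomicInteger
    import kafka.producer.Partitioner
    import kafka.utils.VerifiableProperties

    // Spreads messages evenly across partitions regardless of key content.
    // Registered via props.put("partitioner.class", "example.FairPartitioner").
    class FairPartitioner(props: VerifiableProperties = null) extends Partitioner {
      private val counter = new AtomicInteger(0)

      // Only invoked when the message carries a non-null partition key;
      // with a null key the 0.8 producer bypasses the partitioner and
      // reuses a cached random partition instead.
      override def partition(key: Any, numPartitions: Int): Int =
        (counter.getAndIncrement() & Int.MaxValue) % numPartitions
    }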
Does kafka write key to broker?
Hi,

We are using kafka 0.8.1.1 in our production cluster. I recently started specifying the key as the message itself. I just realised that the key is also written to the broker, which means that the data is duplicated within a keyed message. I am going to change the key. Stupid mistake. However, just out of curiosity, I want to know whether we can turn off writing the key to the broker. Is there any configuration I can change to achieve this?

-Thanks,
Mohit Kathuria
Re: Rebalance not happening even after increasing max retries causing conflict in ZK
Mike,

The endless rebalance errors occur due to the error that Mayuresh just pasted. The rebalance attempts fail because of the conflict in the zkNode. Below is the exact trace.

2014-12-09 13:22:11 k.u.ZkUtils$ [INFO] I wrote this conflicted ephemeral node [{"version":1,"subscription":{"__":5},"pattern":"static","timestamp":"1417964160024"}] at /consumers/__ids/__-1417963753598-b19de58d a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry

Would be more than willing to help. Let me know if you need any more information, and feel free to reach out to me in person and we can work on it.

-Thanks,
Mohit

On Fri, Mar 27, 2015 at 3:00 AM, Mike Axiak m...@axiak.net wrote:

No, we don't normally see conflicts. We'll see endless attempts to rebalance.

-Mike

On Thu, Mar 26, 2015 at 5:15 PM, Mayuresh Gharat gharatmayures...@gmail.com wrote:

Did you see something like this in any of the consumer logs: "Conflict in ... data: ... stored data: ..."?

Thanks,
Mayuresh

On Thu, Mar 26, 2015 at 1:50 PM, Mike Axiak m...@axiak.net wrote:

Hi guys,

At HubSpot we think the issue is related to slow consumers. During a rebalance, one of the first things the consumer does is signal a shutdown to the fetcher [1] [2], in order to relinquish ownership of the partitions. This waits for all fetcher threads to shut down, which can only happen once each thread's "enqueue current chunk" step finishes. If you have a slow consumer or large chunk sizes, this could take a while, which would make it difficult for the rebalance to actually complete successfully.

We're testing out different solutions now. Currently under review, we're thinking about making the enqueue into the blocking queue time out so we can check whether we're still running, and end the processing of the current chunk early.

Has anyone else noticed this? If so, are there any patches people have written? Once we have a clearer picture of solutions we'll send a few patches in JIRAs.

Best,
Mike

1: https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala#L655
2: https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala#L712

On Mon, Dec 22, 2014 at 6:51 PM, Neha Narkhede n...@confluent.io wrote:

Can you share a reproducible test case?

On Tue, Dec 9, 2014 at 7:11 AM, Mohit Kathuria mkathu...@sprinklr.com wrote:

Neha,

The same issue reoccurred with just 2 consumer processes. The exception was related to a conflict in writing the ephemeral node. Below is the exception. The topic name is lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin with 30 partitions. The 2 processes were running on 2 servers with IPs 10.0.8.222 and 10.0.8.225.

2014-12-09 13:22:11 k.u.ZkUtils$ [INFO] I wrote this conflicted ephemeral node [{"version":1,"subscription":{"lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin":5},"pattern":"static","timestamp":"1417964160024"}] at /consumers/lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin/ids/lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin_ip-10-0-8-222-1417963753598-b19de58d a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry

Attached are the complete error logs. The exception occurred after the rebalance failed, even after 40 retries. The rebalance failed because the process already owning some of the partitions did not give up ownership, due to conflicting ephemeral nodes.

As you suggested, we ran the wchp command on the 3 zookeeper nodes at this time and figured out that the watcher was registered for only one of the processes. I am copying the kafka consumer watcher registered on one of the zookeeper servers. (Attached are the wchp outputs of all 3 zk servers.)

$ echo wchp | nc localhost 2181

/kafka/consumers/lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin/ids
    0x34a175e1d5d0130

0x34a175e1d5d0130 is the session id of the ephemeral node. I went back to the zookeeper shell and checked the consumers registered for this topic and consumer group (same as the topic name). Attaching the output in zkCommands.txt. This clearly shows that

10.0.8.222 has ephemeralOwner = 0x34a175e1d5d0130
10.0.8.225 has ephemeralOwner = 0x34a175e1d5d0127

I think the issue here is that the two consumers have written to different ephemeral nodes, while watchers are registered for only one of the 2 ephemeral nodes.
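[Editor's note: a sketch of the timeout-based enqueue Mike proposes above. Instead of a put() that can block a fetcher thread indefinitely on a full queue (stalling rebalance shutdown), offer() with a timeout and re-check a running flag between attempts. The names (DataChunk, isRunning, the 500 ms interval) are illustrative, not Kafka's actual internals.]

    import java.util.concurrent.{BlockingQueue, TimeUnit}
    import java.util.concurrent.atomic.AtomicBoolean

    class InterruptibleEnqueuer[DataChunk](queue: BlockingQueue[DataChunk],
                                           isRunning: AtomicBoolean) {
      // Returns true if the chunk was enqueued, false if shutdown was
      // requested first, letting the fetcher thread exit promptly.
      def enqueueCurrentChunk(chunk: DataChunk): Boolean = {
        while (isRunning.get()) {
          // Wake up periodically so a shutdown request ends the enqueue early.
          if (queue.offer(chunk, 500, TimeUnit.MILLISECONDS))
            return true
        }
        false
      }
    }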
Re: Rebalance not happening even after increasing max retries causing conflict in ZK
Any suggestions on what might be going on here? We are very much blind here and our application is being affected by this.

-Mohit

On Tue, Dec 9, 2014 at 8:41 PM, Mohit Kathuria mkathu...@sprinklr.com wrote:

Neha,

The same issue reoccurred with just 2 consumer processes. The exception was related to a conflict in writing the ephemeral node. Below is the exception. The topic name is lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin with 30 partitions. The 2 processes were running on 2 servers with IPs 10.0.8.222 and 10.0.8.225.

2014-12-09 13:22:11 k.u.ZkUtils$ [INFO] I wrote this conflicted ephemeral node [{"version":1,"subscription":{"lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin":5},"pattern":"static","timestamp":"1417964160024"}] at /consumers/lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin/ids/lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin_ip-10-0-8-222-1417963753598-b19de58d a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry

Attached are the complete error logs. The exception occurred after the rebalance failed, even after 40 retries. The rebalance failed because the process already owning some of the partitions did not give up ownership, due to conflicting ephemeral nodes.

As you suggested, we ran the wchp command on the 3 zookeeper nodes at this time and figured out that the watcher was registered for only one of the processes. I am copying the kafka consumer watcher registered on one of the zookeeper servers. (Attached are the wchp outputs of all 3 zk servers.)

$ echo wchp | nc localhost 2181

/kafka/consumers/lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin/ids
    0x34a175e1d5d0130

0x34a175e1d5d0130 is the session id of the ephemeral node. I went back to the zookeeper shell and checked the consumers registered for this topic and consumer group (same as the topic name). Attaching the output in zkCommands.txt. This clearly shows that

10.0.8.222 has ephemeralOwner = 0x34a175e1d5d0130
10.0.8.225 has ephemeralOwner = 0x34a175e1d5d0127

I think the issue here is that the two consumers have written to different ephemeral nodes, while watchers are registered for only one of the 2 ephemeral nodes. The root cause seems to be the inconsistent state while writing the ephemeral nodes in ZK. Let me know if you need more details.

-Thanks,
Mohit

On Mon, Nov 10, 2014 at 8:46 AM, Neha Narkhede neha.narkh...@gmail.com wrote:

A rebalance should trigger on all consumers when you add a new consumer to the group. If you don't see the zookeeper watch fire, the consumer may have somehow lost the watch. We have seen this behavior on older zk versions; I wonder if that bug got reintroduced. To verify if this is the case, you can run the wchp zookeeper command on the zk leader and check whether each consumer has a watch registered.

Do you have a way to try this on zk 3.3.4? I would recommend you try the wchp suggestion as well.

On Fri, Nov 7, 2014 at 6:07 AM, Mohit Kathuria mkathu...@sprinklr.com wrote:

Hi all,

Can someone help here? We are getting constant rebalance failures each time a consumer is added beyond a certain number. Did quite a lot of debugging on this and still not able to figure out the pattern.

-Thanks,
Mohit

On Mon, Nov 3, 2014 at 10:53 PM, Mohit Kathuria mkathu...@sprinklr.com wrote:

Neha,

Looks like an issue with the consumer rebalance not being able to complete successfully. We were able to reproduce the issue on a topic with 30 partitions, 3 consumer processes (p1, p2 and p3), with properties rebalance.max.retries=40 and rebalance.backoff.ms=10000 (10s).

Before process p3 was started, partition ownership was as expected:

partitions 0-14 owned by p1
partitions 15-29 owned by p2

As process p3 started, a rebalance was triggered. Process p3 successfully acquired partition ownership for partitions 20-29, as expected per the rebalance algorithm. However, process p2, while trying to acquire ownership of partitions 10-19, saw the rebalance fail after 40 retries.

Attaching the logs from process p2 and process p1. They say that p2 was attempting to rebalance and trying to acquire ownership of partitions 10-14, which were owned by process p1. However, at the same time, process p1 did not get any event telling it to give up ownership of partitions 10-14. We were expecting a rebalance to have been triggered in p1, but it wasn't, and hence p1 did not give up ownership. Is our assumption correct/incorrect? And if the rebalance does get triggered in p1, how do we verify that apart from the logs, as the logs on p1 did not have anything?

2014-11-03 06:57:36 k.c.ZookeeperConsumerConnector [INFO] [topic_consumerIdString], waiting for the partition ownership to be deleted: 11
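[Editor's note: for reference, the checks Mohit describes can be reproduced with ZooKeeper's four-letter commands and the zkCli.sh shell that ships with ZooKeeper; the paths follow this thread's layout, the localhost address is a placeholder.]

    # Which sessions hold watches on the ids path (run against each ZK server):
    $ echo wchp | nc localhost 2181

    # Which session owns a consumer's ephemeral id node; compare the
    # ephemeralOwner printed by 'stat' with the session ids from wchp:
    $ zkCli.sh -server localhost:2181
    stat /kafka/consumers/<group>/ids/<consumer-id>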
Re: Rebalance not happening even after increasing max retries causing conflict in ZK
Neha,

The same issue reoccurred with just 2 consumer processes. The exception was related to a conflict in writing the ephemeral node. Below is the exception. The topic name is lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin with 30 partitions. The 2 processes were running on 2 servers with IPs 10.0.8.222 and 10.0.8.225.

2014-12-09 13:22:11 k.u.ZkUtils$ [INFO] I wrote this conflicted ephemeral node [{"version":1,"subscription":{"lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin":5},"pattern":"static","timestamp":"1417964160024"}] at /consumers/lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin/ids/lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin_ip-10-0-8-222-1417963753598-b19de58d a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry

Attached are the complete error logs. The exception occurred after the rebalance failed, even after 40 retries. The rebalance failed because the process already owning some of the partitions did not give up ownership, due to conflicting ephemeral nodes.

As you suggested, we ran the wchp command on the 3 zookeeper nodes at this time and figured out that the watcher was registered for only one of the processes. I am copying the kafka consumer watcher registered on one of the zookeeper servers. (Attached are the wchp outputs of all 3 zk servers.)

$ echo wchp | nc localhost 2181

/kafka/consumers/lst_plugin_com.spr.listening.plugin.impl.plugins.SemantriaEnrichmentPlugin/ids
    0x34a175e1d5d0130

0x34a175e1d5d0130 is the session id of the ephemeral node. I went back to the zookeeper shell and checked the consumers registered for this topic and consumer group (same as the topic name). Attaching the output in zkCommands.txt. This clearly shows that

10.0.8.222 has ephemeralOwner = 0x34a175e1d5d0130
10.0.8.225 has ephemeralOwner = 0x34a175e1d5d0127

I think the issue here is that the two consumers have written to different ephemeral nodes, while watchers are registered for only one of the 2 ephemeral nodes. The root cause seems to be the inconsistent state while writing the ephemeral nodes in ZK. Let me know if you need more details.

-Thanks,
Mohit

On Mon, Nov 10, 2014 at 8:46 AM, Neha Narkhede neha.narkh...@gmail.com wrote:

A rebalance should trigger on all consumers when you add a new consumer to the group. If you don't see the zookeeper watch fire, the consumer may have somehow lost the watch. We have seen this behavior on older zk versions; I wonder if that bug got reintroduced. To verify if this is the case, you can run the wchp zookeeper command on the zk leader and check whether each consumer has a watch registered.

Do you have a way to try this on zk 3.3.4? I would recommend you try the wchp suggestion as well.

On Fri, Nov 7, 2014 at 6:07 AM, Mohit Kathuria mkathu...@sprinklr.com wrote:

Hi all,

Can someone help here? We are getting constant rebalance failures each time a consumer is added beyond a certain number. Did quite a lot of debugging on this and still not able to figure out the pattern.

-Thanks,
Mohit

On Mon, Nov 3, 2014 at 10:53 PM, Mohit Kathuria mkathu...@sprinklr.com wrote:

Neha,

Looks like an issue with the consumer rebalance not being able to complete successfully. We were able to reproduce the issue on a topic with 30 partitions, 3 consumer processes (p1, p2 and p3), with properties rebalance.max.retries=40 and rebalance.backoff.ms=10000 (10s).

Before process p3 was started, partition ownership was as expected:

partitions 0-14 owned by p1
partitions 15-29 owned by p2

As process p3 started, a rebalance was triggered. Process p3 successfully acquired partition ownership for partitions 20-29, as expected per the rebalance algorithm. However, process p2, while trying to acquire ownership of partitions 10-19, saw the rebalance fail after 40 retries.

Attaching the logs from process p2 and process p1. They say that p2 was attempting to rebalance and trying to acquire ownership of partitions 10-14, which were owned by process p1. However, at the same time, process p1 did not get any event telling it to give up ownership of partitions 10-14. We were expecting a rebalance to have been triggered in p1, but it wasn't, and hence p1 did not give up ownership. Is our assumption correct/incorrect? And if the rebalance does get triggered in p1, how do we verify that apart from the logs, as the logs on p1 did not have anything?

2014-11-03 06:57:36 k.c.ZookeeperConsumerConnector [INFO] [topic_consumerIdString], waiting for the partition ownership to be deleted: 11

During and after the failed rebalance on process p2, partition ownership was as below:

0-14 owned by p1
15-19 no owner
20-29 owned by p3

This left the consumers in an inconsistent state, as 5 partitions were never consumed from, nor was the partition ownership balanced.
Re: Rebalance not happening even after increasing max retries causing conflict in ZK
Hi all,

Can someone help here? We are getting constant rebalance failures each time a consumer is added beyond a certain number. Did quite a lot of debugging on this and still not able to figure out the pattern.

-Thanks,
Mohit

On Mon, Nov 3, 2014 at 10:53 PM, Mohit Kathuria mkathu...@sprinklr.com wrote:

Neha,

Looks like an issue with the consumer rebalance not being able to complete successfully. We were able to reproduce the issue on a topic with 30 partitions, 3 consumer processes (p1, p2 and p3), with properties rebalance.max.retries=40 and rebalance.backoff.ms=10000 (10s).

Before process p3 was started, partition ownership was as expected:

partitions 0-14 owned by p1
partitions 15-29 owned by p2

As process p3 started, a rebalance was triggered. Process p3 successfully acquired partition ownership for partitions 20-29, as expected per the rebalance algorithm. However, process p2, while trying to acquire ownership of partitions 10-19, saw the rebalance fail after 40 retries.

Attaching the logs from process p2 and process p1. They say that p2 was attempting to rebalance and trying to acquire ownership of partitions 10-14, which were owned by process p1. However, at the same time, process p1 did not get any event telling it to give up ownership of partitions 10-14. We were expecting a rebalance to have been triggered in p1, but it wasn't, and hence p1 did not give up ownership. Is our assumption correct/incorrect? And if the rebalance does get triggered in p1, how do we verify that apart from the logs, as the logs on p1 did not have anything?

2014-11-03 06:57:36 k.c.ZookeeperConsumerConnector [INFO] [topic_consumerIdString], waiting for the partition ownership to be deleted: 11

During and after the failed rebalance on process p2, partition ownership was as below:

0-14 owned by p1
15-19 no owner
20-29 owned by p3

This left the consumers in an inconsistent state, as 5 partitions were never consumed from, nor was the partition ownership balanced. However, there was no conflict in creating the ephemeral node, which was the case last time. Just to note that the ephemeral node conflict we were seeing earlier also appeared after a rebalance failed. My hunch is that fixing the rebalance failure will fix that issue as well.

-Thanks,
Mohit

On Mon, Oct 20, 2014 at 7:48 PM, Neha Narkhede neha.narkh...@gmail.com wrote:

Mohit,

I wonder if it is related to https://issues.apache.org/jira/browse/KAFKA-1585. When zookeeper expires a session, it doesn't delete the ephemeral nodes immediately. So if you end up trying to recreate ephemeral nodes quickly, the node could either be from the valid latest session or from the previously expired session. If you hit this problem, then waiting would resolve it. But if not, then this may be a legitimate bug in ZK 3.4.6. Can you try shutting down all your consumers, waiting until the session timeout passes, and restarting them?

Thanks,
Neha

On Mon, Oct 20, 2014 at 6:15 AM, Mohit Kathuria mkathu...@sprinklr.com wrote:

Dear Experts,

We recently updated to kafka v0.8.1.1 with zookeeper v3.4.5. I have a topic with 30 partitions and 2 replicas. We are using the high-level consumer API. Each consumer process, which is a Storm topology, has 5 streams which connect to 1 or more partitions. We are not using Storm's inbuilt kafka spout. Everything runs fine until the 5th consumer process (25 streams) is added for this topic.

As soon as the sixth consumer process is added, the newly added process does not get ownership of the partitions that it requests, as the already existing owners have not yet given up ownership. We changed certain properties on the consumer:

1. Max rebalance attempts - 20 (so that rebalance.backoff.ms * rebalance.max.retries > ZK connection timeout)
2. Backoff ms between rebalances - 10000 (10 seconds)
3. ZK connection timeout - 100,000 (100 seconds)

Although, when I look in the zookeeper shell while the rebalance is happening, the consumer is registered fine on zookeeper. It is just that the rebalance does not happen. After the 20th rebalance attempt completes, we get

2014-10-11 11:10:08 k.c.ZookeeperConsumerConnector [INFO] [rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b], Committing all offsets after clearing the fetcher queues
2014-10-11 11:10:10 c.s.m.k.i.c.KafkaFeedStreamer [WARN] Ignoring exception while trying to start streamer threads: rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b can't rebalance after 20 retries
kafka.common.ConsumerRebalanceFailedException: rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b can't rebalance after 20 retries
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432) ~[stormjar.jar:na]
at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer
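[Editor's note: a sketch of how the three properties above are set on the 0.8.x high-level consumer; the ZooKeeper connect string is a placeholder, the group id is taken from the logs in this thread.]

    import java.util.Properties
    import kafka.consumer.{Consumer, ConsumerConfig}

    val props = new Properties()
    props.put("zookeeper.connect", "zk1:2181,zk2:2181,zk3:2181/kafka") // placeholder
    props.put("group.id", "rule-engine-feed")
    props.put("rebalance.max.retries", "20")
    props.put("rebalance.backoff.ms", "10000")             // 10 s between attempts
    props.put("zookeeper.connection.timeout.ms", "100000") // 100 s
    // retries * backoff (200 s) deliberately exceeds the ZK connection timeout (100 s)
    val connector = Consumer.create(new ConsumerConfig(props))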
Fwd: Rebalance not happening even after increasing max retries causing conflict in ZK
Neha,

Looks like an issue with the consumer rebalance not being able to complete successfully. We were able to reproduce the issue on a topic with 30 partitions, 3 consumer processes (p1, p2 and p3), with properties rebalance.max.retries=40 and rebalance.backoff.ms=10000 (10s).

Before process p3 was started, partition ownership was as expected:

partitions 0-14 owned by p1
partitions 15-29 owned by p2

As process p3 started, a rebalance was triggered. Process p3 successfully acquired partition ownership for partitions 20-29, as expected per the rebalance algorithm. However, process p2, while trying to acquire ownership of partitions 10-19, saw the rebalance fail after 40 retries.

Attaching the logs from process p2 and process p1. They say that p2 was attempting to rebalance and trying to acquire ownership of partitions 10-14, which were owned by process p1. However, at the same time, process p1 did not get any event telling it to give up ownership of partitions 10-14. We were expecting a rebalance to have been triggered in p1, but it wasn't, and hence p1 did not give up ownership. Is our assumption correct/incorrect? And if the rebalance does get triggered in p1, how do we verify that apart from the logs, as the logs on p1 did not have anything?

2014-11-03 06:57:36 k.c.ZookeeperConsumerConnector [INFO] [topic_consumerIdString], waiting for the partition ownership to be deleted: 11

During and after the failed rebalance on process p2, partition ownership was as below:

0-14 owned by p1
15-19 no owner
20-29 owned by p3

This left the consumers in an inconsistent state, as 5 partitions were never consumed from, nor was the partition ownership balanced. However, there was no conflict in creating the ephemeral node, which was the case last time. Just to note that the ephemeral node conflict we were seeing earlier also appeared after a rebalance failed. My hunch is that fixing the rebalance failure will fix that issue as well.

-Thanks,
Mohit
Re: Rebalance not happening even after increasing max retries causing conflict in ZK
Neha,

In my last reply the subject got changed, which is why it got marked as a new message on http://mail-archives.apache.org/mod_mbox/kafka-users/201411.mbox/date. Please ignore that. The text below is the reply in continuation of http://mail-archives.apache.org/mod_mbox/kafka-users/201410.mbox/%3CCAAALehC3X2i7+n8aaEkXtxBduwUUp6Zk7=-jxn8yrdcjy1f...@mail.gmail.com%3E

Looks like an issue with the consumer rebalance not being able to complete successfully. We were able to reproduce the issue on a topic with 30 partitions, 3 consumer processes (p1, p2 and p3), with properties rebalance.max.retries=40 and rebalance.backoff.ms=10000 (10s).

Before process p3 was started, partition ownership was as expected:

partitions 0-14 owned by p1
partitions 15-29 owned by p2

As process p3 started, a rebalance was triggered. Process p3 successfully acquired partition ownership for partitions 20-29, as expected per the rebalance algorithm. However, process p2, while trying to acquire ownership of partitions 10-19, saw the rebalance fail after 40 retries.

Attaching the logs from process p2 and process p1. They say that p2 was attempting to rebalance and trying to acquire ownership of partitions 10-14, which were owned by process p1. However, at the same time, process p1 did not get any event telling it to give up ownership of partitions 10-14. We were expecting a rebalance to have been triggered in p1, but it wasn't, and hence p1 did not give up ownership. Is our assumption correct/incorrect? And if the rebalance does get triggered in p1, how do we verify that apart from the logs, as the logs on p1 did not have anything?

2014-11-03 06:57:36 k.c.ZookeeperConsumerConnector [INFO] [topic_consumerIdString], waiting for the partition ownership to be deleted: 11

During and after the failed rebalance on process p2, partition ownership was as below:

0-14 owned by p1
15-19 no owner
20-29 owned by p3

This left the consumers in an inconsistent state, as 5 partitions were never consumed from, nor was the partition ownership balanced. However, there was no conflict in creating the ephemeral node, which was the case last time. Just to note that the ephemeral node conflict we were seeing earlier also appeared after a rebalance failed. My hunch is that fixing the rebalance failure will fix that issue as well.

-Thanks,
Mohit

On Mon, Oct 20, 2014 at 7:48 PM, Neha Narkhede neha.narkh...@gmail.com wrote:

Mohit,

I wonder if it is related to https://issues.apache.org/jira/browse/KAFKA-1585. When zookeeper expires a session, it doesn't delete the ephemeral nodes immediately. So if you end up trying to recreate ephemeral nodes quickly, the node could either be from the valid latest session or from the previously expired session. If you hit this problem, then waiting would resolve it. But if not, then this may be a legitimate bug in ZK 3.4.6. Can you try shutting down all your consumers, waiting until the session timeout passes, and restarting them?

Thanks,
Neha

On Mon, Oct 20, 2014 at 6:15 AM, Mohit Kathuria mkathu...@sprinklr.com wrote:

Dear Experts,

We recently updated to kafka v0.8.1.1 with zookeeper v3.4.5. I have a topic with 30 partitions and 2 replicas. We are using the high-level consumer API. Each consumer process, which is a Storm topology, has 5 streams which connect to 1 or more partitions. We are not using Storm's inbuilt kafka spout. Everything runs fine until the 5th consumer process (25 streams) is added for this topic.

As soon as the sixth consumer process is added, the newly added process does not get ownership of the partitions that it requests, as the already existing owners have not yet given up ownership. We changed certain properties on the consumer:

1. Max rebalance attempts - 20 (so that rebalance.backoff.ms * rebalance.max.retries > ZK connection timeout)
2. Backoff ms between rebalances - 10000 (10 seconds)
3. ZK connection timeout - 100,000 (100 seconds)

Although, when I look in the zookeeper shell while the rebalance is happening, the consumer is registered fine on zookeeper. It is just that the rebalance does not happen. After the 20th rebalance attempt completes, we get

2014-10-11 11:10:08 k.c.ZookeeperConsumerConnector [INFO] [rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b], Committing all offsets after clearing the fetcher queues
2014-10-11 11:10:10 c.s.m.k.i.c.KafkaFeedStreamer [WARN] Ignoring exception while trying to start streamer threads: rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b can't rebalance after 20 retries
kafka.common.ConsumerRebalanceFailedException: rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b can't rebalance after 20 retries
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432) ~[stormjar.jar:na]
at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector
Rebalance not happening even after increasing max retries causing conflict in ZK
Dear Experts,

We recently updated to kafka v0.8.1.1 with zookeeper v3.4.5. I have a topic with 30 partitions and 2 replicas. We are using the high-level consumer API. Each consumer process, which is a Storm topology, has 5 streams which connect to 1 or more partitions. We are not using Storm's inbuilt kafka spout. Everything runs fine until the 5th consumer process (25 streams) is added for this topic.

As soon as the sixth consumer process is added, the newly added process does not get ownership of the partitions that it requests, as the already existing owners have not yet given up ownership. We changed certain properties on the consumer:

1. Max rebalance attempts - 20 (so that rebalance.backoff.ms * rebalance.max.retries > ZK connection timeout)
2. Backoff ms between rebalances - 10000 (10 seconds)
3. ZK connection timeout - 100,000 (100 seconds)

Although, when I look in the zookeeper shell while the rebalance is happening, the consumer is registered fine on zookeeper. It is just that the rebalance does not happen. After the 20th rebalance attempt completes, we get

2014-10-11 11:10:08 k.c.ZookeeperConsumerConnector [INFO] [rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b], Committing all offsets after clearing the fetcher queues
2014-10-11 11:10:10 c.s.m.k.i.c.KafkaFeedStreamer [WARN] Ignoring exception while trying to start streamer threads: rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b can't rebalance after 20 retries
kafka.common.ConsumerRebalanceFailedException: rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b can't rebalance after 20 retries
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:432) ~[stormjar.jar:na]
at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$reinitializeConsumer(ZookeeperConsumerConnector.scala:722) ~[stormjar.jar:na]
at kafka.consumer.ZookeeperConsumerConnector.consume(ZookeeperConsumerConnector.scala:212) ~[stormjar.jar:na]
at kafka.javaapi.consumer.ZookeeperConsumerConnector.createMessageStreams(ZookeeperConsumerConnector.scala:80) ~[stormjar.jar:na]
at com.spr.messaging.kafka.impl.consumer.KafkaFeedStreamer.createAndStartThreads(KafkaFeedStreamer.java:79) ~[stormjar.jar:na]
at com.spr.messaging.kafka.impl.consumer.KafkaFeedStreamer.startKafkaStreamThreadsIfNecessary(KafkaFeedStreamer.java:64) ~[stormjar.jar:na]
at com.spr.messaging.kafka.impl.consumer.KafkaFeedConsumerFactoryImpl.startStreamerIfNotRunning(KafkaFeedConsumerFactoryImpl.java:71) [stormjar.jar:na]
at com.spr.messaging.kafka.impl.consumer.KafkaFeedPullConsumerImpl.startStreamerIfNotRunning(KafkaFeedPullConsumerImpl.java:48) [stormjar.jar:na]
at com.spr.messaging.kafka.impl.KafkaFeedServiceImpl.getKafkaFeedPullConsumer(KafkaFeedServiceImpl.java:63) [stormjar.jar:na]
at com.spr.storm.topology.spout.AbstractSprKafkaSpout.nextTuple(AbstractSprKafkaSpout.java:121) [stormjar.jar:na]
at backtype.storm.daemon.executor$eval3848$fn__3849$fn__3864$fn__3893.invoke(executor.clj:562) [na:0.9.1-incubating]
at backtype.storm.util$async_loop$fn__384.invoke(util.clj:433) [na:0.9.1-incubating]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.4.0.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55]
2014-10-11 11:10:10 k.c.ZookeeperConsumerConnector [INFO] [rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b], begin registering consumer rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b in ZK
2014-10-11 11:10:10 k.u.ZkUtils$ [INFO] conflict in /consumers/rule-engine-feed/ids/rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b data: {"version":1,"subscription":{"rule-engine-feed":5},"pattern":"static","timestamp":"1413025810635"} stored data: {"version":1,"subscription":{"rule-engine-feed":5},"pattern":"static","timestamp":"1413025767483"}
2014-10-11 11:10:10 k.u.ZkUtils$ [INFO] I wrote this conflicted ephemeral node [{"version":1,"subscription":{"rule-engine-feed":5},"pattern":"static","timestamp":"1413025810635"}] at /consumers/rule-engine-feed/ids/rule-engine-feed_ip-10-0-2-170-1413025767369-4679959b a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry

Due to this error, none of the consumers consume from the partitions in contention, which creates a skewed lag on the Kafka side. When the 6th consumer was added, the existing owner process of the partitions in question did not get rebalanced. Any help would be highly appreciated.

-Thanks,
Mohit
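[Editor's note: a sketch of the remediation Neha suggests earlier in this thread: stop every consumer in the group, wait past the ZooKeeper session timeout so expired ephemeral nodes are actually removed, then recreate the connectors. The function name and the 5-second margin are illustrative; sessionTimeoutMs should match the configured zookeeper.session.timeout.ms (6000 by default in 0.8).]

    import kafka.consumer.ConsumerConnector

    def restartAfterSessionExpiry(connectors: Seq[ConsumerConnector],
                                  sessionTimeoutMs: Long): Unit = {
      // Releases this process's ephemeral nodes and commits offsets.
      connectors.foreach(_.shutdown())
      // Margin so ZooKeeper has expired any stale sessions before re-registering.
      Thread.sleep(sessionTimeoutMs + 5000)
      // ...then recreate the connectors with Consumer.create(...) as before.
    }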