[jira] [Commented] (KAFKA-877) Still getting kafka.common.NotLeaderForPartitionException
[ https://issues.apache.org/jira/browse/KAFKA-877?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13666402#comment-13666402 ]

Neha Narkhede commented on KAFKA-877:
We have fixed a number of bugs related to this in 0.8 recently. Do you mind trying it out again? Also, would you mind describing in a separate JIRA what problems you had while trying to run Kafka on Windows?

Still getting kafka.common.NotLeaderForPartitionException
    Key: KAFKA-877
    URL: https://issues.apache.org/jira/browse/KAFKA-877
    Project: Kafka
    Issue Type: Bug
    Affects Versions: 0.8
    Environment: DEV
    Reporter: BalajiSeshadri
    Priority: Blocker
    Attachments: KAFKA-816.jpg

Using the trunk below, we still see the error happening. Please let us know if this can be fixed.
https://github.com/apache/kafka.git

[2013-04-25 16:47:08,924] WARN [console-consumer-24019_MERD7-21964-1366930009136-8b7f9eb7-leader-finder-thread], Failed to add fetcher for [mytopic,0] to broker id:0,host:MERD7-21964.echostar.com,port:9092 (kafka.consumer.ConsumerFetcherManager$$anon$1)
kafka.common.NotLeaderForPartitionException
    at sun.reflect.GeneratedConstructorAccessor1.newInstance(Unknown Source)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:525)
    at java.lang.Class.newInstance0(Class.java:372)
    at java.lang.Class.newInstance(Class.java:325)
    at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:72)
    at kafka.consumer.SimpleConsumer.earliestOrLatestOffset(SimpleConsumer.scala:163)
    at kafka.consumer.ConsumerFetcherThread.handleOffsetOutOfRange(ConsumerFetcherThread.scala:61)
    at kafka.server.AbstractFetcherThread.addPartition(AbstractFetcherThread.scala:167)
    at kafka.server.AbstractFetcherManager.addFetcher(AbstractFetcherManager.scala:48)
    at kafka.consumer.ConsumerFetcherManager$$anon$1$$anonfun$doWork$3.apply(ConsumerFetcherManager.scala:79)
    at kafka.consumer.ConsumerFetcherManager$$anon$1$$anonfun$doWork$3.apply(ConsumerFetcherManager.scala:75)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:95)
    at scala.collection.Iterator$class.foreach(Iterator.scala:772)
    at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:157)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:190)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:45)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:95)
    at kafka.consumer.ConsumerFetcherManager$$anon$1.doWork(ConsumerFetcherManager.scala:75)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

We are evaluating Kafka for our new messaging system and had a tough time running it on Windows. We somehow managed to run 0.8 using Cygwin, but when we run the console producer/consumer we are not getting messages from the consumer. Please help us fix this issue; it might not be related, but the consumer keeps throwing this error.
[jira] [Updated] (KAFKA-919) Disabling of auto commit is ignored during consumer group rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Phil Hargett updated KAFKA-919:
    Status: Open (was: Patch Available)

Disabling of auto commit is ignored during consumer group rebalancing
    Key: KAFKA-919
    URL: https://issues.apache.org/jira/browse/KAFKA-919
    Project: Kafka
    Issue Type: Bug
    Components: consumer
    Affects Versions: 0.8
    Environment: java 7/linux
    Reporter: Phil Hargett
    Assignee: Neha Narkhede
    Attachments: kafka-919.patch

From the mailing list:

In one of our applications using Kafka, we are using the high-level consumer to pull messages from our topic. Because we pull messages from topics in discrete units (e.g., an hour's worth of messages), we want to control explicitly when offsets are committed.

Even though auto.commit.enable is set to false, offsets are committed anyway during consumer group rebalancing, regardless of the setting of this flag.

Is this a bug? Or just a known gap in offset management? I do see plenty of notes on the wiki suggesting future releases may enable applications using the high-level consumer to have more fine-grained control over offset management. I also fully realize that different applications have different needs, and meeting all of them with a clean API can be challenging.

In the case of this application, the high-level consumer solves the problem of locating the correct broker in a cluster for a given topic, so there are advantages to using it, even if we are not using it to balance fetch load across multiple consumers. We ideally have only one consumer active per consumer group, and can tolerate some duplicate messages. But the consumer groups make it easy for one consumer to recover at the correct starting point, should the prior consumer in the group have failed before doing a commit.
[jira] [Updated] (KAFKA-919) Disabling of auto commit is ignored during consumer group rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Phil Hargett updated KAFKA-919:
    Status: Patch Available (was: Open)
[jira] [Commented] (KAFKA-919) Disabling of auto commit is ignored during consumer group rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-919?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13666442#comment-13666442 ]

Phil Hargett commented on KAFKA-919:
Note from Jun on the users mailing list: it's a bug.
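For reference, here is a minimal sketch of the usage pattern described in the report, written against the 0.8 high-level (ZooKeeper-based) consumer API; the ZooKeeper address, group name, and the elided consumption loop are placeholders, not part of the reporter's code. The intent is that with auto.commit.enable=false the only commits come from the explicit commitOffsets() call; the bug reported here is that a group rebalance commits offsets regardless of the flag.

import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

public class ManualOffsetConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("zookeeper.connect", "localhost:2181"); // placeholder address
        props.put("group.id", "hourly-batch-group");      // placeholder group
        // Intent: the application, not the consumer, decides when offsets are committed.
        props.put("auto.commit.enable", "false");

        ConsumerConnector connector =
            Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

        // ... consume one discrete unit of work (e.g., an hour's worth of messages) ...

        // Explicit commit once the unit of work is durable on the application side.
        connector.commitOffsets();
        connector.shutdown();
    }
}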
[jira] [Commented] (KAFKA-911) Bug in controlled shutdown logic in controller leads to controller not sending out some state change request
[ https://issues.apache.org/jira/browse/KAFKA-911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13666445#comment-13666445 ]

Neha Narkhede commented on KAFKA-911:
You are right that we can send the reduced ISR request to the leader, but that is independent of removing the shutting-down broker from the ISR in ZooKeeper. I'm arguing that the ZooKeeper write is unnecessary. To handle the issue you described, we can send a leader-and-ISR request just to the leader with the reduced ISR.

Bug in controlled shutdown logic in controller leads to controller not sending out some state change request
    Key: KAFKA-911
    URL: https://issues.apache.org/jira/browse/KAFKA-911
    Project: Kafka
    Issue Type: Bug
    Components: controller
    Affects Versions: 0.8
    Reporter: Neha Narkhede
    Assignee: Neha Narkhede
    Priority: Blocker
    Labels: kafka-0.8, p1
    Attachments: kafka-911-v1.patch

The controlled shutdown logic in the controller first tries to move the leaders off the broker being shut down. Then it tries to remove the broker from the ISR list. During that operation, it does not synchronize on the controllerLock. This causes a race condition while dispatching data using the controller's channel manager.
[jira] [Created] (KAFKA-920) zkclient jar 0.2.0 is not compatible with 0.1.0
Jun Rao created KAFKA-920:
    Summary: zkclient jar 0.2.0 is not compatible with 0.1.0
    Key: KAFKA-920
    URL: https://issues.apache.org/jira/browse/KAFKA-920
    Project: Kafka
    Issue Type: Bug
    Affects Versions: 0.8
    Reporter: Jun Rao
    Priority: Blocker

Just realized that zkclient 0.2.0 introduced a non-backward-compatible API. In 0.1.0, it has

    public void writeData(java.lang.String path, java.lang.Object datat)
    public void writeData(java.lang.String path, java.lang.Object datat, int expectedVersion)

In 0.2.0, they are changed to

    public Stat writeData(java.lang.String path, java.lang.Object datat)
    public Stat writeData(java.lang.String path, java.lang.Object datat, int expectedVersion)

This means that if an application uses Kafka and also drags in another library (X) that depends on zkclient 0.1.0 (and uses void writeData()), then when it upgrades to the Kafka 0.8 consumer (which uses zkclient 0.2.0), the application can't just upgrade to zkclient 0.2.0, since library X's call to void writeData() will fail because of the signature change. Since zkclient 0.1.0 is widely used, this issue may affect many applications.

This non-backward-compatible change was introduced by me since I didn't realize it was a signature change at the time. I am trying to see if zkclient can release a new version that's compatible. If that can't be done in time, we will have to downgrade zkclient to 0.1.0 and add the needed ZK functionality inside Kafka. This is not ideal, but can solve the issue quicker.
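To make the failure mode concrete, here is a hedged illustration (the class and method names other than ZkClient.writeData are made up for the example): the JVM links a call site by its full method descriptor, which includes the return type, so code compiled against the 0.1.0 void signature breaks at runtime against the 0.2.0 jar even though the same source would still compile.

import org.I0Itec.zkclient.ZkClient;

// Illustrative stand-in for "library X", compiled against zkclient 0.1.0.
public class LibraryX {
    public static void updateNode(ZkClient zk, String path, Object data) {
        // Compiled against 0.1.0, this call site references the descriptor
        //   writeData(Ljava/lang/String;Ljava/lang/Object;)V
        // With zkclient 0.2.0 on the runtime classpath only the Stat-returning
        // variant exists, so linkage fails with java.lang.NoSuchMethodError,
        // even though recompiling this source against 0.2.0 would succeed.
        zk.writeData(path, data);
    }
}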
[jira] [Assigned] (KAFKA-920) zkclient jar 0.2.0 is not compatible with 0.1.0
[ https://issues.apache.org/jira/browse/KAFKA-920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao reassigned KAFKA-920:
    Assignee: Jun Rao
[jira] [Updated] (KAFKA-920) zkclient jar 0.2.0 is not compatible with 0.1.0
[ https://issues.apache.org/jira/browse/KAFKA-920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao updated KAFKA-920:
    Attachment: kafka-920-downgrade-zkclient.patch

Attached a patch that downgrades zkclient to 0.1.0. The idea is to create a kafka.zookeeper.ZkClient that wraps zkclient and exposes the needed new API. All other changes are package renaming. Unit tests pass.
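A hedged sketch of the wrapper idea described above, not the attached patch itself: the method name writeDataReturnStat, the constructor used, and the write-then-read way of obtaining a Stat are assumptions made for illustration (and the read-back is not atomic with the write). The point is only that Kafka code can compile against a Kafka-owned class while the bundled jar stays at 0.1.0.

package kafka.zookeeper;

import org.apache.zookeeper.data.Stat;

// Kafka-owned wrapper so the rest of the code base is insulated from which
// zkclient jar (0.1.0 vs 0.2.0) ends up on the application's classpath.
public class ZkClient extends org.I0Itec.zkclient.ZkClient {

    public ZkClient(String zkServers, int sessionTimeout, int connectionTimeout) {
        super(zkServers, sessionTimeout, connectionTimeout);
    }

    // The one piece of "new" API needed from 0.2.0: a conditional write that also
    // hands back the resulting Stat. Obtaining the Stat via a follow-up read is an
    // assumption for this sketch and is not atomic with the write.
    public Stat writeDataReturnStat(String path, Object data, int expectedVersion) {
        writeData(path, data, expectedVersion); // 0.1.0 signature, returns void
        Stat stat = new Stat();
        readData(path, stat);                   // assumed available on the 0.1.0 client
        return stat;
    }
}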
[jira] [Updated] (KAFKA-920) zkclient jar 0.2.0 is not compatible with 0.1.0
[ https://issues.apache.org/jira/browse/KAFKA-920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao updated KAFKA-920:
    Status: Patch Available (was: Open)
[jira] [Commented] (KAFKA-920) zkclient jar 0.2.0 is not compatible with 0.1.0
[ https://issues.apache.org/jira/browse/KAFKA-920?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13666489#comment-13666489 ]

Neha Narkhede commented on KAFKA-920:
I tried applying your patch; it fails at the new file -

    can't find file to patch at input line 392
    Perhaps you used the wrong -p or --strip option?
    The text leading up to this was:
    --
    |diff --git a/core/src/main/scala/kafka/zookeeper/ZkClient.java b/core/src/main/scala/kafka/zookeeper/ZkClient.java
    |index 9a82120..0c6e305 100644
    |--- a/core/src/main/scala/kafka/zookeeper/ZkClient.java
    |+++ b/core/src/main/scala/kafka/zookeeper/ZkClient.java
    --
[jira] [Commented] (KAFKA-911) Bug in controlled shutdown logic in controller leads to controller not sending out some state change request
[ https://issues.apache.org/jira/browse/KAFKA-911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13666532#comment-13666532 ]

Joel Koshy commented on KAFKA-911:
I had to revisit the notes from KAFKA-340. I think this was touched upon, i.e., the fact that the current implementation's attempt to shrink the ISR may be ineffective for partitions whose leadership has been moved from the current broker - https://issues.apache.org/jira/browse/KAFKA-340?focusedCommentId=13483478&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13483478

{quote}
3.4 What is the point of sending the leader and isr request at the end of shutdownBroker, since the OfflineReplica state change would've taken care of that anyway? It seems like you just need to send the stop replica request with the delete partitions flag turned off, no?

I still need (as an optimization) to send the leader and isr request to the leaders of all partitions that are present on the shutting down broker, so that each leader can remove the shutting down broker from its inSyncReplicas cache (in Partition.scala) and no longer wait for acks from the shutting down broker when a producer request's num-acks is set to -1. Otherwise, we have to wait for the leader to organically shrink the ISR.

This also applies to partitions which are moved (i.e., partitions for which the shutting down broker was the leader): the ControlledShutdownLeaderSelector needs to send the updated leaderAndIsr request to the shutting down broker as well (to tell it that it is no longer the leader), at which point it will start up a replica fetcher and re-enter the ISR. So in fact, there is actually not much point in removing the current leader from the ISR in ControlledShutdownLeaderSelector.selectLeader.
{quote}

and https://issues.apache.org/jira/browse/KAFKA-340?focusedCommentId=13484727&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-13484727

(I don't think I actually filed that jira though.)
[jira] [Updated] (KAFKA-920) zkclient jar 0.2.0 is not compatible with 0.1.0
[ https://issues.apache.org/jira/browse/KAFKA-920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao updated KAFKA-920:
    Attachment: kafka-920-downgrade-zkclient_v2.patch

Attached patch v2 (generated using git diff HEAD). The zkclient folks actually just committed a change to make the API backward compatible with zkclient 0.1. So, we may be able to use zkclient 0.3 if it's released in time.
[jira] [Updated] (KAFKA-916) Deadlock between fetcher shutdown and handling partitions with error
[ https://issues.apache.org/jira/browse/KAFKA-916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Joel Koshy updated KAFKA-916:
    Attachment: KAFKA-916-v1.patch

Agreed - I think that should fix the issue.

Deadlock between fetcher shutdown and handling partitions with error
    Key: KAFKA-916
    URL: https://issues.apache.org/jira/browse/KAFKA-916
    Project: Kafka
    Issue Type: Bug
    Affects Versions: 0.8
    Reporter: Joel Koshy
    Fix For: 0.8
    Attachments: KAFKA-916-v1.patch

Here is another consumer deadlock that we encountered. All consumers are vulnerable to this during a rebalance if there happen to be partitions in error.

On a rebalance, the fetcher manager closes all fetchers, and this holds on to the fetcher thread map's lock (mapLock in AbstractFetcherManager) [t1]. While the fetcher manager is iterating over fetchers to stop them, a fetcher that is yet to be stopped hits an error on a partition and proceeds to handle partitions with error [t2]. This handling involves looking up the fetcher for that partition and then removing it from the fetcher's set of partitions to consume. This requires grabbing the same map lock held in [t1], hence the deadlock.

[t1]
2013-05-22_20:23:11.95767 main prio=10 tid=0x7f1b24007800 nid=0x573b waiting on condition [0x7f1b2bd38000]
2013-05-22_20:23:11.95767    java.lang.Thread.State: WAITING (parking)
2013-05-22_20:23:11.95767   at sun.misc.Unsafe.park(Native Method)
2013-05-22_20:23:11.95767   - parking to wait for 0x7f1a25780598 (a java.util.concurrent.CountDownLatch$Sync)
2013-05-22_20:23:11.95767   at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
2013-05-22_20:23:11.95767   at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
2013-05-22_20:23:11.95768   at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
2013-05-22_20:23:11.95768   at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
2013-05-22_20:23:11.95768   at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:207)
2013-05-22_20:23:11.95768   at kafka.utils.ShutdownableThread.shutdown(ShutdownableThread.scala:36)
2013-05-22_20:23:11.95769   at kafka.server.AbstractFetcherThread.shutdown(AbstractFetcherThread.scala:68)
2013-05-22_20:23:11.95769   at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$1.apply(AbstractFetcherManager.scala:79)
2013-05-22_20:23:11.95769   at kafka.server.AbstractFetcherManager$$anonfun$closeAllFetchers$1.apply(AbstractFetcherManager.scala:78)
2013-05-22_20:23:11.95769   at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
2013-05-22_20:23:11.95769   at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:80)
2013-05-22_20:23:11.95770   at scala.collection.Iterator$class.foreach(Iterator.scala:631)
2013-05-22_20:23:11.95770   at scala.collection.mutable.HashTable$$anon$1.foreach(HashTable.scala:161)
2013-05-22_20:23:11.95770   at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:194)
2013-05-22_20:23:11.95770   at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
2013-05-22_20:23:11.95771   at scala.collection.mutable.HashMap.foreach(HashMap.scala:80)
2013-05-22_20:23:11.95771   at kafka.server.AbstractFetcherManager.closeAllFetchers(AbstractFetcherManager.scala:78)
2013-05-22_20:23:11.95771   - locked 0x7f1a2ae92510 (a java.lang.Object)
2013-05-22_20:23:11.95771   at kafka.consumer.ConsumerFetcherManager.stopConnections(ConsumerFetcherManager.scala:156)
2013-05-22_20:23:11.95771   at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$closeFetchersForQueues(ZookeeperConsumerConnector.scala:488)
2013-05-22_20:23:11.95772   at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.closeFetchers(ZookeeperConsumerConnector.scala:525)
2013-05-22_20:23:11.95772   at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.kafka$consumer$ZookeeperConsumerConnector$ZKRebalancerListener$$rebalance(ZookeeperConsumerConnector.scala:422)
2013-05-22_20:23:11.95772   at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener$$anonfun$syncedRebalance$1.apply$mcVI$sp(ZookeeperConsumerConnector.scala:374)
2013-05-22_20:23:11.95772   at scala.collection.immutable.Range$ByOne$class.foreach$mVc$sp(Range.scala:282)
2013-05-22_20:23:11.95773   at
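A minimal, self-contained sketch of the deadlock shape described above (plain Java, not Kafka code; mapLock and the latch merely stand in for AbstractFetcherManager's mapLock and ShutdownableThread's shutdown latch). Run as written, it hangs by design: the main thread holds the lock while awaiting the latch, and the worker needs that same lock before it can count the latch down.

import java.util.concurrent.CountDownLatch;

public class ShutdownDeadlockSketch {
    // Stand-ins for the manager's map lock and the fetcher thread's shutdown latch.
    private static final Object mapLock = new Object();
    private static final CountDownLatch shutdownLatch = new CountDownLatch(1);

    public static void main(String[] args) throws InterruptedException {
        Thread fetcher = new Thread(() -> {
            try {
                Thread.sleep(50); // a partition error surfaces while the rebalance is in progress
            } catch (InterruptedException ignored) {
            }
            // [t2] handling the partition error needs the manager's map lock
            // (look up the fetcher, remove the partition from it).
            synchronized (mapLock) {
                // never entered while [t1] holds mapLock below
            }
            shutdownLatch.countDown(); // would let shutdown() return, but is never reached
        }, "fetcher-thread");
        fetcher.start();

        // [t1] closeAllFetchers(): take the map lock, then wait for each fetcher to shut down.
        synchronized (mapLock) {
            shutdownLatch.await(); // blocks forever: the deadlock reported above
        }
    }
}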
[jira] [Resolved] (KAFKA-919) Disabling of auto commit is ignored during consumer group rebalancing
[ https://issues.apache.org/jira/browse/KAFKA-919?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jun Rao resolved KAFKA-919.
    Resolution: Fixed
    Fix Version/s: 0.8
    Assignee: (was: Neha Narkhede)

Thanks for the patch. Committed to 0.8.
[jira] [Commented] (KAFKA-916) Deadlock between fetcher shutdown and handling partitions with error
[ https://issues.apache.org/jira/browse/KAFKA-916?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13666973#comment-13666973 ]

Jun Rao commented on KAFKA-916:
Thanks for the patch. +1.