[jira] [Assigned] (KAFKA-4261) Provide debug option in vagrant-up.sh
[ https://issues.apache.org/jira/browse/KAFKA-4261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Flavio Junqueira reassigned KAFKA-4261:
---------------------------------------

    Assignee: Flavio Junqueira

> Provide debug option in vagrant-up.sh
> -------------------------------------
>
>                 Key: KAFKA-4261
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4261
>             Project: Kafka
>          Issue Type: Improvement
>          Components: system tests
>    Affects Versions: 0.10.1.0
>            Reporter: Flavio Junqueira
>            Assignee: Flavio Junqueira
>            Priority: Minor
>             Fix For: 0.11.0.0
>
> I found it useful to use the debug flag for vagrant, and I had to edit the
> script to make it happen. Here I'm just proposing a simple change to provide
> a debug command-line option instead.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
[jira] [Created] (KAFKA-4261) Provide debug option in vagrant-up.sh
Flavio Junqueira created KAFKA-4261:
---------------------------------------

             Summary: Provide debug option in vagrant-up.sh
                 Key: KAFKA-4261
                 URL: https://issues.apache.org/jira/browse/KAFKA-4261
             Project: Kafka
          Issue Type: Improvement
          Components: system tests
    Affects Versions: 0.10.1.0
            Reporter: Flavio Junqueira
            Priority: Minor
             Fix For: 0.11.0.0

I found it useful to use the debug flag for vagrant, and I had to edit the script to make it happen. Here I'm just proposing a simple change to provide a debug command-line option instead.
[jira] [Commented] (KAFKA-3985) Transient system test failure ZooKeeperSecurityUpgradeTest.test_zk_security_upgrade.security_protocol
[ https://issues.apache.org/jira/browse/KAFKA-3985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15549221#comment-15549221 ]

Flavio Junqueira commented on KAFKA-3985:
-----------------------------------------

I've created a pull request for this, focused on this issue. KAFKA-4140 is a broader set of changes and could incorporate what I'm proposing in the PR; in fact, it seems to make a similar change. KAFKA-4140 is still WIP, though, so for the sake of seeing more green in our builds it might be better to check this one in while we wait for it. I'm happy with whatever folks prefer.

> Transient system test failure
> ZooKeeperSecurityUpgradeTest.test_zk_security_upgrade.security_protocol
> -----------------------------------------------------------------------
>
>                 Key: KAFKA-3985
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3985
>             Project: Kafka
>          Issue Type: Test
>          Components: system tests
>    Affects Versions: 0.10.0.0
>            Reporter: Jason Gustafson
>            Assignee: Flavio Junqueira
>
> Found this in the nightly build on the 0.10.0 branch. Full details here:
> http://testing.confluent.io/confluent-kafka-0-10-0-system-test-results/?prefix=2016-07-22--001.1469199875--apache--0.10.0--71a598a/
>
> {code}
> test_id:  2016-07-22--001.kafkatest.tests.core.zookeeper_security_upgrade_test.ZooKeeperSecurityUpgradeTest.test_zk_security_upgrade.security_protocol=SSL
> status:   FAIL
> run time: 5 minutes 14.067 seconds
>
> 292 acked message did not make it to the Consumer. They are: 11264, 11265, 11266, 11267, 11268, 11269, 11270, 11271, 11272, 11273, 11274, 11275, 11276, 11277, 11278, 11279, 11280, 11281, 11282, 11283, ...plus 252 more. Total Acked: 11343, Total Consumed: 11054. We validated that the first 272 of these missing messages correctly made it into Kafka's data files. This suggests they were lost on their way to the consumer.
> Traceback (most recent call last):
>   File "/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/venv/local/lib/python2.7/site-packages/ducktape/tests/runner.py", line 106, in run_all_tests
>     data = self.run_single_test()
>   File "/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/venv/local/lib/python2.7/site-packages/ducktape/tests/runner.py", line 162, in run_single_test
>     return self.current_test_context.function(self.current_test)
>   File "/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/venv/local/lib/python2.7/site-packages/ducktape/mark/_mark.py", line 331, in wrapper
>     return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File "/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py", line 115, in test_zk_security_upgrade
>     self.run_produce_consume_validate(self.run_zk_migration)
>   File "/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/tests/kafkatest/tests/produce_consume_validate.py", line 79, in run_produce_consume_validate
>     raise e
> AssertionError: 292 acked message did not make it to the Consumer. They are: 11264, 11265, 11266, 11267, 11268, 11269, 11270, 11271, 11272, 11273, 11274, 11275, 11276, 11277, 11278, 11279, 11280, 11281, 11282, 11283, ...plus 252 more. Total Acked: 11343, Total Consumed: 11054. We validated that the first 272 of these missing messages correctly made it into Kafka's data files. This suggests they were lost on their way to the consumer.
> {code}
[jira] [Assigned] (KAFKA-3985) Transient system test failure ZooKeeperSecurityUpgradeTest.test_zk_security_upgrade.security_protocol
[ https://issues.apache.org/jira/browse/KAFKA-3985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Flavio Junqueira reassigned KAFKA-3985:
---------------------------------------

    Assignee: Flavio Junqueira
[jira] [Comment Edited] (KAFKA-3985) Transient system test failure ZooKeeperSecurityUpgradeTest.test_zk_security_upgrade.security_protocol
[ https://issues.apache.org/jira/browse/KAFKA-3985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15548125#comment-15548125 ]

Flavio Junqueira edited comment on KAFKA-3985 at 10/5/16 9:02 AM:
------------------------------------------------------------------

[~rsivaram]
bq. I wouldn't have expected to see a CA with start time 09:55:15 at all. Is it possible that there was another system test running on that same host (the tests use a fixed file name for CA and truststore, so the tests would fail if there are multiple instances of system tests or any command that loads the security config class that is run on that host).

That's a very good point. I'm thinking that the problem is that the CA files are in {{/tmp}}:

{noformat}
self.ca_crt_path = "/tmp/test.ca.crt"
self.ca_jks_path = "/tmp/test.ca.jks"
{noformat}

and perhaps we should use {{mkdtemp}} like in {{generate_and_copy_keystore}}. Does it make sense?
[jira] [Commented] (KAFKA-3985) Transient system test failure ZooKeeperSecurityUpgradeTest.test_zk_security_upgrade.security_protocol
[ https://issues.apache.org/jira/browse/KAFKA-3985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15548125#comment-15548125 ]

Flavio Junqueira commented on KAFKA-3985:
-----------------------------------------

[~rsivaram]
bq. I wouldn't have expected to see a CA with start time 09:55:15 at all. Is it possible that there was another system test running on that same host (the tests use a fixed file name for CA and truststore, so the tests would fail if there are multiple instances of system tests or any command that loads the security config class that is run on that host).

That's a very good point. I'm thinking that the problem is that the CA files are in {{/tmp}}:

{noformat}
self.ca_crt_path = "/tmp/test.ca.crt"
self.ca_jks_path = "/tmp/test.ca.jks"
{noformat}

and perhaps we should use {{mkdtemp}} like in {{generate_and_copy_keystore}}. Does it make sense?
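The switch being suggested, from fixed {{/tmp}} names to a per-run directory created with {{mkdtemp}}, could look roughly like this (a sketch only: the two attribute names come from the snippet above, but the class name and directory prefix are made up for illustration):

```python
import os
import tempfile


class SecurityConfigSketch:
    """Illustrative stand-in for the test security config class."""

    def __init__(self):
        # mkdtemp creates a fresh, uniquely named directory per instance,
        # so two system-test runs on the same host no longer collide on
        # the fixed /tmp/test.ca.crt and /tmp/test.ca.jks paths.
        ca_dir = tempfile.mkdtemp(prefix="kafka-test-ca-")
        self.ca_crt_path = os.path.join(ca_dir, "test.ca.crt")
        self.ca_jks_path = os.path.join(ca_dir, "test.ca.jks")


cfg_a = SecurityConfigSketch()
cfg_b = SecurityConfigSketch()
# cfg_a and cfg_b point at distinct directories, which is exactly the
# isolation property the fixed /tmp paths lack.
```

The file names within the directory can stay the same; only the directory varies per run, so concurrent tests stop clobbering each other's CA and truststore.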
[jira] [Commented] (KAFKA-4196) Transient test failure: DeleteConsumerGroupTest.testConsumptionOnRecreatedTopicAfterTopicWideDeleteInZK
[ https://issues.apache.org/jira/browse/KAFKA-4196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15507443#comment-15507443 ]

Flavio Junqueira commented on KAFKA-4196:
-----------------------------------------

After a cursory look, it seems we could get this behavior if:

# There is an event that triggers {{TopicsChangeListener.handleChildChange}}.
# That event is followed by a broker change event: {{BrokerChangeListener}}.

According to the description, we do have the broker change event. The topics event only happens after the topic has been deleted from under {{/brokers/topics}} in ZK, though. If the controller instance that triggers the first event is the same one that deletes the topic, then the behavior above doesn't look possible, because: 1) all those events are processed under the controller context lock; and 2) the controller deletes the topic znodes and updates {{ControllerContext.partitionLeadershipInfo}} and {{controllerContext.partitionReplicaAssignment}} together. Consequently, one possibility is a race between two controllers. One puzzling point is that the delete znode for the topic isn't going away, which indicates that no controller instance is successfully completing the delete operation. I'd need to investigate some more to find the culprit. If it happens again and you have a chance, please upload the logs and I'll see if I can repro locally.

> Transient test failure:
> DeleteConsumerGroupTest.testConsumptionOnRecreatedTopicAfterTopicWideDeleteInZK
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-4196
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4196
>             Project: Kafka
>          Issue Type: Sub-task
>            Reporter: Ismael Juma
>              Labels: transient-unit-test-failure
>
> The error:
> {code}
> java.lang.AssertionError: Admin path /admin/delete_topic/topic path not deleted even after a replica is restarted
>     at org.junit.Assert.fail(Assert.java:88)
>     at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:752)
>     at kafka.utils.TestUtils$.verifyTopicDeletion(TestUtils.scala:1017)
>     at kafka.admin.DeleteConsumerGroupTest.testConsumptionOnRecreatedTopicAfterTopicWideDeleteInZK(DeleteConsumerGroupTest.scala:156)
> {code}
> Caused by a broken invariant in the Controller: a partition exists in `ControllerContext.partitionLeadershipInfo`, but not `controllerContext.partitionReplicaAssignment`.
> {code}
> [2016-09-20 06:45:13,967] ERROR [BrokerChangeListener on Controller 1]: Error while handling broker changes (kafka.controller.ReplicaStateMachine$BrokerChangeListener:103)
> java.util.NoSuchElementException: key not found: [topic,0]
>     at scala.collection.MapLike$class.default(MapLike.scala:228)
>     at scala.collection.AbstractMap.default(Map.scala:58)
>     at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
>     at kafka.controller.ControllerBrokerRequestBatch.kafka$controller$ControllerBrokerRequestBatch$$updateMetadataRequestMapFor$1(ControllerChannelManager.scala:310)
>     at kafka.controller.ControllerBrokerRequestBatch$$anonfun$addUpdateMetadataRequestForBrokers$4.apply(ControllerChannelManager.scala:343)
>     at kafka.controller.ControllerBrokerRequestBatch$$anonfun$addUpdateMetadataRequestForBrokers$4.apply(ControllerChannelManager.scala:343)
>     at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
>     at kafka.controller.ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers(ControllerChannelManager.scala:343)
>     at kafka.controller.KafkaController.sendUpdateMetadataRequest(KafkaController.scala:1030)
>     at kafka.controller.KafkaController.onBrokerFailure(KafkaController.scala:492)
>     at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:376)
>     at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:358)
>     at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:358)
>     at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>     at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:357)
>     at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:356)
>     at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:356)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>     at kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:355)
>     at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:843)
>     at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {code}
[jira] [Commented] (KAFKA-4196) Transient test failure: DeleteConsumerGroupTest.testConsumptionOnRecreatedTopicAfterTopicWideDeleteInZK
[ https://issues.apache.org/jira/browse/KAFKA-4196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15507287#comment-15507287 ]

Flavio Junqueira commented on KAFKA-4196:
-----------------------------------------

[~ijuma] is this from trunk?
[jira] [Commented] (KAFKA-1211) Hold the produce request with ack > 1 in purgatory until replicas' HW has larger than the produce offset
[ https://issues.apache.org/jira/browse/KAFKA-1211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15419984#comment-15419984 ]

Flavio Junqueira commented on KAFKA-1211:
-----------------------------------------

[~junrao]
bq. the leader needs to first wait for the follower to receive a message before it can advance the last committed offset.
Makes sense.

bq. it can propagate the last committed offset to the follower
Makes sense.

bq. the last committed offset in the follower is always behind that in the leader
Makes sense; it is either equal or behind, never ahead.

bq. Since the follower truncates based on the local last committed offset, it's possible for the follower to truncate messages that are already committed by the leader.
I'm not sure why we are doing this. A follower can't truncate until it hears from the leader upon recovery; it shouldn't truncate based on its local last committed offset.

> Hold the produce request with ack > 1 in purgatory until replicas' HW has larger than the produce offset
> ---------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-1211
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1211
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Guozhang Wang
>            Assignee: Guozhang Wang
>             Fix For: 0.11.0.0
>
> Today during leader failover we will have a window of weakness when the followers truncate their data before fetching from the new leader, i.e., the number of in-sync replicas is just 1. If during this time the leader also fails, then produce requests with ack > 1 that have already been responded to will still be lost. To avoid this scenario we would prefer to hold the produce request in purgatory until the replicas' HW is larger than the offset, instead of just their log end offsets.
[jira] [Commented] (KAFKA-1211) Hold the produce request with ack > 1 in purgatory until replicas' HW has larger than the produce offset
[ https://issues.apache.org/jira/browse/KAFKA-1211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15410693#comment-15410693 ]

Flavio Junqueira commented on KAFKA-1211:
-----------------------------------------

[~junrao] I was reading point (a) in your answer again, and there is something I don't understand. You say that the follower truncates and then becomes leader. This is fine, and I understand it can happen. The bit I don't understand is how it can truncate committed messages. Let's say that we are talking about servers A and B, and min ISR is 2 (the replica set can be larger than 2, but it doesn't really matter for this example):

# A leads initially and B follows A.
# B truncates.
# B becomes leader.

If A leads, then it means that it was previously in the ISR (assuming unclean leader election is disabled) and it contains all committed messages. If B was also part of the previous ISR, then B also has all committed messages and won't truncate committed messages. The situation you describe can only happen if either A or B loses committed messages on its own, and not because of the truncation, e.g., if the messages didn't make it from the page cache to disk before a crash. Is my understanding correct?
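The hazard under discussion reduces to a small toy model (hypothetical offsets and message names; not Kafka code): the follower's high watermark lags the leader's, so truncating to the local last committed offset can drop messages the leader has already committed.

```python
# Toy model: a log is a list of messages; HW = count of messages known committed.
leader_log = ["m0", "m1", "m2", "m3"]
leader_hw = 4                      # the leader has committed m0..m3
follower_log = ["m0", "m1", "m2", "m3"]
follower_hw = 2                    # HW propagation lags: follower only knows m0, m1

# On restart, a follower that truncates to its local last committed offset...
follower_log = follower_log[:follower_hw]

# ...discards m2 and m3. If it is then elected leader, those messages are
# gone even though producers saw them acknowledged.
lost = leader_log[follower_hw:leader_hw]
```

This is why the comment argues the follower should wait to hear from the leader on recovery rather than truncate to its own HW.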
[jira] [Commented] (KAFKA-1211) Hold the produce request with ack > 1 in purgatory until replicas' HW has larger than the produce offset
[ https://issues.apache.org/jira/browse/KAFKA-1211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15410682#comment-15410682 ]

Flavio Junqueira commented on KAFKA-1211:
-----------------------------------------

[~junrao] I get (2), but (1) might need a bit more tweaking, for the following reason. Say the follower executes the steps in the order you describe:

# Truncate the log.
# Update the LGS to reflect the truncated log (flush the prefix of the LGS whose start offsets are up to the log end offset).
# Fetch, and update the LGS as messages cross leader generation boundaries.

If the follower crashes in the middle of fetching and updating the LGS, it may leave the LGS in an inconsistent state. For example, say it crosses a generation boundary, writes to the log, and crashes before updating the LGS. I'm actually thinking that it might be better to update the LGS first, because in the worst case we point to an offset that is not in the log, so we know that the LGS entry is invalid. In any case, it sounds like there is some work to be done to make sure the LGS and the log are consistent.
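The "update the LGS first" ordering argued for above can be sketched with a toy model (the tuple layout and all values here are made up for illustration): an LGS entry whose start offset lies beyond the log end offset is detectably invalid, so recovery can simply drop it.

```python
# Toy leader-generation sequence (LGS): (generation, start offset) pairs.
lgs = [(1, 0), (2, 5), (3, 9)]   # entry for generation 3 was persisted first
log_end_offset = 7               # crash hit before generation 3's first append

# Recovery: keep only entries whose start offset is within the log. The
# dangling generation-3 entry points past the log end, so it is discarded,
# leaving the LGS and the log consistent again.
recovered_lgs = [(gen, start) for (gen, start) in lgs if start <= log_end_offset]
```

With the opposite ordering (log first, LGS second), a crash leaves a generation boundary in the log with no LGS entry describing it, which is much harder to detect.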
[jira] [Commented] (KAFKA-1211) Hold the produce request with ack > 1 in purgatory until replicas' HW has larger than the produce offset
[ https://issues.apache.org/jira/browse/KAFKA-1211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15406063#comment-15406063 ]

Flavio Junqueira commented on KAFKA-1211:
-----------------------------------------

Thanks for the clarification, [~junrao]. There are a couple of specific points that still aren't entirely clear to me:

# We are trying to preserve the generation when we copy messages to a follower, correct? In step 3.4, when we say that the follower flushes the LGS, we are more specifically trying to replicate the leader's LGS, is that right? What happens if the follower crashes, or the leader changes, between persisting the new LGS and fetching the new messages from the leader? I'm concerned that we will leave the LGS and the log of the broker in an inconsistent state.
# When we say in step 3.4 that the follower needs to remember the LLG, I suppose this is just during this recovery period; otherwise, once we have completed the sync-up, the follower knows that the latest generation is the LLG. During sync-up there is the question I'm raising above, but it is also not entirely clear whether we need to persist the LLG independently, to make sure we don't have a situation in which the follower crashes, comes back, and accepts messages from a different generation.
[jira] [Comment Edited] (KAFKA-1211) Hold the produce request with ack > 1 in purgatory until replicas' HW has larger than the produce offset
[ https://issues.apache.org/jira/browse/KAFKA-1211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404245#comment-15404245 ] Flavio Junqueira edited comment on KAFKA-1211 at 8/2/16 4:14 PM: - [~junrao] let me ask a few clarification questions. # Is it right that the scenarios described here do not affect the cases in which min isr > 1 and unclean leader election is disabled? If min isr is greater than 1 and the leader always comes from the latest isr, then the leader can either truncate the followers or have them fetch the missing log suffix. # The main goal of the proposal is to have replicas in a lossy configuration (e.g. min isr = 1, unclean leader election enabled), i.e., a leader and a follower, converge to a common prefix by choosing an offset based on a common generation. The chosen generation is the largest generation in common between the two replicas. Is that right? # How do we guarantee that the generation id is unique, by using zookeeper versions? # I think there is a potential race between updating the leader-generation-checkpoint file and appending the first message of the generation. We might be better off rolling the log segment file and having the generation be part of the log segment file name. This way, when we start a new generation, we also start a new file and we know precisely when a message from that generation has been appended. # Let's consider a scenario with 3 servers A, B, and C. I'm again assuming that it is ok to have a single server up to ack requests. Say we have the following execution:
||Generation||A||B||C||
|1| |m1|m1|
| | |m2|m2|
|2|m3| | |
| |m4| | |
Say that now A and B start generation 3. They have no generation in common, so they start from zero, dropping m1 and m2. Is that right? If later on C joins A and B, then it will also drop m1 and m2, right? Given that the configuration is lossy, it doesn't seem wrong to do it, as all we are trying to do is to converge to a consistent state. 
> Hold the produce request with ack > 1 in purgatory until replicas' HW has > larger than the produce offset > > > Key: KAFKA-1211 > URL: https://issues.apache.org/jira/browse/KAFKA-1211 > Project: Kafka > Issue Type: Bug >Reporter: Guozhang Wang >Assignee: Guozhang Wang > Fix For: 0.11.0.0 > > > Today during leader failover we will have a weakness period when the > followers truncate their data before fetching from the new leader, i.e., > number of in-sync replicas is just 1. If during this time the leader has also > failed then produce requests with ack >1 that have get responded will still > be lost. To avoid this scenario we would prefer to
[jira] [Commented] (KAFKA-1211) Hold the produce request with ack > 1 in purgatory until replicas' HW has larger than the produce offset
[ https://issues.apache.org/jira/browse/KAFKA-1211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404245#comment-15404245 ] Flavio Junqueira commented on KAFKA-1211: - [~junrao] let me ask a few clarification questions. # Is it right that the scenarios described here do not affect the cases in which min isr > 1 and unclean leader election is disabled? If min isr is greater than 1 and the leader always comes from the latest isr, then the leader can either truncate the followers or have them fetch the missing log suffix. # The main goal of the proposal is to have replicas in a lossy configuration (e.g. min isr = 1, unclean leader election enabled), i.e., a leader and a follower, converge to a common prefix by choosing an offset based on a common generation. The chosen generation is the largest generation in common between the two replicas. Is that right? # How do we guarantee that the generation id is unique, by using zookeeper versions? # I think there is a potential race between updating the leader-generation-checkpoint file and appending the first message of the generation. We might be better off rolling the log segment file and having the generation be part of the log segment file name. This way, when we start a new generation, we also start a new file and we know precisely when a message from that generation has been appended. # Let's consider a scenario with 3 servers A, B, and C. I'm again assuming that it is ok to have a single server up to ack requests. Say we have the following execution:
{noformat}
Generation    A     B     C
1                   m1    m1
                    m2    m2
2             m3
              m4
{noformat}
Say that now A and B start generation 3. They have no generation in common, so they start from zero, dropping m1 and m2. Is that right? If later on C joins A and B, then it will also drop m1 and m2, right? Given that the configuration is lossy, it doesn't seem wrong to do it, as all we are trying to do is to converge to a consistent state. 
> Hold the produce request with ack > 1 in purgatory until replicas' HW has > larger than the produce offset > > > Key: KAFKA-1211 > URL: https://issues.apache.org/jira/browse/KAFKA-1211 > Project: Kafka > Issue Type: Bug >Reporter: Guozhang Wang >Assignee: Guozhang Wang > Fix For: 0.11.0.0 > > > Today during leader failover we will have a weakness period when the > followers truncate their data before fetching from the new leader, i.e., > number of in-sync replicas is just 1. If during this time the leader has also > failed then produce requests with ack >1 that have get responded will still > be lost. To avoid this scenario we would prefer to hold the produce request > in purgatory until replica's HW has larger than the offset instead of just > their end-of-log offsets. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
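The convergence rule discussed in the comments above — truncate to the end of the largest generation the two replicas have in common, falling back to offset zero when there is none — can be sketched as follows. This is an illustrative sketch under assumed data structures (a per-replica map from leader generation to the replica's end offset in that generation); the class and method names are hypothetical and this is not Kafka's actual implementation.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: each replica keeps a map from leader generation to the
// end offset of its log within that generation. Two replicas converge by
// truncating to the common prefix of the largest generation they share; with
// no common generation they fall back to offset 0, dropping all messages.
public class GenerationTruncation {
    static long truncationOffset(TreeMap<Integer, Long> a, TreeMap<Integer, Long> b) {
        for (Integer gen : a.descendingKeySet()) {   // try the largest generation first
            Long endA = a.get(gen);
            Long endB = b.get(gen);
            if (endB != null) {
                return Math.min(endA, endB);         // common prefix within that generation
            }
        }
        return 0L;                                   // no common generation: start from zero
    }

    public static void main(String[] args) {
        // Scenario from the comment: A wrote m3, m4 in generation 2 only;
        // B wrote m1, m2 in generation 1 only. No common generation, so they
        // converge to offset 0 and m1, m2 are dropped.
        TreeMap<Integer, Long> replicaA = new TreeMap<>(Map.of(2, 2L));
        TreeMap<Integer, Long> replicaB = new TreeMap<>(Map.of(1, 2L));
        System.out.println(truncationOffset(replicaA, replicaB)); // prints 0
    }
}
```

If the two replicas do share a generation (say both have generation 1 up to offset 2), the sketch returns the end of that shared generation instead of zero, which matches the "largest generation in common" reading of the proposal.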
[jira] [Commented] (KAFKA-873) Consider replacing zkclient with curator (with zkclient-bridge)
[ https://issues.apache.org/jira/browse/KAFKA-873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15361397#comment-15361397 ] Flavio Junqueira commented on KAFKA-873: I see what you're saying. That support in zkClient was added specifically for our needs in Kafka. It is correct that it is more flexible to do it the Curator way, but I'm not sure how we would be able to benefit from it in Kafka given that this security configuration is internal to the brokers. If you have any specific idea, let me know and we can see if it is feasible. > Consider replacing zkclient with curator (with zkclient-bridge) > --- > > Key: KAFKA-873 > URL: https://issues.apache.org/jira/browse/KAFKA-873 > Project: Kafka > Issue Type: Improvement >Affects Versions: 0.8.0 >Reporter: Scott Clasen >Assignee: Grant Henke > > If zkclient was replaced with curator and curator-x-zkclient-bridge it would > be initially a drop-in replacement > https://github.com/Netflix/curator/wiki/ZKClient-Bridge > With the addition of a few more props to ZkConfig, and a bit of code this > would open up the possibility of using ACLs in zookeeper (which arent > supported directly by zkclient), as well as integrating with netflix > exhibitor for those of us using that. > Looks like KafkaZookeeperClient needs some love anyhow... -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-873) Consider replacing zkclient with curator (with zkclient-bridge)
[ https://issues.apache.org/jira/browse/KAFKA-873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15361236#comment-15361236 ] Flavio Junqueira commented on KAFKA-873: [~mtomcat_sslavic] could you be more specific about what you're missing? > Consider replacing zkclient with curator (with zkclient-bridge) > --- > > Key: KAFKA-873 > URL: https://issues.apache.org/jira/browse/KAFKA-873 > Project: Kafka > Issue Type: Improvement >Affects Versions: 0.8.0 >Reporter: Scott Clasen >Assignee: Grant Henke > > If zkclient was replaced with curator and curator-x-zkclient-bridge it would > be initially a drop-in replacement > https://github.com/Netflix/curator/wiki/ZKClient-Bridge > With the addition of a few more props to ZkConfig, and a bit of code this > would open up the possibility of using ACLs in zookeeper (which arent > supported directly by zkclient), as well as integrating with netflix > exhibitor for those of us using that. > Looks like KafkaZookeeperClient needs some love anyhow... -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-3709) Create project security page
Flavio Junqueira created KAFKA-3709: --- Summary: Create project security page Key: KAFKA-3709 URL: https://issues.apache.org/jira/browse/KAFKA-3709 Project: Kafka Issue Type: Improvement Components: website Reporter: Flavio Junqueira Assignee: Flavio Junqueira We are creating a security@k.a.o mailing list to receive reports of potential vulnerabilities. Now that Kafka has security in place, the community might start receiving vulnerability reports and we need to follow the guidelines here: http://www.apache.org/security/ Specifically, security issues are better handled in a project-specific list. This jira is to create a web page that informs users and contributors of how we are supposed to handle security issues. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3686) Kafka producer is not fault tolerant
[ https://issues.apache.org/jira/browse/KAFKA-3686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15281508#comment-15281508 ] Flavio Junqueira commented on KAFKA-3686: - (on behalf of [~lethalman]) [~fpj] yes, I want the first. In the case of network partitions I want to ensure my messages are stored. If the libraries don't do that, it means I have to reimplement them. Or otherwise, postpone sending such messages until the network partition resolves (which means implementing some kind of backlog on disk of the producer, which should instead be the kafka purpose after all). In both cases, it's something that is not documented and it's very inconvenient. > Kafka producer is not fault tolerant > > > Key: KAFKA-3686 > URL: https://issues.apache.org/jira/browse/KAFKA-3686 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.9.0.1 >Reporter: Luca Bruno > > *Setup* > I have a cluster of 3 kafka servers, a topic with 12 partitions with replica > 2, and a zookeeper cluster of 3 nodes. > Producer config: > {code} > props.put("bootstrap.servers", "k1:9092,k2:9092,k3:9092"); > props.put("acks", "1"); > props.put("batch.size", 16384); > props.put("retries", 3); > props.put("buffer.memory", 33554432); > props.put("key.serializer", > "org.apache.kafka.common.serialization.StringSerializer"); > props.put("value.serializer", > "org.apache.kafka.common.serialization.StringSerializer"); > {code} > Producer code: > {code} > Producer<String, String> producer = new KafkaProducer<>(props); > for (int i = 0; i < 10; i++) { > Future<RecordMetadata> f = producer.send(new ProducerRecord<String, String>("topic", null, Integer.toString(i))); > f.get(); > } > {code} > *Problem* > Cut the network between the producer (p1) and one of the kafka servers (say > k1). > The cluster is healthy, hence the kafka bootstrap tells the producer that > there are 3 kafka servers (as I understood it), and the leaders of the > partitions of the topic. 
> So the producer will send messages to all of the 3 leaders for each > partition. If the leader happens to be k1 for a message, the producer raises > the following exception after request.timeout.ms: > {code} > Exception in thread "main" java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TimeoutException: Batch Expired > at > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:56) > at > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:43) > at > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25) > at Test.main(Test.java:25) > Caused by: org.apache.kafka.common.errors.TimeoutException: Batch Expired > {code} > In theory, the application should handle the failure. In practice, messages > are getting lost, even though there are other 2 leaders available for writing. > I tried with values of acks both 1 and -1. > *What I expected* > Given the client library is automatically deciding the hashing / round robin > schema for the partition, I would say it's not very important which partition > the message is being sent to. > I expect the client library to handle the failure by sending the message to a > partition of a different leader. > Neither kafka-clients nor rdkafka handle this failure. Given those are the > main client libraries being used for kafka as far as I know, I find it a > serious problem in terms of fault tolerance. > EDIT: I cannot add comments to this issue, don't understand why. To answer > [~fpj] yes, I want the first. In the case of network partitions I want to > ensure my messages are stored. If the libraries don't do that, it means I > have to reimplement them. Or otherwise, postpone sending such messages until > the network partition resolves (which means implementing some kind of backlog > on disk of the producer, which should instead be the kafka purpose after > all). 
In both cases, it's something that is not documented and it's very > inconvenient. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3686) Kafka producer is not fault tolerant
[ https://issues.apache.org/jira/browse/KAFKA-3686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15281505#comment-15281505 ] Flavio Junqueira commented on KAFKA-3686: - [~lethalman] Jira is on lockdown because of spam, someone needs to add you as a contributor to jira. > Kafka producer is not fault tolerant > > > Key: KAFKA-3686 > URL: https://issues.apache.org/jira/browse/KAFKA-3686 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.9.0.1 >Reporter: Luca Bruno > > *Setup* > I have a cluster of 3 kafka servers, a topic with 12 partitions with replica > 2, and a zookeeper cluster of 3 nodes. > Producer config: > {code} > props.put("bootstrap.servers", "k1:9092,k2:9092,k3:9092"); > props.put("acks", "1"); > props.put("batch.size", 16384); > props.put("retries", 3); > props.put("buffer.memory", 33554432); > props.put("key.serializer", > "org.apache.kafka.common.serialization.StringSerializer"); > props.put("value.serializer", > "org.apache.kafka.common.serialization.StringSerializer"); > {code} > Producer code: > {code} > Producer<String, String> producer = new KafkaProducer<>(props); > for (int i = 0; i < 10; i++) { > Future<RecordMetadata> f = producer.send(new ProducerRecord<String, String>("topic", null, Integer.toString(i))); > f.get(); > } > {code} > *Problem* > Cut the network between the producer (p1) and one of the kafka servers (say > k1). > The cluster is healthy, hence the kafka bootstrap tells the producer that > there are 3 kafka servers (as I understood it), and the leaders of the > partitions of the topic. > So the producer will send messages to all of the 3 leaders for each > partition. 
If the leader happens to be k1 for a message, the producer raises > the following exception after request.timeout.ms: > {code} > Exception in thread "main" java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TimeoutException: Batch Expired > at > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:56) > at > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:43) > at > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25) > at Test.main(Test.java:25) > Caused by: org.apache.kafka.common.errors.TimeoutException: Batch Expired > {code} > In theory, the application should handle the failure. In practice, messages > are getting lost, even though there are other 2 leaders available for writing. > I tried with values of acks both 1 and -1. > *What I expected* > Given the client library is automatically deciding the hashing / round robin > schema for the partition, I would say it's not very important which partition > the message is being sent to. > I expect the client library to handle the failure by sending the message to a > partition of a different leader. > Neither kafka-clients nor rdkafka handle this failure. Given those are the > main client libraries being used for kafka as far as I know, I find it a > serious problem in terms of fault tolerance. > EDIT: I cannot add comments to this issue, don't understand why. To answer > [~fpj] yes, I want the first. In the case of network partitions I want to > ensure my messages are stored. If the libraries don't do that, it means I > have to reimplement them. Or otherwise, postpone sending such messages until > the network partition resolves (which means implementing some kind of backlog > on disk of the producer, which should instead be the kafka purpose after > all). In both cases, it's something that is not documented and it's very > inconvenient. 
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3686) Kafka producer is not fault tolerant
[ https://issues.apache.org/jira/browse/KAFKA-3686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15281490#comment-15281490 ] Flavio Junqueira commented on KAFKA-3686: - [~lethalman] thanks for bringing this up. There are at least two possible ways of interpreting the round-robin behavior when producing: # I want to send my message to any partition available, so I don't care which partition takes it. # I want to split my messages evenly across partitions to balance the consumption load. One issue with the first is that it can cause load imbalance in the presence of network partitions, but they both sound like sensible choices to me with their own pros and cons. I believe you're suggesting that you'd rather have the first, or at least the choice of having the first option. > Kafka producer is not fault tolerant > > > Key: KAFKA-3686 > URL: https://issues.apache.org/jira/browse/KAFKA-3686 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.9.0.1 >Reporter: Luca Bruno > > *Setup* > I have a cluster of 3 kafka servers, a topic with 12 partitions with replica > 2, and a zookeeper cluster of 3 nodes. > Producer config: > {code} > props.put("bootstrap.servers", "k1:9092,k2:9092,k3:9092"); > props.put("acks", "1"); > props.put("batch.size", 16384); > props.put("retries", 3); > props.put("buffer.memory", 33554432); > props.put("key.serializer", > "org.apache.kafka.common.serialization.StringSerializer"); > props.put("value.serializer", > "org.apache.kafka.common.serialization.StringSerializer"); > {code} > Producer code: > {code} > Producer<String, String> producer = new KafkaProducer<>(props); > for (int i = 0; i < 10; i++) { > Future<RecordMetadata> f = producer.send(new ProducerRecord<String, String>("topic", null, Integer.toString(i))); > f.get(); > } > {code} > *Problem* > Cut the network between the producer (p1) and one of the kafka servers (say > k1). 
> The cluster is healthy, hence the kafka bootstrap tells the producer that > there are 3 kafka servers (as I understood it), and the leaders of the > partitions of the topic. > So the producer will send messages to all of the 3 leaders for each > partition. If the leader happens to be k1 for a message, the producer raises > the following exception after request.timeout.ms: > {code} > Exception in thread "main" java.util.concurrent.ExecutionException: > org.apache.kafka.common.errors.TimeoutException: Batch Expired > at > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:56) > at > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:43) > at > org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25) > at Test.main(Test.java:25) > Caused by: org.apache.kafka.common.errors.TimeoutException: Batch Expired > {code} > In theory, the application should handle the failure. In practice, messages > are getting lost, even though there are other 2 leaders available for writing. > I tried with values of acks both 1 and -1. > *What I expected* > Given the client library is automatically deciding the hashing / round robin > schema for the partition, I would say it's not very important which partition > the message is being sent to. > I expect the client library to handle the failure by sending the message to a > partition of a different leader. > Neither kafka-clients nor rdkafka handle this failure. Given those are the > main client libraries being used for kafka as far as I know, I find it a > serious problem in terms of fault tolerance. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
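The first interpretation above (send to any partition whose leader is reachable) could be approximated with an availability-aware round-robin. This is an illustrative sketch only, not the behavior of Kafka's actual partitioner; the class name and the `leaderUp` liveness signal are stand-ins for whatever a real client would know about broker reachability.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of interpretation 1: round-robin over partitions, but skip partitions
// whose leader is currently unreachable. "leaderUp.get(i)" is a hypothetical
// liveness signal for the leader of partition i.
public class AvailabilityAwarePartitioner {
    private final AtomicInteger counter = new AtomicInteger();

    int partition(List<Boolean> leaderUp) {
        int n = leaderUp.size();
        for (int i = 0; i < n; i++) {
            int candidate = Math.abs(counter.getAndIncrement()) % n;
            if (leaderUp.get(candidate)) {
                return candidate;  // first reachable partition in round-robin order
            }
        }
        return -1;  // no leader reachable: the caller must fail or buffer locally
    }
}
```

The trade-off the comment points out shows up directly: while one leader is unreachable, this sketch silently shifts its share of the traffic onto the remaining partitions, which is exactly the load imbalance mentioned as the downside of interpretation 1.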
[jira] [Commented] (KAFKA-3525) max.reserved.broker.id off-by-one error
[ https://issues.apache.org/jira/browse/KAFKA-3525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15273215#comment-15273215 ] Flavio Junqueira commented on KAFKA-3525: - Got it, thank you both for the clarification. {{getSequenceId}} needs to start from 1, otherwise we can get a collision. > max.reserved.broker.id off-by-one error > --- > > Key: KAFKA-3525 > URL: https://issues.apache.org/jira/browse/KAFKA-3525 > Project: Kafka > Issue Type: Bug > Components: config >Reporter: Alan Braithwaite >Assignee: Manikumar Reddy > Fix For: 0.10.1.0 > > > There's an off-by-one error in the config check / id generation for > max.reserved.broker.id setting. The auto-generation will generate > max.reserved.broker.id as the initial broker id as it's currently written. > Not sure what the consequences of this are if there's already a broker with > that id as I didn't test that behavior. > This can return 0 + max.reserved.broker.id: > https://github.com/apache/kafka/blob/8dbd688b1617968329087317fa6bde8b8df0392e/core/src/main/scala/kafka/utils/ZkUtils.scala#L213-L215 > However, this does a <= check, which is inclusive of max.reserved.broker.id: > https://github.com/apache/kafka/blob/8dbd688b1617968329087317fa6bde8b8df0392e/core/src/main/scala/kafka/server/KafkaConfig.scala#L984-L986 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
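The collision mentioned above can be seen in a small sketch. The helper names below are illustrative, mirroring the two linked code locations rather than reproducing them: manual ids are validated with an inclusive `<=` check, while auto-generated ids add the sequence number to `reserved.broker.max.id`, so a sequence starting at 0 produces an id inside the manually allowed range.

```java
// Illustrative sketch of the off-by-one, not the actual Kafka code.
public class BrokerIdOffByOne {
    // mirrors the inclusive range check on manually configured broker ids
    static boolean manualIdAllowed(int id, int reservedMax) {
        return id >= 0 && id <= reservedMax;
    }

    // mirrors auto-generation: sequence number plus reserved.broker.max.id
    static int generatedId(int sequence, int reservedMax) {
        return sequence + reservedMax;
    }

    public static void main(String[] args) {
        int reservedMax = 1000;
        // sequence 0 yields id 1000, which a manual config could also claim
        System.out.println(manualIdAllowed(generatedId(0, reservedMax), reservedMax)); // prints true (collision)
        // starting the sequence at 1 keeps generated ids out of the manual range
        System.out.println(manualIdAllowed(generatedId(1, reservedMax), reservedMax)); // prints false (safe)
    }
}
```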
[jira] [Commented] (KAFKA-3660) Log exception message in ControllerBrokerRequestBatch
[ https://issues.apache.org/jira/browse/KAFKA-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15272265#comment-15272265 ] Flavio Junqueira commented on KAFKA-3660: - [~ijuma] could you have a look, pls? > Log exception message in ControllerBrokerRequestBatch > - > > Key: KAFKA-3660 > URL: https://issues.apache.org/jira/browse/KAFKA-3660 > Project: Kafka > Issue Type: Sub-task >Affects Versions: 0.9.0.1 >Reporter: Flavio Junqueira >Assignee: Flavio Junqueira > Fix For: 0.10.0.0 > > > In the main task, we observed that the exception that is causing a dirty > batch isn't being logged. We add here to the current logging so that we can > see the exception message to help us debug the main issue. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-3660) Log exception message in ControllerBrokerRequestBatch
Flavio Junqueira created KAFKA-3660: --- Summary: Log exception message in ControllerBrokerRequestBatch Key: KAFKA-3660 URL: https://issues.apache.org/jira/browse/KAFKA-3660 Project: Kafka Issue Type: Sub-task Affects Versions: 0.9.0.1 Reporter: Flavio Junqueira Assignee: Flavio Junqueira Fix For: 0.10.0.0 In the main task, we observed that the exception that is causing a dirty batch isn't being logged. We add here to the current logging so that we can see the exception message to help us debug the main issue. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3525) max.reserved.broker.id off-by-one error
[ https://issues.apache.org/jira/browse/KAFKA-3525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15272234#comment-15272234 ] Flavio Junqueira commented on KAFKA-3525: - I'm a bit confused by what this issue is trying to accomplish. The documentation for {{reserved.broker.max.id}} says "Max number that can be used for a broker.id", which in my interpretation says that the maximum id is the value of {{reserved.broker.max.id}}, and not that there are {{reserved.broker.max.id}} possible values reserved. Unless I'm missing something, the code is correct without the change proposed in the PR, but please clarify if I'm missing the point. > max.reserved.broker.id off-by-one error > --- > > Key: KAFKA-3525 > URL: https://issues.apache.org/jira/browse/KAFKA-3525 > Project: Kafka > Issue Type: Bug > Components: config >Reporter: Alan Braithwaite >Assignee: Manikumar Reddy > Fix For: 0.10.1.0 > > > There's an off-by-one error in the config check / id generation for > max.reserved.broker.id setting. The auto-generation will generate > max.reserved.broker.id as the initial broker id as it's currently written. > Not sure what the consequences of this are if there's already a broker with > that id as I didn't test that behavior. > This can return 0 + max.reserved.broker.id: > https://github.com/apache/kafka/blob/8dbd688b1617968329087317fa6bde8b8df0392e/core/src/main/scala/kafka/utils/ZkUtils.scala#L213-L215 > However, this does a <= check, which is inclusive of max.reserved.broker.id: > https://github.com/apache/kafka/blob/8dbd688b1617968329087317fa6bde8b8df0392e/core/src/main/scala/kafka/server/KafkaConfig.scala#L984-L986 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-3173) Error while moving some partitions to OnlinePartition state
[ https://issues.apache.org/jira/browse/KAFKA-3173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flavio Junqueira updated KAFKA-3173: Status: Open (was: Patch Available) > Error while moving some partitions to OnlinePartition state > > > Key: KAFKA-3173 > URL: https://issues.apache.org/jira/browse/KAFKA-3173 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.9.0.0 >Reporter: Flavio Junqueira >Assignee: Flavio Junqueira >Priority: Critical > Fix For: 0.10.0.0 > > Attachments: KAFKA-3173-race-repro.patch > > > We observed another instance of the problem reported in KAFKA-2300, but this > time the error appeared in the partition state machine. In KAFKA-2300, we > haven't cleaned up the state in {{PartitionStateMachine}} and > {{ReplicaStateMachine}} as we do in {{KafkaController}}. > Here is the stack trace: > {noformat} > 2016-01-29 15:26:51,393] ERROR [Partition state machine on Controller 0]: > Error while moving some partitions to OnlinePartition state > (kafka.controller.PartitionStateMachine) java.lang.IllegalStateException: > Controller to broker state change requests batch is not empty while creating > a new one. 
> Some LeaderAndIsr state changes Map(0 -> Map(foo-0 -> (LeaderAndIsrInfo: > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:1),AllReplicas:0))) > might be lost > at > kafka.controller.ControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:254) > at > kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:144) > at > kafka.controller.KafkaController.onNewPartitionCreation(KafkaController.scala:517) > at > kafka.controller.KafkaController.onNewTopicCreation(KafkaController.scala:504) > at > kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(PartitionStateMachine.scala:437) > at > kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419) > at > kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419) > at > kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262) > at > kafka.controller.PartitionStateMachine$TopicChangeListener.handleChildChange(PartitionStateMachine.scala:418) > at > org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842) > at > org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3173) Error while moving some partitions to OnlinePartition state
[ https://issues.apache.org/jira/browse/KAFKA-3173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15272070#comment-15272070 ] Flavio Junqueira commented on KAFKA-3173: - [~junrao] True, {{onControllerFailover}} has the lock when it runs, and it is the only place where we call {{PartitionStateMachine.startup()}}. The confusing part is that the lock is acquired a few hops up the call path, but it does look like the additional lock isn't necessary. Also, I'm wondering if we even need that controller lock. All the zk events are processed using the ZkClient event thread, and there is just one. The runs I was trying to put together had concurrent zk events being triggered, which was causing the potential problems I raised above. If there is any chance of internal threads racing, excluding the ZkClient event thread, then the lock is needed; otherwise it isn't. I don't think we need the change I proposed, so I'll go ahead and close the PR, but we can't resolve this issue until we determine the cases in which we can get a dirty batch, preventing the controller from sending further requests. We need more info on this. One of the possibilities, given what I've seen in other logs, is simply that there is a transient error while sending a message to a broker in {{ControllerBrokerRequestBatch.sendRequestsToBrokers}}, but we are currently not logging the exception. I was hoping that the originator of the call would log it, but that isn't happening. Perhaps one thing we can do for the upcoming release is to log the exception in case we observe the problem again. 
> Error while moving some partitions to OnlinePartition state > > > Key: KAFKA-3173 > URL: https://issues.apache.org/jira/browse/KAFKA-3173 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.9.0.0 >Reporter: Flavio Junqueira >Assignee: Flavio Junqueira >Priority: Critical > Fix For: 0.10.0.0 > > Attachments: KAFKA-3173-race-repro.patch > > > We observed another instance of the problem reported in KAFKA-2300, but this > time the error appeared in the partition state machine. In KAFKA-2300, we > haven't cleaned up the state in {{PartitionStateMachine}} and > {{ReplicaStateMachine}} as we do in {{KafkaController}}. > Here is the stack trace: > {noformat} > 2016-01-29 15:26:51,393] ERROR [Partition state machine on Controller 0]: > Error while moving some partitions to OnlinePartition state > (kafka.controller.PartitionStateMachine) java.lang.IllegalStateException: > Controller to broker state change requests batch is not empty while creating > a new one. > Some LeaderAndIsr state changes Map(0 -> Map(foo-0 -> (LeaderAndIsrInfo: > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:1),AllReplicas:0))) > might be lost > at > kafka.controller.ControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:254) > at > kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:144) > at > kafka.controller.KafkaController.onNewPartitionCreation(KafkaController.scala:517) > at > kafka.controller.KafkaController.onNewTopicCreation(KafkaController.scala:504) > at > kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(PartitionStateMachine.scala:437) > at > kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419) > at > kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419) > at > kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262) > at > kafka.controller.PartitionStateMachine$TopicChangeListener.handleChildChange(PartitionStateMachine.scala:418) > at > org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842) > at > org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-3640) Reduce the latency of topic metadata requests
Flavio Junqueira created KAFKA-3640: --- Summary: Reduce the latency of topic metadata requests Key: KAFKA-3640 URL: https://issues.apache.org/jira/browse/KAFKA-3640 Project: Kafka Issue Type: Improvement Affects Versions: 0.9.0.1 Reporter: Flavio Junqueira Assignee: Flavio Junqueira Fix For: 0.9.0.2 Changes to reduce the latency of topic metadata requests based on the PR of KAFKA-2073. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3128) Add metrics for ZooKeeper events
[ https://issues.apache.org/jira/browse/KAFKA-3128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15254591#comment-15254591 ] Flavio Junqueira commented on KAFKA-3128: - [~junrao] Perhaps we can do more than just counting the session expiration events and count all events that come through {{SessionExpireListener.handleStateChange}}. For example, it might be useful to know if there are several connection loss events even if they don't lead to session expiration. > Add metrics for ZooKeeper events > > > Key: KAFKA-3128 > URL: https://issues.apache.org/jira/browse/KAFKA-3128 > Project: Kafka > Issue Type: Improvement > Components: core, zkclient >Reporter: Flavio Junqueira >Assignee: Ismael Juma > Fix For: 0.10.0.0 > > > It would be useful to report via Kafka metrics the number of ZK event > notifications, such as connection loss events, session expiration events, > etc., as a way of spotting potential issues with the communication with the > ZK ensemble. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
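The suggestion above — count every state change seen by the equivalent of {{SessionExpireListener.handleStateChange}}, not just expirations — can be sketched with a simple per-state counter. This is an illustrative sketch, not the metric implementation that eventually landed; the state names mirror ZooKeeper's KeeperState values.

```java
// Hypothetical sketch: tally every ZooKeeper state-change notification by
// state name, so connection-loss bursts are visible even when no session
// expiration follows.
import java.util.HashMap;
import java.util.Map;

class ZkEventMetrics {
    private final Map<String, Long> counts = new HashMap<>();

    // Invoked once per notification (e.g. "Disconnected", "Expired").
    void handleStateChange(String state) {
        counts.merge(state, 1L, Long::sum);
    }

    long count(String state) {
        return counts.getOrDefault(state, 0L);
    }
}
```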
[jira] [Commented] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue
[ https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247802#comment-15247802 ] Flavio Junqueira commented on KAFKA-3042: - [~junrao] It makes sense, thanks for the analysis. Trying to reconstruct the problem in steps, this is what's going on: # Broker 5 thinks broker 4 is alive and sends a LeaderAndIsr request to broker 1 with 4 as the leader. # Broker 1 doesn't have 4 cached as a live broker, so it fails the request to make it a follower of the partition. The LeaderAndIsr request has a list of live leaders, and I suppose 4 is in that list. To sort this out, I can see two options: # We simply update the metadata cache upon receiving a LeaderAndIsr request using the list of live leaders. This update needs to be the union of the current set with the set of leaders. # You also suggested sending an UpdateMetadata request first to update the set of live brokers. I can't see any problem with 1, and I can't see any immediate problem with 2 either, but I'm concerned about finding ourselves with another race condition if we send an update first. What do you think? > updateIsr should stop after failed several times due to zkVersion issue > --- > > Key: KAFKA-3042 > URL: https://issues.apache.org/jira/browse/KAFKA-3042 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8.2.1 > Environment: jdk 1.7 > centos 6.4 >Reporter: Jiahongchao > Fix For: 0.10.0.0 > > Attachments: controller.log, server.log.2016-03-23-01, > state-change.log > > > sometimes one broker may repeatly log > "Cached zkVersion 54 not equal to that in zookeeper, skip updating ISR" > I think this is because the broker consider itself as the leader in fact it's > a follower. > So after several failed tries, it need to find out who is the leader -- This message was sent by Atlassian JIRA (v6.3.4#6332)
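Option 1 from the comment above can be sketched in a few lines. The types are simplified stand-ins for Kafka's metadata cache, not the real classes; the point is that the update is a union, so brokers already known to be alive are never dropped from the cache:

```java
// Sketch of option 1: on receiving a LeaderAndIsr request, merge the
// request's live-leader list into the broker's cached live-broker set.
import java.util.HashSet;
import java.util.Set;

class MetadataCacheSketch {
    static Set<Integer> updateLiveBrokers(Set<Integer> cached, Set<Integer> liveLeaders) {
        Set<Integer> merged = new HashSet<>(cached); // do not mutate the input
        merged.addAll(liveLeaders);
        return merged;
    }
}
```

In the scenario discussed, broker 1's cache {1,2,3,5} merged with a live-leader list containing 4 would yield {1,2,3,4,5}, so the become-follower transition would no longer fail on an "unavailable" leader.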
[jira] [Commented] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue
[ https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15245725#comment-15245725 ] Flavio Junqueira commented on KAFKA-3042: - I checked the broker change listener output of broker 5: {noformat} [2016-04-09 00:37:54,079] INFO [BrokerChangeListener on Controller 5]: Broker change listener fired for path /brokers/ids with children 1,2,3,5 (kafka.controller.ReplicaStateMachine$BrokerChangeListener) [2016-04-09 00:41:35,442] INFO [BrokerChangeListener on Controller 5]: Broker change listener fired for path /brokers/ids with children 1,4,5 (kafka.controller.ReplicaStateMachine$BrokerChangeListener) {noformat} These are the two events I could find that determine a time interval including the request event. The LeaderAndIsrRequest from broker 5 comes in at {{[2016-04-09 00:40:58,144]}}. This is in this comment: https://issues.apache.org/jira/browse/KAFKA-3042?focusedCommentId=15236055=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15236055
[jira] [Commented] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue
[ https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15241314#comment-15241314 ] Flavio Junqueira commented on KAFKA-3042: - [~junrao] In this comment: https://issues.apache.org/jira/browse/KAFKA-3042?focusedCommentId=15236055=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15236055 I showed that broker 5 is the one that sent the LeaderAndIsr request to broker 1, and in here: https://issues.apache.org/jira/browse/KAFKA-3042?focusedCommentId=15237383=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15237383 that broker 5 also didn't have broker 4 as a live broker when it sent the request to broker 1. It does sound right that the controller on failover should update the list of live brokers on other brokers before sending requests that make them followers, or at least the problem should be transient in the sense that it could be corrected by a later message. However, it sounds like for the partition we are analyzing, there is this additional problem that controller 5 also didn't have broker 4 in its list of live brokers. Interestingly, I also caught an instance of this: {noformat} [2016-04-09 00:37:54,111] DEBUG Sending MetadataRequest to Brokers:ArrayBuffer(2, 5)... [2016-04-09 00:37:54,111] ERROR Haven't been able to send metadata update requests... [2016-04-09 00:37:54,112] ERROR [Controller 5]: Forcing the controller to resign (kafka.controller.KafkaController) {noformat} I don't think this is related, but we have been wondering in another issue about the possible causes of batches in {{ControllerBrokerRequestBatch}} not being empty, and there are a few occurrences of it in these logs. 
This is happening, however, right after the controller resigns, so I'm guessing this is related to the controller shutting down: {noformat} [2016-04-09 00:37:54,064] INFO [Controller 5]: Broker 5 resigned as the controller (kafka.controller.KafkaController) {noformat} In any case, for this last issue, I'll create a jira to make sure that we have enough info to identify this issue when it happens. Currently, the exception is being propagated, but we aren't logging the cause anywhere. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue
[ https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15237824#comment-15237824 ] Flavio Junqueira commented on KAFKA-3042: - hey [~wushujames] bq. you said that broker 3 failed to release leadership to broker 4 because broker 4 was offline it is actually broker 1 that failed to release leadership. bq. What is the correct behavior for that scenario? The behavior isn't incorrect in the following sense. We cannot completely prevent a single broker from being partitioned away from the other replicas. If this broker is the leader before the partition, then it may remain in this state for some time. In the meanwhile, the other replicas may form a new ISR and make progress independently. But, very importantly, the partitioned broker won't be able to commit anything on its own, assuming that the minimum ISR is at least two. In the scenario we are discussing, we don't have a network partition, but the behavior is equivalent: broker 1 will remain the leader until it is able to become a follower successfully. The bad part is that broker 1 isn't partitioned away: it is talking to other controllers, and it should be brought back into a state in which it can make progress with that partition and others that are equally stuck. The bottom line is that it is safe, but we clearly want the broker up and making progress with those partitions. Let me point out that, from the logs, it looks like you have unclean leader election enabled because of this log message: {noformat} [2016-04-09 00:40:50,911] WARN [OfflinePartitionLeaderSelector]: No broker in ISR is alive for [tec1.en2.frontend.syncPing,7]. Elect leader 4 from live brokers 4. There's potential data loss. (kafka.controller.OfflinePartitionLeaderSelector) {noformat} and no minimum ISR set: {noformat} [2016-04-09 00:56:53,009] WARN [Controller 5]: Cannot remove replica 1 from ISR of partition [tec1.en2.frontend.syncPing,7] since it is not in the ISR. 
Leader = 4 ; ISR = List(4) {noformat} Those options can cause some data loss. 
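The data-loss combination called out in the comment can be closed off with two standard Kafka settings, available in the 0.8.2+ versions discussed here. The values shown are common safe choices, not the reporter's actual configuration:

```properties
# Disable electing an out-of-sync replica as leader (trades availability for safety).
unclean.leader.election.enable=false
# Require at least two in-sync replicas before acks=all produces succeed.
min.insync.replicas=2
```

With these, a lone out-of-sync replica like broker 4 in the logs above could not be elected leader, and producers using acks=all would get errors instead of silently writing to an under-replicated partition.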
[jira] [Commented] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue
[ https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15237383#comment-15237383 ] Flavio Junqueira commented on KAFKA-3042: - I had a look at the zookeeper logs, and I couldn’t see anything unusual. There are session expirations, but it is expected that sessions expire. Using the same topic-partition I used in my last comment, [tec1.en2.frontend.syncPing,7], I found that the reason seems to be that controller 5 is telling broker 1 that the partition leader is 4, but neither 5 nor 1 think that broker 4 is up. Here are some relevant log lines from broker 5: {noformat} [2016-04-09 00:37:54,079] INFO [BrokerChangeListener on Controller 5]: Broker change listener fired for path /brokers/ids with children 1,2,3,5 (kafka.controller.ReplicaStateMachine$BrokerChangeListener) [2016-04-09 00:37:53,709] DEBUG [Partition state machine on Controller 5]: After leader election, leader cache is updated to Map…. [tec1.en2.frontend.syncPing,7] -> (Leader:2,ISR:2,LeaderEpoch:361,ControllerEpoch:410) [2016-04-09 00:37:53,765] INFO [Partition state machine on Controller 5]: Started partition state machine with initial state -> Map… [tec1.en2.frontend.syncPing,7] -> OnlinePartition [2016-04-09 00:40:58,415] DEBUG [Partition state machine on Controller 5]: After leader election, leader cache is updated to Map… [tec1.en2.frontend.syncPing,7] -> (Leader:4,ISR:4,LeaderEpoch:364,ControllerEpoch:415) [2016-04-09 00:41:35,442] INFO [BrokerChangeListener on Controller 5]: Broker change listener fired for path /brokers/ids with children 1,4,5 (kafka.controller.ReplicaStateMachine$BrokerChangeListener) {noformat} Interestingly, broker 3 is the controller for epoch 415, see the last leader cache update, and this is the information that broker 1 receives from broker 5 (see the previous comment). It looks like broker 5 ignored the fact that broker 4 is down or at least not in its list of live brokers. 
Broker 3 seems to behave correctly with respect to the partition; here are some relevant log lines: {noformat} [2016-04-09 00:39:57,004] INFO [Controller 3]: Controller 3 incremented epoch to 415 (kafka.controller.KafkaController) [2016-04-09 00:40:46,633] INFO [BrokerChangeListener on Controller 3]: Broker change listener fired for path /brokers/ids with children 3,4,5 (kafka.controller.ReplicaStateMachine$BrokerChangeListener) [2016-04-09 00:40:46,638] INFO [BrokerChangeListener on Controller 3]: Newly added brokers: 4, deleted brokers: , all live brokers: 3,4,5 (kafka.controller.ReplicaStateMachine [2016-04-09 00:40:50,911] DEBUG [OfflinePartitionLeaderSelector]: No broker in ISR is alive for [tec1.en2.frontend.syncPing,7]. Pick the leader from the alive assigned replicas: 4 (kafka.controller.OfflinePartitionLeaderSelector) [2016-04-09 00:40:50,911] WARN [OfflinePartitionLeaderSelector]: No broker in ISR is alive for [tec1.en2.frontend.syncPing,7]. Elect leader 4 from live brokers 4. There's potential data loss. (kafka.controller.OfflinePartitionLeaderSelector) [2016-04-09 00:40:50,911] INFO [OfflinePartitionLeaderSelector]: Selected new leader and ISR {"leader":4,"leader_epoch":364,"isr":[4]} for offline partition [tec1.en2.frontend.syncPing,7] (kafka.controller.OfflinePartitionLeaderSelector) State-change log [2016-04-09 00:40:50,909] TRACE Controller 3 epoch 415 started leader election for partition [tec1.en2.frontend.syncPing,7] (state.change.logger) [2016-04-09 00:40:50,911] TRACE Controller 3 epoch 415 elected leader 4 for Offline partition [tec1.en2.frontend.syncPing,7] (state.change.logger) [2016-04-09 00:40:50,930] TRACE Controller 3 epoch 415 changed partition [tec1.en2.frontend.syncPing,7] from OfflinePartition to OnlinePartition with leader 4 (state.change.logger) {noformat} To summarize, the problem seems to be that controller 5 tells broker 1 that the partition leader is an unavailable broker, and broker 1 fails to change the partition leader. 
As it fails to update the leader to broker 4, broker 1 remains the leader, which causes it to keep trying to update the ISR and to print the “Cached zkVersion…” messages. Broker 1 does not receive any controller update that enables it to correct the problem later on, and consequently it is incorrectly stuck as partition leader.
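The stuck loop described above — a conditional ZooKeeper update retried indefinitely with a stale cached version — can be modeled abstractly. This is an illustrative sketch, not Kafka's ISR-update code; it shows why refreshing the znode version on each failed attempt (and, as the issue title suggests, giving up after a few tries to re-check who the leader is) breaks the loop:

```java
// Illustrative model of a version-guarded update, in the style of
// ZooKeeper's setData(path, data, expectedVersion).
class IsrUpdateSketch {
    int zkVersion; // version currently stored in ZooKeeper

    IsrUpdateSketch(int zkVersion) { this.zkVersion = zkVersion; }

    // Succeeds only if the caller's expected version matches.
    boolean conditionalUpdate(int expectedVersion) {
        if (expectedVersion != zkVersion) return false;
        zkVersion++; // a successful write bumps the version
        return true;
    }

    // Bounded retry that refreshes the cached version after each failure,
    // instead of retrying forever with the same stale value.
    static int attemptsUntilSuccess(IsrUpdateSketch zk, int cachedVersion, int maxRetries) {
        int attempts = 0;
        while (attempts < maxRetries) {
            attempts++;
            if (zk.conditionalUpdate(cachedVersion)) return attempts;
            cachedVersion = zk.zkVersion; // re-read from ZooKeeper
        }
        return -1; // still failing: time to re-check leadership
    }
}
```

Refreshing alone fixes a stale version, but not the deeper bug in this thread: broker 1 believes it is the leader when it is not, so only re-checking leadership state would actually stop the retries.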
[jira] [Comment Edited] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue
[ https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236055#comment-15236055 ] Flavio Junqueira edited comment on KAFKA-3042 at 4/12/16 2:58 PM: -- Thanks for the logs. I still don’t have an answer, but I wanted to report some progress. I tracked one of the partitions that I’ve chosen arbitrarily for which broker 1 was complaining about the zk version: [tec1.en2.frontend.syncPing,7]. Here are a few things that I’ve observed: Broker 1 is the leader of the partition: {noformat} [2016-04-09 00:40:52,883] TRACE Broker 1 cached leader info (LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:363,ControllerEpoch:414),ReplicationFactor:3),AllReplicas:1,2,4) for partition [tec1.en2.frontend.syncPing,7] in response to UpdateMetadata request sent by controller 1 epoch 414 with correlation id 876 (state.change.logger) {noformat} but soon after it fails to release leadership to broker 4: {noformat} [2016-04-09 00:40:58,106] TRACE Broker 1 received LeaderAndIsr request (LeaderAndIsrInfo:(Leader:4,ISR:4,LeaderEpoch:364,ControllerEpoch:415),ReplicationFactor:3),AllReplicas:1,2,4) correlation id 0 from controller 5 epoch 416 for partition [tec1.en2.frontend.syncPing,7] (state.change.logger) [2016-04-09 00:40:58,139] TRACE Broker 1 handling LeaderAndIsr request correlationId 0 from controller 5 epoch 416 starting the become-follower transition for partition [tec1.en2.frontend.syncPing,7] (state.change.logger) [2016-04-09 00:40:58,144] ERROR Broker 1 received LeaderAndIsrRequest with correlation id 0 from controller 5 epoch 415 for partition [tec1.en2.frontend.syncPing,7] but cannot become follower since the new leader 4 is unavailable. 
(state.change.logger) {noformat} Now, a bit later in the log, the broker says that it is caching the leader info for the partition: {noformat} [2016-04-09 00:42:02,456] TRACE Broker 1 cached leader info (LeaderAndIsrInfo:(Leader:4,ISR:4,LeaderEpoch:364,ControllerEpoch:415),ReplicationFactor:3),AllReplicas:1,2,4) for partition [tec1.en2.frontend.syncPing,7] in response to UpdateMetadata request sent by controller 5 epoch 416 with correlation id 1473 (state.change.logger) {noformat} but it keeps printing the “Cached zkVersion…” errors, which indicate that the broker still believes it is the leader of the partition, or at least the variable {{leaderReplicaIdOpt}} is set this way. I also inspected other partitions, and the behavior doesn’t seem to be consistent. I’ve seen at least one partition in broker 2 for which the broker made the appropriate transition: {noformat} [2016-04-09 00:39:23,840] TRACE Broker 2 received LeaderAndIsr request (LeaderAndIsrInfo:(Leader:3,ISR:2,3,LeaderEpoch:305,ControllerEpoch:414),ReplicationFactor:3),AllReplicas:3,2,4) correlation id 535 from controller 1 epoch 414 for partition [tec1.ono_qe1.bodydata.recordings,20] (state.change.logger) [2016-04-09 00:39:23,841] TRACE Broker 2 handling LeaderAndIsr request correlationId 535 from controller 1 epoch 414 starting the become-follower transition for partition [tec1.ono_qe1.bodydata.recordings,20] (state.change.logger) [2016-04-09 00:39:23,841] TRACE Broker 2 stopped fetchers as part of become-follower request from controller 1 epoch 414 with correlation id 535 for partition [tec1.ono_qe1.bodydata.recordings,20] (state.change.logger) [2016-04-09 00:39:23,856] TRACE Broker 2 truncated logs and checkpointed recovery boundaries for partition [tec1.ono_qe1.bodydata.recordings,20] as part of become-follower request with correlation id 535 from controller 1 epoch 414 (state.change.logger) [2016-04-09 00:39:23,856] TRACE Broker 2 started fetcher to new leader as part of become-follower request from 
controller 1 epoch 414 with correlation id 535 for partition [tec1.ono_qe1.bodydata.recordings,20] (state.change.logger) [2016-04-09 00:39:23,856] TRACE Broker 2 completed LeaderAndIsr request correlationId 535 from controller 1 epoch 414 for the become-follower transition for partition [tec1.ono_qe1.bodydata.recordings,20] (state.change.logger) {noformat} Actually, the state-change log of broker 2 seems to have a gap starting at {{[2016-04-09 00:39:46,246]}}. Is that when you restarted the broker?
[jira] [Resolved] (KAFKA-3205) Error in I/O with host (java.io.EOFException) raised in producer
[ https://issues.apache.org/jira/browse/KAFKA-3205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flavio Junqueira resolved KAFKA-3205. - Resolution: Won't Fix Reviewer: Flavio Junqueira Fix Version/s: (was: 0.10.0.0) The code currently in 0.9+ doesn't print as many of these messages because both ends, client and server, enforce the connection timeout. The change discussed in the pull request suppresses the message in the case of a passive close initiated by the server (in 0.9 the timeout is enforced), which is desirable only because the message otherwise pollutes the logs. It is better to keep these messages in 0.9 and later so that we are informed of connections being closed. They are not supposed to happen very often, but if it turns out to be a problem, we can revisit this issue. > Error in I/O with host (java.io.EOFException) raised in producer > > > Key: KAFKA-3205 > URL: https://issues.apache.org/jira/browse/KAFKA-3205 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 0.8.2.1, 0.9.0.0 >Reporter: Jonathan Raffre > > In a situation with a Kafka broker in 0.9 and producers still in 0.8.2.x, > producers seems to raise the following after a variable amount of time since > start : > {noformat} > 2016-01-29 14:33:13,066 WARN [] o.a.k.c.n.Selector: Error in I/O with > 172.22.2.170 > java.io.EOFException: null > at > org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62) > ~[org.apache.kafka.kafka-clients-0.8.2.0.jar:na] > at org.apache.kafka.common.network.Selector.poll(Selector.java:248) > ~[org.apache.kafka.kafka-clients-0.8.2.0.jar:na] > at > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) > [org.apache.kafka.kafka-clients-0.8.2.0.jar:na] > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) > [org.apache.kafka.kafka-clients-0.8.2.0.jar:na] > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) > 
[org.apache.kafka.kafka-clients-0.8.2.0.jar:na] > at java.lang.Thread.run(Thread.java:745) [na:1.8.0_66-internal] > {noformat} > This can be reproduced successfully by doing the following : > * Start a 0.8.2 producer connected to the 0.9 broker > * Wait 15 minutes, exactly > * See the error in the producer logs. > Oddly, this also shows up in an active producer but after 10 minutes of > activity. > Kafka's server.properties : > {noformat} > broker.id=1 > listeners=PLAINTEXT://:9092 > port=9092 > num.network.threads=2 > num.io.threads=2 > socket.send.buffer.bytes=1048576 > socket.receive.buffer.bytes=1048576 > socket.request.max.bytes=104857600 > log.dirs=/mnt/data/kafka > num.partitions=4 > auto.create.topics.enable=false > delete.topic.enable=true > num.recovery.threads.per.data.dir=1 > log.retention.hours=48 > log.retention.bytes=524288000 > log.segment.bytes=52428800 > log.retention.check.interval.ms=6 > log.roll.hours=24 > log.cleanup.policy=delete > log.cleaner.enable=true > zookeeper.connect=127.0.0.1:2181 > zookeeper.connection.timeout.ms=100 > {noformat} > Producer's configuration : > {noformat} > compression.type = none > metric.reporters = [] > metadata.max.age.ms = 30 > metadata.fetch.timeout.ms = 6 > acks = all > batch.size = 16384 > reconnect.backoff.ms = 10 > bootstrap.servers = [127.0.0.1:9092] > receive.buffer.bytes = 32768 > retry.backoff.ms = 500 > buffer.memory = 33554432 > timeout.ms = 3 > key.serializer = class > org.apache.kafka.common.serialization.StringSerializer > retries = 3 > max.request.size = 500 > block.on.buffer.full = true > value.serializer = class > org.apache.kafka.common.serialization.StringSerializer > metrics.sample.window.ms = 3 > send.buffer.bytes = 131072 > max.in.flight.requests.per.connection = 5 > metrics.num.samples = 2 > linger.ms = 0 > client.id = > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
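The timing pattern in the report (an EOFException after roughly 10–15 minutes of inactivity) is consistent with the idle-connection reaping that 0.9 brokers perform, which an 0.8.2 client surfaces as an abrupt close. This is my reading of the thread, not something confirmed in it. The relevant broker setting, with what I believe is its 0.9 default, is:

```properties
# Broker-side (0.9+): close connections that have been idle longer than this.
# An 0.8.2 producer reports the resulting close as java.io.EOFException.
connections.max.idle.ms=600000
```

Raising the value only delays the log line; as the resolution above says, the message itself is informational and harmless.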
[jira] [Comment Edited] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue
[ https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236055#comment-15236055 ] Flavio Junqueira edited comment on KAFKA-3042 at 4/11/16 9:58 PM: -- Thanks for the logs. I still don’t have an answer, but I wanted to report some progress. I tracked one of the partitions that I’ve chosen arbitrarily for which broker 1 was complaining about the zk version: [tec1.en2.frontend.syncPing,7]. Here are a few things that I’ve observed: Broker 1 is the leader of the partition: {noformat} [2016-04-09 00:40:52,883] TRACE Broker 1 cached leader info (LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:363,ControllerEpoch:414),ReplicationFactor:3),AllReplicas:1,2,4) for partition [tec1.en2.frontend.syncPing,7] in response to UpdateMetadata request sent by controller 1 epoch 414 with correlation id 876 (state.change.logger) {noformat} but soon after it fails to release leadership to broker 4: {noformat} [2016-04-09 00:40:58,139] TRACE Broker 1 handling LeaderAndIsr request correlationId 0 from controller 5 epoch 416 starting the become-follower transition for partition [tec1.en2.frontend.syncPing,7] (state.change.logger) [2016-04-09 00:40:58,144] ERROR Broker 1 received LeaderAndIsrRequest with correlation id 0 from controller 5 epoch 415 for partition [tec1.en2.frontend.syncPing,7] but cannot become follower since the new leader 4 is unavailable. 
(state.change.logger) {noformat} Now, a bit later in the log, the broker says that it is caching the leader info for the partition: {noformat} [2016-04-09 00:42:02,456] TRACE Broker 1 cached leader info (LeaderAndIsrInfo:(Leader:4,ISR:4,LeaderEpoch:364,ControllerEpoch:415),ReplicationFactor:3),AllReplicas:1,2,4) for partition [tec1.en2.frontend.syncPing,7] in response to UpdateMetadata request sent by controller 5 epoch 416 with correlation id 1473 (state.change.logger) {noformat} but it keeps printing the “Cached zkVersion…” errors, which indicate that the broker still believes it is the leader of the partition, or at least the variable {{leaderReplicaIdOpt}} is set this way. I also inspected other partitions, and the behavior doesn’t seem to be consistent. I’ve seen at least one partition in broker 2 for which the broker made the appropriate transition: {noformat} [2016-04-09 00:39:23,840] TRACE Broker 2 received LeaderAndIsr request (LeaderAndIsrInfo:(Leader:3,ISR:2,3,LeaderEpoch:305,ControllerEpoch:414),ReplicationFactor:3),AllReplicas:3,2,4) correlation id 535 from controller 1 epoch 414 for partition [tec1.ono_qe1.bodydata.recordings,20] (state.change.logger) [2016-04-09 00:39:23,841] TRACE Broker 2 handling LeaderAndIsr request correlationId 535 from controller 1 epoch 414 starting the become-follower transition for partition [tec1.ono_qe1.bodydata.recordings,20] (state.change.logger) [2016-04-09 00:39:23,841] TRACE Broker 2 stopped fetchers as part of become-follower request from controller 1 epoch 414 with correlation id 535 for partition [tec1.ono_qe1.bodydata.recordings,20] (state.change.logger) [2016-04-09 00:39:23,856] TRACE Broker 2 truncated logs and checkpointed recovery boundaries for partition [tec1.ono_qe1.bodydata.recordings,20] as part of become-follower request with correlation id 535 from controller 1 epoch 414 (state.change.logger) [2016-04-09 00:39:23,856] TRACE Broker 2 started fetcher to new leader as part of become-follower request from 
controller 1 epoch 414 with correlation id 535 for partition [tec1.ono_qe1.bodydata.recordings,20] (state.change.logger) [2016-04-09 00:39:23,856] TRACE Broker 2 completed LeaderAndIsr request correlationId 535 from controller 1 epoch 414 for the become-follower transition for partition [tec1.ono_qe1.bodydata.recordings,20] (state.change.logger) {noformat} Actually, the state-change log of broker 2 seems to have a gap starting at {{[2016-04-09 00:39:46,246]}}. Is that when you restarted the broker?
[jira] [Commented] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue
[ https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236055#comment-15236055 ] Flavio Junqueira commented on KAFKA-3042: - Thanks for the logs. I still don’t have an answer, but I wanted to report some progress. I tracked one of the partitions that I’ve chosen arbitrarily for which broker 1 was complaining about the zk version: [tec1.en2.frontend.syncPing,7]. Here are a few things that I’ve observed: Broker 1 is the leader of the partition: {noformat} [2016-04-09 00:40:52,883] TRACE Broker 1 cached leader info (LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:363,ControllerEpoch:414),ReplicationFactor:3),AllReplicas:1,2,4) for partition [tec1.en2.frontend.syncPing,7] in response to UpdateMetadata request sent by controller 1 epoch 414 with correlation id 876 (state.change.logger) {noformat} but soon after it fails to release leadership to broker 4: {noformat} [2016-04-09 00:40:58,139] TRACE Broker 1 handling LeaderAndIsr request correlationId 0 from controller 5 epoch 416 starting the become-follower transition for partition [tec1.en2.frontend.syncPing,7] (state.change.logger) [2016-04-09 00:40:58,144] ERROR Broker 1 received LeaderAndIsrRequest with correlation id 0 from controller 5 epoch 415 for partition [tec1.en2.frontend.syncPing,7] but cannot become follower since the new leader 4 is unavailable. 
(state.change.logger) {noformat} Now, a bit later in the log, the broker says that it is caching the leader info for the partition: {noformat} [2016-04-09 00:42:02,456] TRACE Broker 1 cached leader info (LeaderAndIsrInfo:(Leader:4,ISR:4,LeaderEpoch:364,ControllerEpoch:415),ReplicationFactor:3),AllReplicas:1,2,4) for partition [tec1.en2.frontend.syncPing,7] in response to UpdateMetadata request sent by controller 5 epoch 416 with correlation id 1473 (state.change.logger) {noformat} but it keeps printing the “Cached zkVersion…” errors, which indicate that the broker still believes it is the leader of the partition, or at least the variable {{leaderReplicaIdOpt}} is set this way. I also inspected other partitions, and the behavior doesn’t seem to be consistent. I’ve seen at least one partition in broker 2 for which the broker made the appropriate transition: {noformat} [2016-04-09 00:39:23,840] TRACE Broker 2 received LeaderAndIsr request (LeaderAndIsrInfo:(Leader:3,ISR:2,3,LeaderEpoch:305,ControllerEpoch:414),ReplicationFactor:3),AllReplicas:3,2,4) correlation id 535 from controller 1 epoch 414 for partition [tec1.ono_qe1.bodydata.recordings,20] (state.change.logger) [2016-04-09 00:39:23,841] TRACE Broker 2 handling LeaderAndIsr request correlationId 535 from controller 1 epoch 414 starting the become-follower transition for partition [tec1.ono_qe1.bodydata.recordings,20] (state.change.logger) [2016-04-09 00:39:23,841] TRACE Broker 2 stopped fetchers as part of become-follower request from controller 1 epoch 414 with correlation id 535 for partition [tec1.ono_qe1.bodydata.recordings,20] (state.change.logger) [2016-04-09 00:39:23,856] TRACE Broker 2 truncated logs and checkpointed recovery boundaries for partition [tec1.ono_qe1.bodydata.recordings,20] as part of become-follower request with correlation id 535 from controller 1 epoch 414 (state.change.logger) [2016-04-09 00:39:23,856] TRACE Broker 2 started fetcher to new leader as part of become-follower request from 
controller 1 epoch 414 with correlation id 535 for partition [tec1.ono_qe1.bodydata.recordings,20] (state.change.logger) [2016-04-09 00:39:23,856] TRACE Broker 2 completed LeaderAndIsr request correlationId 535 from controller 1 epoch 414 for the become-follower transition for partition [tec1.ono_qe1.bodydata.recordings,20] (state.change.logger) {noformat} Actually, the state-change log of broker 2 seems to have a gap starting at {{[2016-04-09 00:39:46,246]}}. Is that when you restarted the broker? > updateIsr should stop after failed several times due to zkVersion issue > --- > > Key: KAFKA-3042 > URL: https://issues.apache.org/jira/browse/KAFKA-3042 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8.2.1 > Environment: jdk 1.7 > centos 6.4 >Reporter: Jiahongchao > Attachments: controller.log, server.log.2016-03-23-01, > state-change.log > > > sometimes one broker may repeatly log > "Cached zkVersion 54 not equal to that in zookeeper, skip updating ISR" > I think this is because the broker consider itself as the leader in fact it's > a follower. > So after several failed tries, it need to find out who is the leader -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue
[ https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15233564#comment-15233564 ] Flavio Junqueira commented on KAFKA-3042: - [~delbaeth] thanks for all the information. bq. Broker 1 rolled correctly and rejoined and leadership rebalancing occurred. After broker 2 rolled and came back up it now has an inconsistent view of the metadata. It thinks there are only 300 topics and all the other brokers believe there are 700. Should we file this as a separate issue? I'm not aware of this issue, so it does sound better to report it in a different jira and describe the problem in as much detail as possible. If you have logs for this problem, then please share. bq. We have managed to reproduce the problem and have a snapshot of the logs. The tarball is about a gigabyte. What should I do with it? If you have a web server that can host it, then perhaps you can upload it there. A dropbox/box/onedrive public folder that we can read from would also do it. bq. my zkCli.sh session which I was using to watch the controller exited here so I was disconnected for a minute This is odd. Could you describe in more detail what happened with the zkCli session? You said that it disconnected for a minute, but has the session expired? It must have expired, unless your session timeout was at least one minute, which according to your description, it wasn't. If you have them, please include the zk logs in the bundle. 
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue
[ https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15232535#comment-15232535 ] Flavio Junqueira edited comment on KAFKA-3042 at 4/9/16 1:09 PM: - Here is what I've been able to find out so far based on the logs that [~wushujames] posted: # The "Cached zkVersion..." messages are printed in {{Partition.updateIsr}}. The {{updatedIsr}} in the logs seem to be called from {{Partition.maybeShrinkIsr}}. # {{Partition.maybeShrinkIsr}} is called from {{ReplicaManager.maybeShrinkIsr}}, which is called periodically according to schedule call in {{ReplicaManager.startup}}. This is the main reason we see those messages periodically coming up. # In {{Partition.maybeShrinkIsr}}, the ISR is only updated if the leader replica is the broker itself, which is determined by the variable {{leaderReplicaIdOpt}}. It looks like {{leaderReplicaIdOpt}} isn't being updated correctly, and it is possible that it is due to a race with either the controllers or the execution of {{LeaderAndIsr}} requests. was (Author: fpj): Here is what I've been able to find out so far based on the logs that [~wushujames] posted: # The "Cached zkVersion..." messages are printed in {{Partition.updateIsr}}. The {{updatedIsr}} in the logs seem to be called from {{Partition.maybeShrinkIsr}}. # {{Partition.maybeShrinkIsr}} is called from {{ReplicaManager.maybeShrinkIsr}}, which is called periodically according to schedule call in {{ReplicaManager.startup}}. This is the main reason we see those messages periodically coming up. # In {{Partition.maybeShrinkIsr}}, the ISR is only updated if the leader replica is the broker itself, which is determined by the variable {{leaderReplicaIdOpt}. It looks like {{leaderReplicaIdOpt}} isn't being updated correctly, and it is possible that it is due to a race with either the controllers or the execution of {{LeaderAndIsr} requests. 
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue
[ https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15232535#comment-15232535 ] Flavio Junqueira commented on KAFKA-3042: - Here is what I've been able to find out so far based on the logs that [~wushujames] posted: # The "Cached zkVersion..." messages are printed in {{Partition.updateIsr}}. The {{updateIsr}} calls in the logs seem to come from {{Partition.maybeShrinkIsr}}. # {{Partition.maybeShrinkIsr}} is called from {{ReplicaManager.maybeShrinkIsr}}, which is called periodically according to a scheduled call in {{ReplicaManager.startup}}. This is the main reason we see those messages periodically coming up. # In {{Partition.maybeShrinkIsr}}, the ISR is only updated if the leader replica is the broker itself, which is determined by the variable {{leaderReplicaIdOpt}}. It looks like {{leaderReplicaIdOpt}} isn't being updated correctly, and it is possible that it is due to a race with either the controllers or the execution of {{LeaderAndIsr}} requests. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
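The loop described in the comment above can be illustrated with a toy model (Python; all names are simplified stand-ins for the Scala code, not Kafka itself). The ISR update is guarded by the broker's belief that it is the leader, so a stale {{leaderReplicaIdOpt}} makes every periodic tick log the same zkVersion mismatch:

```python
# Toy model of the periodic maybeShrinkIsr loop; not Kafka's actual code.
class Partition:
    def __init__(self, broker_id, leader_replica_id_opt, cached_zk_version):
        self.broker_id = broker_id
        self.leader_replica_id_opt = leader_replica_id_opt  # stale if a become-follower was missed
        self.cached_zk_version = cached_zk_version
        self.log = []

    def maybe_shrink_isr(self, zk_version_in_zookeeper):
        # Guard: only a broker that believes it is the leader touches the ISR.
        if self.leader_replica_id_opt != self.broker_id:
            return
        if self.cached_zk_version != zk_version_in_zookeeper:
            self.log.append("Cached zkVersion %d not equal to that in "
                            "zookeeper, skip updating ISR" % self.cached_zk_version)

# Broker 1 missed the become-follower transition: it still believes it leads.
p = Partition(broker_id=1, leader_replica_id_opt=1, cached_zk_version=54)
for _ in range(3):                       # the scheduler fires this periodically
    p.maybe_shrink_isr(zk_version_in_zookeeper=57)
print(len(p.log))                        # the same message, once per tick
```

The point of the sketch is that nothing inside the periodic path ever corrects the stale leadership belief, which matches the endless repetition seen in the logs.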
[jira] [Created] (KAFKA-3524) ConnectorTaskId should also include cluster id
Flavio Junqueira created KAFKA-3524: --- Summary: ConnectorTaskId should also include cluster id Key: KAFKA-3524 URL: https://issues.apache.org/jira/browse/KAFKA-3524 Project: Kafka Issue Type: Improvement Components: copycat Reporter: Flavio Junqueira Assignee: Ewen Cheslack-Postava ConnectorTaskId currently uses the connector name and the task number to give a unique id to a task. This id is unique within a connect cluster, but it might not be unique across clusters. If I want to uniquely identify connector tasks across clusters, then I need to also use the cluster name as part of the id. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
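The uniqueness argument above can be sketched as follows (Python; the names are hypothetical illustrations, not the Connect code): adding the cluster name to the id makes otherwise identical (connector, task) pairs distinct across clusters.

```python
# Hypothetical sketch of a task id extended with a cluster component.
from dataclasses import dataclass

@dataclass(frozen=True)
class ConnectorTaskId:
    cluster: str     # proposed addition: disambiguates ids across clusters
    connector: str
    task: int

# Same connector name and task number in two different connect clusters.
a = ConnectorTaskId("cluster-a", "my-connector", 0)
b = ConnectorTaskId("cluster-b", "my-connector", 0)
print(a != b)        # distinct ids once the cluster is part of the key
```

Without the cluster field, the two ids above would compare equal, which is exactly the collision the issue describes.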
[jira] [Commented] (KAFKA-3469) kafka-topics lock down znodes with user principal when zk security is enabled.
[ https://issues.apache.org/jira/browse/KAFKA-3469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15222336#comment-15222336 ] Flavio Junqueira commented on KAFKA-3469: - [~singhashish] right, you're assuming two different principals, while the current implementation assumes that there is only one, shared by all processes that access the znodes. During the discussion of KIP-38 we talked about enabling the addition of different principals so that we can support scenarios like the one you describe. Currently, that's not possible, but zk itself does support it. Perhaps this jira could be the implementation of that feature. > kafka-topics lock down znodes with user principal when zk security is enabled. > -- > > Key: KAFKA-3469 > URL: https://issues.apache.org/jira/browse/KAFKA-3469 > Project: Kafka > Issue Type: Bug >Reporter: Ashish K Singh >Assignee: Ashish K Singh > > In envs where ZK is kerberized, if a user, other than user running kafka > processes, creates a topic, ZkUtils will lock down corresponding znodes for > the user. Kafka will not be able to modify those znodes and that leaves the > topic unusable. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3469) kafka-topics lock down znodes with user principal when zk security is enabled.
[ https://issues.apache.org/jira/browse/KAFKA-3469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15221262#comment-15221262 ] Flavio Junqueira commented on KAFKA-3469: - [~singhashish] I'm sorry, I missed the notification. Let me see if I understand this right: bq. ZkUtils will lock down corresponding znodes for the user You're saying that it sets the ACL with the wrong principal and consequently the brokers cannot use it? Currently, this is what we use: {noformat} list.addAll(ZooDefs.Ids.CREATOR_ALL_ACL) list.addAll(ZooDefs.Ids.READ_ACL_UNSAFE) {noformat} For context, we did talk about having different credentials for admin tools when we released 0.9 if needed, so maybe we should do it, but let me try to understand the scenario a bit better first. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
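The lockout scenario can be modeled abstractly (Python; illustrative names, not the zkclient or ZooKeeper API): with creator-only write permissions plus world-read, as in CREATOR_ALL_ACL + READ_ACL_UNSAFE, a znode created under an admin principal is readable but not writable by brokers running under a different principal.

```python
# Toy model of the ACL semantics described above; not real ZooKeeper code.
def build_acl(creator):
    # Creator gets everything; everyone else gets read-only.
    return [
        {"principal": creator, "perms": {"read", "write", "delete", "admin"}},
        {"principal": "world", "perms": {"read"}},
    ]

def can(acl, principal, perm):
    # A permission is granted if some entry carries it for this principal
    # or for the special "world" principal.
    return any(perm in entry["perms"] and entry["principal"] in (principal, "world")
               for entry in acl)

# An admin user creates the topic znode; brokers run as a different principal.
acl = build_acl("admin-user")
print(can(acl, "admin-user", "write"),    # creator can modify
      can(acl, "kafka-broker", "write"),  # broker cannot: topic is unusable
      can(acl, "kafka-broker", "read"))   # but the broker can still read
```

This is the "locked down" state from the issue description: the brokers can see the znode but can never modify it.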
[jira] [Commented] (KAFKA-3205) Error in I/O with host (java.io.EOFException) raised in producer
[ https://issues.apache.org/jira/browse/KAFKA-3205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220339#comment-15220339 ] Flavio Junqueira commented on KAFKA-3205: - [~bondj] I've commented on the pull request. > Error in I/O with host (java.io.EOFException) raised in producer > > > Key: KAFKA-3205 > URL: https://issues.apache.org/jira/browse/KAFKA-3205 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 0.8.2.1, 0.9.0.0 >Reporter: Jonathan Raffre > Fix For: 0.10.0.0 > > > In a situation with a Kafka broker in 0.9 and producers still in 0.8.2.x, > producers seems to raise the following after a variable amount of time since > start : > {noformat} > 2016-01-29 14:33:13,066 WARN [] o.a.k.c.n.Selector: Error in I/O with > 172.22.2.170 > java.io.EOFException: null > at > org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62) > ~[org.apache.kafka.kafka-clients-0.8.2.0.jar:na] > at org.apache.kafka.common.network.Selector.poll(Selector.java:248) > ~[org.apache.kafka.kafka-clients-0.8.2.0.jar:na] > at > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) > [org.apache.kafka.kafka-clients-0.8.2.0.jar:na] > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) > [org.apache.kafka.kafka-clients-0.8.2.0.jar:na] > at > org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) > [org.apache.kafka.kafka-clients-0.8.2.0.jar:na] > at java.lang.Thread.run(Thread.java:745) [na:1.8.0_66-internal] > {noformat} > This can be reproduced successfully by doing the following : > * Start a 0.8.2 producer connected to the 0.9 broker > * Wait 15 minutes, exactly > * See the error in the producer logs. > Oddly, this also shows up in an active producer but after 10 minutes of > activity. 
> Kafka's server.properties : > {noformat} > broker.id=1 > listeners=PLAINTEXT://:9092 > port=9092 > num.network.threads=2 > num.io.threads=2 > socket.send.buffer.bytes=1048576 > socket.receive.buffer.bytes=1048576 > socket.request.max.bytes=104857600 > log.dirs=/mnt/data/kafka > num.partitions=4 > auto.create.topics.enable=false > delete.topic.enable=true > num.recovery.threads.per.data.dir=1 > log.retention.hours=48 > log.retention.bytes=524288000 > log.segment.bytes=52428800 > log.retention.check.interval.ms=6 > log.roll.hours=24 > log.cleanup.policy=delete > log.cleaner.enable=true > zookeeper.connect=127.0.0.1:2181 > zookeeper.connection.timeout.ms=100 > {noformat} > Producer's configuration : > {noformat} > compression.type = none > metric.reporters = [] > metadata.max.age.ms = 30 > metadata.fetch.timeout.ms = 6 > acks = all > batch.size = 16384 > reconnect.backoff.ms = 10 > bootstrap.servers = [127.0.0.1:9092] > receive.buffer.bytes = 32768 > retry.backoff.ms = 500 > buffer.memory = 33554432 > timeout.ms = 3 > key.serializer = class > org.apache.kafka.common.serialization.StringSerializer > retries = 3 > max.request.size = 500 > block.on.buffer.full = true > value.serializer = class > org.apache.kafka.common.serialization.StringSerializer > metrics.sample.window.ms = 3 > send.buffer.bytes = 131072 > max.in.flight.requests.per.connection = 5 > metrics.num.samples = 2 > linger.ms = 0 > client.id = > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3210) Using asynchronous calls through the raw ZK API in ZkUtils
[ https://issues.apache.org/jira/browse/KAFKA-3210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206935#comment-15206935 ] Flavio Junqueira commented on KAFKA-3210: - [~granthenke] my main goal here is to have access to the async API and have more control over what happens between sessions. I'm more used to programming against zookeeper directly, so I'm more inclined to pursue that direction. Also, given how ZkUtils is structured, it is not entirely clear to me how much we would be able to benefit from the recipes that curator can offer. Having said that, I don't have any major concern with using a different wrapper if this community prefers that option as long as we are able to make use of asynchronous calls and have more control over session creation. > Using asynchronous calls through the raw ZK API in ZkUtils > -- > > Key: KAFKA-3210 > URL: https://issues.apache.org/jira/browse/KAFKA-3210 > Project: Kafka > Issue Type: Improvement > Components: controller, zkclient >Affects Versions: 0.9.0.0 >Reporter: Flavio Junqueira > > We have observed a number of issues with the controller interaction with > ZooKeeper mainly because ZkClient creates new sessions transparently under > the hood. Creating sessions transparently enables, for example, old > controller to successfully update znodes in ZooKeeper even when they aren't > the controller any longer (e.g., KAFKA-3083). To fix this, we need to bypass > the ZkClient lib like we did with ZKWatchedEphemeral. > In addition to fixing such races with the controller, it would improve > performance significantly if we used the async API (see KAFKA-3038). The > async API is more efficient because it pipelines the requests to ZooKeeper, > and the number of requests upon controller recovery can be large. 
> This jira proposes to make these two changes to the calls in ZkUtils, and to > do it, one path is to first replace the calls in ZkUtils with raw async ZK > calls and block so that we don't have to change the controller code in this > phase. Once this step is accomplished and it is stable, we make changes to > the controller to handle the asynchronous calls to ZooKeeper. > Note that in the first step, we will need to introduce some new logic for > session management, which is currently handled entirely by ZkClient. We will > also need to implement the subscription mechanism for event notifications > (see ZooKeeperLeaderElector as an example). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
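The first migration step described above, issuing a raw asynchronous call but blocking on its completion so callers keep their synchronous contract, can be sketched like this (Python; the async call is a stand-in, not the ZooKeeper client API):

```python
# Sketch of a sync-over-async wrapper; all names are illustrative.
import threading

def async_set_data(path, data, version, callback):
    # Stand-in for an asynchronous ZK call: completes on another thread.
    def complete():
        callback(rc=0, path=path, stat={"version": version + 1})
    threading.Thread(target=complete).start()

def set_data_blocking(path, data, version):
    done = threading.Event()
    result = {}

    def callback(rc, path, stat):
        result["rc"], result["stat"] = rc, stat
        done.set()

    async_set_data(path, data, version, callback)
    done.wait()          # block here so the calling code stays unchanged
    return result

r = set_data_blocking("/brokers/topics/foo", b"{}", version=41)
print(r["rc"], r["stat"]["version"])  # 0 42
```

In a second phase the `done.wait()` would disappear and callers would consume the callback directly, which is where the pipelining benefit comes from.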
[jira] [Commented] (KAFKA-3403) Upgrade ZkClient to 0.8
[ https://issues.apache.org/jira/browse/KAFKA-3403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15198172#comment-15198172 ] Flavio Junqueira commented on KAFKA-3403: - [~granthenke] zkclient 0.8 is already available. Are you going to do a pull request to upgrade it? > Upgrade ZkClient to 0.8 > --- > > Key: KAFKA-3403 > URL: https://issues.apache.org/jira/browse/KAFKA-3403 > Project: Kafka > Issue Type: Improvement >Reporter: Grant Henke >Assignee: Grant Henke >Priority: Critical > > KAFKA-3328 requires a versioned delete that [~fpj] added to ZkClient and will > be available in the 0.8 release. When released, we should upgrade to zkclient > 0.8. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3403) Upgrade ZkClient to 0.8
[ https://issues.apache.org/jira/browse/KAFKA-3403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15198173#comment-15198173 ] Flavio Junqueira commented on KAFKA-3403: - http://mvnrepository.com/artifact/com.101tec/zkclient/0.8 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-3215) controller may not be started when there are multiple ZK session expirations
[ https://issues.apache.org/jira/browse/KAFKA-3215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15190794#comment-15190794 ] Flavio Junqueira edited comment on KAFKA-3215 at 3/11/16 4:34 PM: -- [~junrao] Let me see if I understand this issue correctly. bq. broker 1 is the controller and it has 2 consecutive ZK session expirations As I understand it, one possible run that reflects this is the following: # zkclient creates a session S1 # S1 session expires # zkclient queues the session expiration event to deliver to the kafka broker # zkclient creates a new session S2 # S2 expires # zkclient queues the session expiration for S2 and the event for S1 still hasn't been delivered # zkclient creates a third session S3 # broker 1 processes the session expiration of S1 # broker 1 successfully elects itself leader/controller in session S3 # broker 1 processes session expiration for S2 After this last step, the broker is messed up because the replica state machine isn't properly initialized. Also, the broker won't give up leadership because the ephemeral has been created in the current session. I think this was a problem in 0.8.2, but not a problem in 0.9 because we fixed it in KAFKA-1387. With ZKWatchedEphemeral, in the case we get that the znode exists while creating it, we check if the existing znode has the same session owner, in which case the operation returns ok and the controller becomes leader. Does it make sense? was (Author: fpj): [~junrao] Let me see if I understand this issue correctly. bq. 
broker 1 is the controller and it has 2 consecutive ZK session expirations As I understand this, one possible run that reflects this is the following: # zkclient creates a session S1 # S1 session expires # zkclient queues the session expiration event to deliver to the kafka broker # zkclient creates a new session S2 # S2 expires # zkclient queues the session expiration for S2 and the event for S1 still hasn't been delivered # zkclient creates a third session S3 # broker 1 processes the session expiration of S1 # broker 1 successfully elects itself leader/controller in session S3 # broker 1 processes session expiration for S2 After this last step, broker S2 is messed up because the replica state machine isn't properly initialized. Also, the broker won't give up leadership because the ephemeral has been created in the current session. I think this was a problem in 0.8.2, but not a problem in 0.9 because we fixed it in KAFKA-1387. With ZKWatchedEphemeral, in the case we get that the znode exists while creating it, we check if the existing znode has the same session owner, in which case the operation returns ok and the controller becomes leader. Does it make sense? > controller may not be started when there are multiple ZK session expirations > > > Key: KAFKA-3215 > URL: https://issues.apache.org/jira/browse/KAFKA-3215 > Project: Kafka > Issue Type: Bug > Components: core >Reporter: Jun Rao >Assignee: Flavio Junqueira > Labels: controller > > Suppose that broker 1 is the controller and it has 2 consecutive ZK session > expirations. In this case, two ZK session expiration events will be fired. > 1. When handling the first ZK session expiration event, > SessionExpirationListener.handleNewSession() can elect broker 1 itself as the > new controller and initialize the states properly. > 2. 
When handling the second ZK session expiration event, > SessionExpirationListener.handleNewSession() first calls > onControllerResignation(), which will set ReplicaStateMachine.hasStarted to > false. It then continues to do controller election in > ZookeeperLeaderElector.elect() and try to create the controller node in ZK. > This will fail since broker 1 has already registered itself as the controller > node in ZK. In this case, we simply ignore the failure to create the > controller node since we assume the controller must be in another broker. > However, in this case, the controller is broker 1 itself, but the > ReplicaStateMachine.hasStarted is still false. > 3. Now, if a new broker event is fired, we will be ignoring the event in > BrokerChangeListener.handleChildChange since ReplicaStateMachine.hasStarted > is false. Now, we are in a situation that a controller is alive, but won't > react to any broker change event. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
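The queued-expiration run described in the comment above can be replayed with a toy model (Python; illustrative names, not Kafka's code). Delivery of the expiration events for S1 and S2 lags behind session creation, so both events are handled while the client is already on session S3; with the KAFKA-1387 behavior, the second election attempt sees an existing controller znode but recognizes its own session as the owner and keeps leadership:

```python
# Toy replay of the delayed-expiration scenario; not the actual elector code.
class ControllerElector:
    def __init__(self, current_session):
        self.current_session = current_session
        self.znode_owner = None          # session that created the ephemeral

    def elect(self):
        if self.znode_owner is None:
            self.znode_owner = self.current_session   # created /controller
            return "leader"
        if self.znode_owner == self.current_session:
            return "leader"              # znode exists, but we own it: stay leader
        return "not leader"              # someone else's ephemeral

# zkclient has already reconnected as S3 before either event is delivered.
elector = ControllerElector(current_session="S3")
results = [elector.elect() for _ in ("expire S1", "expire S2")]
print(results)
```

The same-session-owner check on the second attempt is what distinguishes the 0.9 behavior from the 0.8.2 behavior, where the second election would simply fail and leave the replica state machine uninitialized.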
[jira] [Comment Edited] (KAFKA-3215) controller may not be started when there are multiple ZK session expirations
[ https://issues.apache.org/jira/browse/KAFKA-3215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15190794#comment-15190794 ] Flavio Junqueira edited comment on KAFKA-3215 at 3/11/16 1:21 PM: -- [~junrao] Let me see if I understand this issue correctly. bq. broker 1 is the controller and it has 2 consecutive ZK session expirations As I understand this, one possible run that reflects this is the following: # zkclient creates a session S1 # S1 session expires # zkclient queues the session expiration event to deliver to the kafka broker # zkclient creates a new session S2 # S2 expires # zkclient queues the session expiration for S2 and the event for S1 still hasn't been delivered # zkclient creates a third session S3 # broker 1 processes the session expiration of S1 # broker 1 successfully elects itself leader/controller in session S3 # broker 1 processes session expiration for S2 After this last step, broker S2 is messed up because the replica state machine isn't properly initialized. Also, the broker won't give up leadership because the ephemeral has been created in the current session. I think this was a problem in 0.8.2, but not a problem in 0.9 because we fixed it in KAFKA-1387. With ZKWatchedEphemeral, in the case we get that the znode exists while creating it, we check if the existing znode has the same session owner, in which case the operation returns ok and the controller becomes leader. Does it make sense? was (Author: fpj): [~junrao] Let me see if I understand this issue correctly. bq. 
broker 1 is the controller and it has 2 consecutive ZK session expirations As I understand this, one possible run that reflects this is the following: # zkclient creates a session S1 # S1 session expires # zkclient queues the session expiration event to deliver to the kafka broker # zkclient creates a new session S2 # S2 expires # zkclient queues the session expiration for S2 and the event for S1 still hasn't been delivered # zkclient creates a third session S3 # broker 1 processes the session expiration of S1 # broker 1 successfully elects itself leader/controller in session S3 # broker 1 processes session expiration for S2 After this last step, broker S2 is messed up because it thinks the replica state machine isn't properly initialized. Also, the broker won't give up leadership because the ephemeral has been created in the current session. I think this was a problem in 0.8.2, but not a problem in 0.9 because we fixed it in KAFKA-1387. With ZKWatchedEphemeral, in the case we get that the znode exists while creating it, we check if the existing znode has the same session owner, in which case the operation returns ok and the controller becomes leader. Does it make sense? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (KAFKA-3215) controller may not be started when there are multiple ZK session expirations
[ https://issues.apache.org/jira/browse/KAFKA-3215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flavio Junqueira reassigned KAFKA-3215: --- Assignee: Flavio Junqueira -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3215) controller may not be started when there are multiple ZK session expirations
[ https://issues.apache.org/jira/browse/KAFKA-3215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15190794#comment-15190794 ] Flavio Junqueira commented on KAFKA-3215: - [~junrao] Let me see if I understand this issue correctly. bq. broker 1 is the controller and it has 2 consecutive ZK session expirations As I understand this, one possible run that reflects this is the following: # zkclient creates a session S1 # S1 session expires # zkclient queues the session expiration event to deliver to the kafka broker # zkclient creates a new session S2 # S2 expires # zkclient queues the session expiration for S2 and the event for S1 still hasn't been delivered # zkclient creates a third session S3 # broker 1 processes the session expiration of S1 # broker 1 successfully elects itself leader/controller in session S3 # broker 1 processes session expiration for S2 After this last step, broker 1 is messed up because the replica state machine isn't properly initialized. Also, the broker won't give up leadership because the ephemeral has been created in the current session. I think this was a problem in 0.8.2, but not a problem in 0.9 because we fixed it in KAFKA-1387. With ZKWatchedEphemeral, in the case we get that the znode exists while creating it, we check if the existing znode has the same session owner, in which case the operation returns ok and the controller becomes leader. Does it make sense? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-3173) Error while moving some partitions to OnlinePartition state
[ https://issues.apache.org/jira/browse/KAFKA-3173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flavio Junqueira updated KAFKA-3173: Attachment: KAFKA-3173-race-repro.patch > Error while moving some partitions to OnlinePartition state > > > Key: KAFKA-3173 > URL: https://issues.apache.org/jira/browse/KAFKA-3173 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.9.0.0 >Reporter: Flavio Junqueira >Assignee: Flavio Junqueira >Priority: Critical > Fix For: 0.10.0.0 > > Attachments: KAFKA-3173-race-repro.patch > > > We observed another instance of the problem reported in KAFKA-2300, but this > time the error appeared in the partition state machine. In KAFKA-2300, we > haven't cleaned up the state in {{PartitionStateMachine}} and > {{ReplicaStateMachine}} as we do in {{KafkaController}}. > Here is the stack trace: > {noformat} > 2016-01-29 15:26:51,393] ERROR [Partition state machine on Controller 0]: > Error while moving some partitions to OnlinePartition state > (kafka.controller.PartitionStateMachine)java.lang.IllegalStateException: > Controller to broker state change requests batch is not empty while creating > a new one. 
> Some LeaderAndIsr state changes Map(0 -> Map(foo-0 -> (LeaderAndIsrInfo: > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:1),AllReplicas:0))) > might be lostat > kafka.controller.ControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:254) > at > kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:144) > at > kafka.controller.KafkaController.onNewPartitionCreation(KafkaController.scala:517) > at > kafka.controller.KafkaController.onNewTopicCreation(KafkaController.scala:504) > at > kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(PartitionStateMachine.scala:437) > at > kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419) > at > kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419) > at > kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)at > kafka.controller.PartitionStateMachine$TopicChangeListener.handleChildChange(PartitionStateMachine.scala:418) > at > org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)at > org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-3173) Error while moving some partitions to OnlinePartition state
[ https://issues.apache.org/jira/browse/KAFKA-3173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flavio Junqueira updated KAFKA-3173: Attachment: (was: KAFKA-3173-race-repo.patch) > Error while moving some partitions to OnlinePartition state > > > Key: KAFKA-3173 > URL: https://issues.apache.org/jira/browse/KAFKA-3173 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.9.0.0 >Reporter: Flavio Junqueira >Assignee: Flavio Junqueira >Priority: Critical > Fix For: 0.10.0.0 > > > We observed another instance of the problem reported in KAFKA-2300, but this > time the error appeared in the partition state machine. In KAFKA-2300, we > haven't cleaned up the state in {{PartitionStateMachine}} and > {{ReplicaStateMachine}} as we do in {{KafkaController}}. > Here is the stack trace: > {noformat} > 2016-01-29 15:26:51,393] ERROR [Partition state machine on Controller 0]: > Error while moving some partitions to OnlinePartition state > (kafka.controller.PartitionStateMachine)java.lang.IllegalStateException: > Controller to broker state change requests batch is not empty while creating > a new one. 
> Some LeaderAndIsr state changes Map(0 -> Map(foo-0 -> (LeaderAndIsrInfo: > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:1),AllReplicas:0))) > might be lostat > kafka.controller.ControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:254) > at > kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:144) > at > kafka.controller.KafkaController.onNewPartitionCreation(KafkaController.scala:517) > at > kafka.controller.KafkaController.onNewTopicCreation(KafkaController.scala:504) > at > kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(PartitionStateMachine.scala:437) > at > kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419) > at > kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419) > at > kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)at > kafka.controller.PartitionStateMachine$TopicChangeListener.handleChildChange(PartitionStateMachine.scala:418) > at > org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)at > org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-3173) Error while moving some partitions to OnlinePartition state
[ https://issues.apache.org/jira/browse/KAFKA-3173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flavio Junqueira updated KAFKA-3173: Attachment: KAFKA-3173-race-repo.patch > Error while moving some partitions to OnlinePartition state > > > Key: KAFKA-3173 > URL: https://issues.apache.org/jira/browse/KAFKA-3173 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.9.0.0 >Reporter: Flavio Junqueira >Assignee: Flavio Junqueira >Priority: Critical > Fix For: 0.10.0.0 > > > We observed another instance of the problem reported in KAFKA-2300, but this > time the error appeared in the partition state machine. In KAFKA-2300, we > haven't cleaned up the state in {{PartitionStateMachine}} and > {{ReplicaStateMachine}} as we do in {{KafkaController}}. > Here is the stack trace: > {noformat} > 2016-01-29 15:26:51,393] ERROR [Partition state machine on Controller 0]: > Error while moving some partitions to OnlinePartition state > (kafka.controller.PartitionStateMachine)java.lang.IllegalStateException: > Controller to broker state change requests batch is not empty while creating > a new one. 
> Some LeaderAndIsr state changes Map(0 -> Map(foo-0 -> (LeaderAndIsrInfo: > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:1),AllReplicas:0))) > might be lostat > kafka.controller.ControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:254) > at > kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:144) > at > kafka.controller.KafkaController.onNewPartitionCreation(KafkaController.scala:517) > at > kafka.controller.KafkaController.onNewTopicCreation(KafkaController.scala:504) > at > kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(PartitionStateMachine.scala:437) > at > kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419) > at > kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419) > at > kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)at > kafka.controller.PartitionStateMachine$TopicChangeListener.handleChildChange(PartitionStateMachine.scala:418) > at > org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)at > org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3173) Error while moving some partitions to OnlinePartition state
[ https://issues.apache.org/jira/browse/KAFKA-3173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15189024#comment-15189024 ] Flavio Junqueira commented on KAFKA-3173: - I have investigated the two races further and the first one is there but turns out to be harmless because we check {{hasStarted}} before adding any message to the batch. Consequently, the batch is not left dirty. We should still fix to avoid the ugly exception, but it is less critical. The second race is a real problem. I have been able to reproduce it and it can cause either the startup to fail or the zk listener event to be skipped. Here is an output from the repro: {noformat} [2016-03-10 09:58:21,257] ERROR [Partition state machine on Controller 0]: (kafka.controller.PartitionStateMachine:100) java.lang.Exception at kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$3.apply(PartitionStateMachine.scala:158) at kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$3.apply(PartitionStateMachine.scala:158) at kafka.utils.Logging$class.error(Logging.scala:100) at kafka.controller.PartitionStateMachine.error(PartitionStateMachine.scala:44) at kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:158) at kafka.controller.KafkaController.onNewPartitionCreation(KafkaController.scala:518) at kafka.controller.KafkaController.onNewTopicCreation(KafkaController.scala:505) at kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(PartitionStateMachine.scala:455) at kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:437) at kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:437) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:255) at kafka.controller.PartitionStateMachine$TopicChangeListener.handleChildChange(PartitionStateMachine.scala:436) 
at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842) at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) [2016-03-10 09:58:21,447] ERROR [Partition state machine on Controller 0]: Error while moving some partitions to the online state (kafka.controller.PartitionStateMachine:103) java.lang.IllegalStateException: Controller to broker state change requests batch is not empty while creating a new one. Some LeaderAndIsr state changes Map(1 -> Map(topic1-0 -> (LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:1),AllReplicas:1))) might be lost at kafka.controller.ControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:254) at kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:126) at kafka.controller.PartitionStateMachine.startup(PartitionStateMachine.scala:71) at kafka.controller.ControllerFailoverTest.testStartupRace(ControllerFailoverTest.scala:119) {noformat} The first exception is induced, just to know what call is concurrent with the call to startup. The second exception is due to the batch being dirty when I call startup on {{partitionStateMachine}}. It can happen the other way around too and the topic update can fail. Wrapping the call to {{triggerOnlinePartitionStateChange}} with the controller lock solves the issue. Unfortunately, I had to instrument the code to trigger the race. It is hard to test these cases without being invasive, so I'm inclined to not add test cases for this. I'll post the changes I have used to repro the two issues I've mentioned. Note that they are test cases, but they don't actually fail because the current code catches the illegal state exception. 
> Error while moving some partitions to OnlinePartition state > > > Key: KAFKA-3173 > URL: https://issues.apache.org/jira/browse/KAFKA-3173 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.9.0.0 >Reporter: Flavio Junqueira >Assignee: Flavio Junqueira >Priority: Critical > Fix For: 0.10.0.0 > > > We observed another instance of the problem reported in KAFKA-2300, but this > time the error appeared in the partition state machine. In KAFKA-2300, we > haven't cleaned up the state in {{PartitionStateMachine}} and > {{ReplicaStateMachine}} as we do in {{KafkaController}}. > Here is the stack trace: > {noformat} > 2016-01-29 15:26:51,393] ERROR [Partition state machine on Controller 0]: > Error while moving some partitions to OnlinePartition state > (kafka.controller.PartitionStateMachine)java.lang.IllegalStateException: > Controller to broker state change
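The failure reproduced above comes down to two code paths racing into {{newBatch()}}: one caller is interrupted mid-batch, so the next caller finds the batch dirty and hits the IllegalStateException. Holding the controller lock around the whole batch lifecycle, as the comment suggests, serializes the callers. A minimal Python sketch under that assumption (toy classes, not the actual Scala {{ControllerBrokerRequestBatch}}):

```python
import threading

class IllegalStateError(Exception):
    pass

class RequestBatch:
    """Toy ControllerBrokerRequestBatch: newBatch() refuses to start while
    a previous batch still holds unsent requests."""
    def __init__(self):
        self.requests = []

    def new_batch(self):
        if self.requests:
            raise IllegalStateError("batch is not empty while creating a new one")

    def add(self, req):
        self.requests.append(req)

    def send(self):
        self.requests.clear()

controller_lock = threading.Lock()
batch = RequestBatch()

def trigger_state_change(req):
    # The fix discussed above: wrap the whole newBatch/add/send sequence in
    # the controller lock so no caller ever observes a half-built batch.
    with controller_lock:
        batch.new_batch()
        batch.add(req)
        batch.send()

# Unsynchronized interleaving: the first caller is interrupted before send(),
# so the second caller's new_batch() blows up, as in the repro output above.
batch.new_batch()
batch.add("LeaderAndIsr(topic1-0)")
try:
    batch.new_batch()
    raised = False
except IllegalStateError:
    raised = True
assert raised
batch.send()

# With the lock, repeated calls are safe and the batch always ends clean.
trigger_state_change("LeaderAndIsr(topic1-0)")
trigger_state_change("LeaderAndIsr(topic1-1)")
assert batch.requests == []
```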
[jira] [Commented] (KAFKA-3173) Error while moving some partitions to OnlinePartition state
[ https://issues.apache.org/jira/browse/KAFKA-3173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15187230#comment-15187230 ] Flavio Junqueira commented on KAFKA-3173: - I actually can't reorder it like that; it breaks the controller if I do, and in fact the race condition as I described above can't happen because the listener callbacks check whether the corresponding state machines have started or not. There are two other possible races I found, though. 1- In {{onControllerFailover}}, we have the following: {noformat} replicaStateMachine.startup() partitionStateMachine.startup() {noformat} By invoking {{replicaStateMachine.startup()}} first, we set {{hasStarted}} to true. If there is a broker change, then the listener callback will be executed ({{ReplicaStateMachine.hasStarted}} is true), which will eventually invoke {{partitionStateMachine.triggerOnlinePartitionStateChange()}}. If {{partitionStateMachine.startup()}} hasn't run yet, then {{partitionStateMachine.hasStarted}} is false and we will end up with a dirty batch. We need to swap those calls. I also checked that there is no flow the other way around (partitionStateMachine -> replicaStateMachine), so it is safe to swap the calls. 2- If a zookeeper event is triggered while either state machine is starting up, then it is possible that either the startup call or the handling of the zookeeper event will find a dirty batch because of concurrent execution. In this case, we don't leave a dirty batch because it clears after the execution, but one of the calls might fail and we don't have code to recover. We are currently locking many of these calls with a controller lock to prevent races, but apparently we forgot to add it to the startup calls, like this: {noformat} inLock(controllerContext.controllerLock) { triggerOnlinePartitionStateChange() } {noformat} I verified that this doesn't break anything, but I want to write a test case to see if I can repro the problem. 
> Error while moving some partitions to OnlinePartition state > > > Key: KAFKA-3173 > URL: https://issues.apache.org/jira/browse/KAFKA-3173 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.9.0.0 >Reporter: Flavio Junqueira >Assignee: Flavio Junqueira >Priority: Critical > Fix For: 0.10.0.0 > > > We observed another instance of the problem reported in KAFKA-2300, but this > time the error appeared in the partition state machine. In KAFKA-2300, we > haven't cleaned up the state in {{PartitionStateMachine}} and > {{ReplicaStateMachine}} as we do in {{KafkaController}}. > Here is the stack trace: > {noformat} > 2016-01-29 15:26:51,393] ERROR [Partition state machine on Controller 0]: > Error while moving some partitions to OnlinePartition state > (kafka.controller.PartitionStateMachine)java.lang.IllegalStateException: > Controller to broker state change requests batch is not empty while creating > a new one. > Some LeaderAndIsr state changes Map(0 -> Map(foo-0 -> (LeaderAndIsrInfo: > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:1),AllReplicas:0))) > might be lostat > kafka.controller.ControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:254) > at > kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:144) > at > kafka.controller.KafkaController.onNewPartitionCreation(KafkaController.scala:517) > at > kafka.controller.KafkaController.onNewTopicCreation(KafkaController.scala:504) > at > kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(PartitionStateMachine.scala:437) > at > kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419) > at > kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419) > at > kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)at > 
kafka.controller.PartitionStateMachine$TopicChangeListener.handleChildChange(PartitionStateMachine.scala:418) > at > org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)at > org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
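Race 1 described in the comment above is purely an ordering problem: once {{replicaStateMachine.startup()}} sets its flag, a broker-change event can flow into a partition state machine that hasn't started yet. A toy sketch of why swapping the startup calls closes the window (all names hypothetical, modeled loosely on the flow described, not the real classes):

```python
class StateMachine:
    """Toy state machine carrying only the hasStarted flag."""
    def __init__(self):
        self.has_started = False

    def startup(self):
        self.has_started = True

partition_sm = StateMachine()
replica_sm = StateMachine()

def on_broker_change():
    """Listener path sketched above: the replica SM handles a broker change
    and eventually calls into the partition SM
    (triggerOnlinePartitionStateChange)."""
    if not replica_sm.has_started:
        return "ignored"        # callback guard: event is safely dropped
    if not partition_sm.has_started:
        return "dirty batch"    # partition SM rejects the call mid-flight
    return "ok"

# Startup order replica -> partition opens the window for the bad outcome:
replica_sm.startup()
assert on_broker_change() == "dirty batch"

# Once both have started the path is safe. With the swapped order
# (partition first), an early event would hit "ignored" instead, which is
# harmless; that's why the swap is safe given there is no reverse flow.
partition_sm.startup()
assert on_broker_change() == "ok"
```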
[jira] [Commented] (KAFKA-3173) Error while moving some partitions to OnlinePartition state
[ https://issues.apache.org/jira/browse/KAFKA-3173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15186928#comment-15186928 ] Flavio Junqueira commented on KAFKA-3173: - There is a race in the controller failover that can be causing this. Here is what we have in {{KafkaController.onControllerFailover}}: {noformat} partitionStateMachine.registerListeners() replicaStateMachine.registerListeners() initializeControllerContext() replicaStateMachine.startup() partitionStateMachine.startup() {noformat} Both the partition state machine and the replica state machine register zk listeners before they start up. In the partition state machine, the two calls that invoke {{ControllerBrokerRequestBatch.newBatch()}} are {{triggerOnlinePartitionStateChange}} and {{handleStateChanges}}. They both invoke a private method that checks {{hasStarted}} and throws an exception if it hasn't. The variable {{hasStarted}} is set to true upon executing {{startup}}. Consequently, if we register the listener before starting the state machine and an event comes through, then what will happen is that we will leave the batch dirty, which will cause the exception in the description. In both {{triggerOnlinePartitionStateChange}} and {{handleStateChanges}}, we simply log the error and move on, leaving the batch dirty. Note that we have the same issue in the replica state machine. I believe the right execution order in {{onControllerFailover}} should be: {noformat} partitionStateMachine.startup() replicaStateMachine.startup() partitionStateMachine.registerListeners() replicaStateMachine.registerListeners() initializeControllerContext() {noformat} There could be other sources, but right now this looks like a clear one to me. Also, something that looks really bad in lots of places in the controller code is that if there is some error processing partition or replica changes, then the code simply logs and moves on. 
Instead, I believe it should either recover from the error or resign as controller, but we can't simply skip an update. This should be addressed when rewriting the controller, though. As for this jira, I suggest we fix this race and revisit it in case it reappears. > Error while moving some partitions to OnlinePartition state > > > Key: KAFKA-3173 > URL: https://issues.apache.org/jira/browse/KAFKA-3173 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.9.0.0 >Reporter: Flavio Junqueira >Assignee: Flavio Junqueira >Priority: Critical > Fix For: 0.10.0.0 > > > We observed another instance of the problem reported in KAFKA-2300, but this > time the error appeared in the partition state machine. In KAFKA-2300, we > haven't cleaned up the state in {{PartitionStateMachine}} and > {{ReplicaStateMachine}} as we do in {{KafkaController}}. > Here is the stack trace: > {noformat} > 2016-01-29 15:26:51,393] ERROR [Partition state machine on Controller 0]: > Error while moving some partitions to OnlinePartition state > (kafka.controller.PartitionStateMachine)java.lang.IllegalStateException: > Controller to broker state change requests batch is not empty while creating > a new one. 
> Some LeaderAndIsr state changes Map(0 -> Map(foo-0 -> (LeaderAndIsrInfo: > (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:1),AllReplicas:0))) > might be lostat > kafka.controller.ControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:254) > at > kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:144) > at > kafka.controller.KafkaController.onNewPartitionCreation(KafkaController.scala:517) > at > kafka.controller.KafkaController.onNewTopicCreation(KafkaController.scala:504) > at > kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(PartitionStateMachine.scala:437) > at > kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419) > at > kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419) > at > kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)at > kafka.controller.PartitionStateMachine$TopicChangeListener.handleChildChange(PartitionStateMachine.scala:418) > at > org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)at > org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
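The register-before-startup hazard analyzed in the comment above can be sketched as a toy model. The sketch assumes the {{hasStarted}} guard fires only after the batch has already been touched, which is what leaves it dirty; all names here are hypothetical stand-ins, not the real {{PartitionStateMachine}}:

```python
class ToyPartitionStateMachine:
    """Toy model of the listener-vs-startup ordering, not the real class."""
    def __init__(self):
        self.has_started = False
        self.listener_registered = False
        self.batch = []

    def register_listeners(self):
        self.listener_registered = True

    def startup(self):
        self.has_started = True

    def handle_state_changes(self):
        self.batch.append("LeaderAndIsr change")  # request queued in the batch
        if not self.has_started:
            # Guard fires after the batch was touched: nothing clears it,
            # so the batch is left dirty for the next caller.
            raise RuntimeError("state machine has not started")
        self.batch.clear()  # "send" the batch

    def zk_event(self):
        if self.listener_registered:
            try:
                self.handle_state_changes()
            except RuntimeError:
                pass  # current code just logs and moves on

# Buggy order: listeners registered before startup; an event in the window
# leaves a dirty batch, reproducing the exception in the description.
buggy = ToyPartitionStateMachine()
buggy.register_listeners()
buggy.zk_event()
assert buggy.batch == ["LeaderAndIsr change"]  # dirty

# Proposed order: startup first, then register listeners.
fixed = ToyPartitionStateMachine()
fixed.startup()
fixed.register_listeners()
fixed.zk_event()
assert fixed.batch == []  # handled cleanly
```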
[jira] [Created] (KAFKA-3305) JMX in AWS instance
Flavio Junqueira created KAFKA-3305: --- Summary: JMX in AWS instance Key: KAFKA-3305 URL: https://issues.apache.org/jira/browse/KAFKA-3305 Project: Kafka Issue Type: Improvement Environment: AWS Reporter: Flavio Junqueira While connecting JConsole to a broker running on AWS, I've needed to set the following properties to get it to work: {noformat} -Djava.net.preferIPv4Stack=true -Djava.rmi.server.hostname=xxx.compute.amazonaws.com -Dcom.sun.management.jmxremote.rmi.port=9996 {noformat} in addition to setting the JMX_PORT variable. I suggest we at least document these options and possibly have them in {{kafka-run-class.sh}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue
[ https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15158807#comment-15158807 ] Flavio Junqueira commented on KAFKA-3042: - bq. I think this is because the broker consider itself as the leader in fact it's a follower. So after several failed tries, it need to find out who is the leader. But in this case, wouldn't the broker eventually recover? What I find problematic here is that the descriptions and comments in the related jiras mention the need to bounce brokers; that shouldn't be necessary. If anyone has state change logs to share for the period in which one of these incidents occurred, it'd be nice to see them. > updateIsr should stop after failed several times due to zkVersion issue > --- > > Key: KAFKA-3042 > URL: https://issues.apache.org/jira/browse/KAFKA-3042 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.8.2.1 > Environment: jdk 1.7 > centos 6.4 >Reporter: Jiahongchao > > sometimes one broker may repeatly log > "Cached zkVersion 54 not equal to that in zookeeper, skip updating ISR" > I think this is because the broker consider itself as the leader in fact it's > a follower. > So after several failed tries, it need to find out who is the leader -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures
[ https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15158799#comment-15158799 ] Flavio Junqueira commented on KAFKA-1382: - hi there, it isn't clear to me if you've hit this bug or if your broker simply got stale versions. even with the fix proposed here, you can still get conflicting versions, which will lead to those messages. was your cluster able to recover after you've seen those messages? what's the precise behavior you have observed? > Update zkVersion on partition state update failures > --- > > Key: KAFKA-1382 > URL: https://issues.apache.org/jira/browse/KAFKA-1382 > Project: Kafka > Issue Type: Bug >Reporter: Joel Koshy >Assignee: Sriharsha Chintalapani > Fix For: 0.8.1.2, 0.8.2.0 > > Attachments: KAFKA-1382.patch, KAFKA-1382_2014-05-30_21:19:21.patch, > KAFKA-1382_2014-05-31_15:50:25.patch, KAFKA-1382_2014-06-04_12:30:40.patch, > KAFKA-1382_2014-06-07_09:00:56.patch, KAFKA-1382_2014-06-09_18:23:42.patch, > KAFKA-1382_2014-06-11_09:37:22.patch, KAFKA-1382_2014-06-16_13:50:16.patch, > KAFKA-1382_2014-06-16_14:19:27.patch > > > Our updateIsr code is currently: > private def updateIsr(newIsr: Set[Replica]) { > debug("Updated ISR for partition [%s,%d] to %s".format(topic, > partitionId, newIsr.mkString(","))) > val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, > newIsr.map(r => r.brokerId).toList, zkVersion) > // use the epoch of the controller that made the leadership decision, > instead of the current controller epoch > val (updateSucceeded, newVersion) = > ZkUtils.conditionalUpdatePersistentPath(zkClient, > ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId), > ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch), zkVersion) > if (updateSucceeded){ > inSyncReplicas = newIsr > zkVersion = newVersion > trace("ISR updated to [%s] and zkVersion updated to > [%d]".format(newIsr.mkString(","), zkVersion)) > } else { > info("Cached zkVersion [%d] not equal to 
that in zookeeper, skip > updating ISR".format(zkVersion)) > } > We encountered an interesting scenario recently when a large producer fully > saturated the broker's NIC for over an hour. The large volume of data led to > a number of ISR shrinks (and subsequent expands). The NIC saturation > affected the zookeeper client heartbeats and led to a session timeout. The > timeline was roughly as follows: > - Attempt to expand ISR > - Expansion written to zookeeper (confirmed in zookeeper transaction logs) > - Session timeout after around 13 seconds (the configured timeout is 20 > seconds) so that lines up. > - zkclient reconnects to zookeeper (with the same session ID) and retries > the write - but uses the old zkVersion. This fails because the zkVersion > has already been updated (above). > - The ISR expand keeps failing after that and the only way to get out of it > is to bounce the broker. > In the above code, if the zkVersion is different we should probably update > the cached version and even retry the expansion until it succeeds. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
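The fix suggested at the end of the description above, refresh the cached zkVersion when a conditional update fails and then retry, can be sketched with a toy versioned store. Everything here ({{FakeZkStore}}, {{Partition}}) is a hypothetical stand-in for illustration, not the real ZkUtils/Partition code:

```python
class BadVersion(Exception):
    pass

class FakeZkStore:
    """Toy znode supporting ZooKeeper-style conditional (versioned) writes."""
    def __init__(self):
        self.data, self.version = None, 0

    def conditional_update(self, data, expected_version):
        if expected_version != self.version:
            raise BadVersion(self.version)
        self.data = data
        self.version += 1
        return self.version

    def read(self):
        return self.data, self.version

class Partition:
    def __init__(self, store):
        self.store = store
        self.cached_version = 0
        self.isr = []

    def update_isr(self, new_isr):
        """On a version conflict, refresh the cached zkVersion instead of
        retrying forever with the stale one (the suggestion above)."""
        try:
            self.cached_version = self.store.conditional_update(
                new_isr, self.cached_version)
            self.isr = new_isr
            return True
        except BadVersion:
            _, self.cached_version = self.store.read()  # re-read; caller retries
            return False

store = FakeZkStore()
p = Partition(store)
assert p.update_isr([1, 2, 3])

# Simulate the zkclient retry that bumped the znode version behind our back
# (the session-timeout scenario in the description):
store.conditional_update([1, 2], store.version)

assert not p.update_isr([1, 2, 3, 4])  # stale cached version -> conflict
assert p.update_isr([1, 2, 3, 4])      # retry with refreshed version succeeds
```

This is why no broker bounce should be needed: a single re-read resynchronizes the cached version.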
[jira] [Commented] (KAFKA-3167) Use local to the workspace Gradle cache and recreate it on every build
[ https://issues.apache.org/jira/browse/KAFKA-3167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15142761#comment-15142761 ] Flavio Junqueira commented on KAFKA-3167: - we currently have this: {noformat} $GRADLE_2_4_RC_2_HOME/bin/gradle {noformat} should I replace it with: {noformat} export GRADLE_USER_HOME=${WORKSPACE}/${JOB_NAME}/gradle $GRADLE_USER_HOME/bin/gradle {noformat} > Use local to the workspace Gradle cache and recreate it on every build > -- > > Key: KAFKA-3167 > URL: https://issues.apache.org/jira/browse/KAFKA-3167 > Project: Kafka > Issue Type: Improvement > Components: build >Reporter: Ismael Juma > > Kafka builds often fail with "Could not add entry > '/home/jenkins/.gradle/caches/modules-2/files-2.1/net.jpountz.lz4/lz4/1.3.0/c708bb2590c0652a642236ef45d9f99ff842a2ce/lz4-1.3.0.jar' > to cache fileHashes.bin" > I filed INFRA-11083 and Andrew Bayer suggested: > "Can you change your builds to use a local-to-the-workspace cache and then > nuke it/recreate it on every build?" > This issue is about changing the Jenkins config for one of the trunk builds > to do the above to see if it helps. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-3210) Using asynchronous calls through the raw ZK API in ZkUtils
[ https://issues.apache.org/jira/browse/KAFKA-3210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flavio Junqueira updated KAFKA-3210: Description: We have observed a number of issues with the controller interaction with ZooKeeper mainly because ZkClient creates new sessions transparently under the hood. Creating sessions transparently enables, for example, an old controller to successfully update znodes in ZooKeeper even when it isn't the controller any longer (e.g., KAFKA-3083). To fix this, we need to bypass the ZkClient lib like we did with ZKWatchedEphemeral. In addition to fixing such races with the controller, it would improve performance significantly if we used the async API (see KAFKA-3038). The async API is more efficient because it pipelines the requests to ZooKeeper, and the number of requests upon controller recovery can be large. This jira proposes to make these two changes to the calls in ZkUtils and to do it, one path is to first replace the calls in ZkUtils with raw async ZK calls and block so that we don't have to change the controller code in this phase. Once this step is accomplished and it is stable, we make changes to the controller to handle the asynchronous calls to ZooKeeper. Note that in the first step, we will need to introduce some new logic for session management, which is currently handled entirely by ZkClient. We will also need to implement the subscription mechanism for event notifications (see ZooKeeperLeaderElector as an example). was: We have observed a number of issues with the controller interaction with ZooKeeper mainly because ZkClient creates new sessions transparently under the hood. Creating sessions transparently enables, for example, old controller to successfully update znodes in ZooKeeper even when they aren't the controller any longer (e.g., KAFKA-3083). To fix this, we need to bypass the ZkClient lib like we did with ZKWatchedEphemeral. 
In addition to fixing such races with the controller, it would improve performance significantly if we used the async API (see KAFKA-3038). The async API is more efficient because it pipelines the requests to ZooKeeper, and the number of requests upon controller recovery can be large. This jira proposes to make these two changes to the calls in ZkUtils and to do it, one path is to first replace the calls in ZkUtils with raw async ZK calls and block so that we don't have to change the controller code in this phase. Once this step is accomplished and it is stable, we make changes to the controller to handle the asynchronous calls to ZooKeeper. Note that in the first step, we will need to introduce some logic for session management, which is currently handled entirely by ZkClient. > Using asynchronous calls through the raw ZK API in ZkUtils > -- > > Key: KAFKA-3210 > URL: https://issues.apache.org/jira/browse/KAFKA-3210 > Project: Kafka > Issue Type: Improvement > Components: controller, zkclient >Affects Versions: 0.9.0.0 >Reporter: Flavio Junqueira > > We have observed a number of issues with the controller interaction with > ZooKeeper mainly because ZkClient creates new sessions transparently under > the hood. Creating sessions transparently enables, for example, old > controller to successfully update znodes in ZooKeeper even when they aren't > the controller any longer (e.g., KAFKA-3083). To fix this, we need to bypass > the ZkClient lib like we did with ZKWatchedEphemeral. > In addition to fixing such races with the controller, it would improve > performance significantly if we used the async API (see KAFKA-3038). The > async API is more efficient because it pipelines the requests to ZooKeeper, > and the number of requests upon controller recovery can be large. 
> This jira proposes to make these two changes to the calls in ZkUtils and to > do it, one path is to first replace the calls in ZkUtils with raw async ZK > calls and block so that we don't have to change the controller code in this > phase. Once this step is accomplished and it is stable, we make changes to > the controller to handle the asynchronous calls to ZooKeeper. > Note that in the first step, we will need to introduce some new logic for > session management, which is currently handled entirely by ZkClient. We will > also need to implement the subscription mechanism for event notifications > (see ZooKeeperLeaderElector as an example). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
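The first migration step proposed above, issuing the raw ZooKeeper calls asynchronously but blocking before returning so the controller code stays unchanged, has a fire-then-wait shape that can be sketched with futures. A thread pool stands in for the pipelined ZooKeeper session here; none of these names are the real client API, and real pipelining happens over a single connection rather than worker threads:

```python
from concurrent.futures import ThreadPoolExecutor

# Toy async "ZooKeeper": each call returns a future immediately, so many
# requests can be in flight at once.
executor = ThreadPoolExecutor(max_workers=4)
znodes = {}

def async_set(path, data):
    def op():
        znodes[path] = data
        return path
    return executor.submit(op)

def set_all_blocking(updates):
    """Sketch of the migration step discussed above: fire all requests
    asynchronously (many in flight), then block until every one completes,
    so callers keep their synchronous semantics."""
    futures = [async_set(p, d) for p, d in updates]  # all in flight at once
    return [f.result() for f in futures]             # block for completion

paths = set_all_blocking(
    [("/brokers/ids/%d" % i, "broker-%d" % i) for i in range(3)])
assert sorted(paths) == ["/brokers/ids/0", "/brokers/ids/1", "/brokers/ids/2"]
assert znodes["/brokers/ids/2"] == "broker-2"
executor.shutdown()
```

The point of the shape is that N round-trip latencies collapse into roughly one, which matters when controller recovery issues a large number of requests.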
[jira] [Updated] (KAFKA-2818) Clean up Controller Object on forced Resignation
[ https://issues.apache.org/jira/browse/KAFKA-2818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flavio Junqueira updated KAFKA-2818: Assignee: Flavio Junqueira (was: Neha Narkhede) Status: Open (was: Patch Available) > Clean up Controller Object on forced Resignation > > > Key: KAFKA-2818 > URL: https://issues.apache.org/jira/browse/KAFKA-2818 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 0.9.0.0 >Reporter: Matthew Bruce >Assignee: Flavio Junqueira >Priority: Minor > Attachments: KAFKA-2818.patch > > > Currently if the controller does a forced resignation (if an exception is > caught during updateLeaderEpochAndSendRequest, SendUpdateMetadataRequest or > shutdownBroker), the Zookeeper resignation callback function > OnControllerResignation doesn't get a chance to execute, which leaves some > artifacts lying around. In particular the Sensors don't get cleaned up and > if the Kafka broker ever gets re-elected as Controller it will fail due to > some metrics already existing. An Error and stack trace of such an event is > below. > A forced resignation situation can be induced with a mis-config in > broker.properties fairly easily, by setting only SASL_PLAINTEXT listeners and > setting inter.broker.protocol.version=0.8.2.X > {code} > listeners=SASL_PLAINTEXT://:9092 > inter.broker.protocol.version=0.8.2.X > security.inter.broker.protocol=SASL_PLAINTEXT > {code} > {code} > [2015-11-09 16:33:47,510] ERROR Error while electing or becoming leader on > broker 182050300 (kafka.server.ZookeeperLeaderElector) > java.lang.IllegalArgumentException: A metric named 'MetricName > [name=connection-close-rate, group=controller-channel-metrics, > description=Connections closed per second in the window., > tags={broker-id=182050300}]' already exists, can't register another one. 
> at > org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:285) > at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:177) > at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:162) > at > org.apache.kafka.common.network.Selector$SelectorMetrics.(Selector.java:578) > at org.apache.kafka.common.network.Selector.(Selector.java:112) > at > kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$addNewBroker(ControllerChannelManager.scala:91) > at > kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:43) > at > kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:43) > at scala.collection.immutable.Set$Set1.foreach(Set.scala:74) > at > kafka.controller.ControllerChannelManager.(ControllerChannelManager.scala:43) > at > kafka.controller.KafkaController.startChannelManager(KafkaController.scala:819) > at > kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:747) > at > kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:330) > at > kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:163) > at > kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:84) > at > kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:146) > at > kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141) > at > kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262) > at > kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:141) > at org.I0Itec.zkclient.ZkClient$9.run(ZkClient.java:823) > at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) > 
{code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2818) Clean up Controller Object on forced Resignation
[ https://issues.apache.org/jira/browse/KAFKA-2818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15136147#comment-15136147 ] Flavio Junqueira commented on KAFKA-2818: - [~mbr...@blackberry.com] You're right that {{onControllerResignation()}} isn't being called, but the problem is that {{ZooKeeperLeaderElector.handleDataDeleted()}} should be invoking it via {{onResigningAsLeader()}} and it isn't because {{amILeader}} is evaluating to false. It is evaluating to false because the call to {{resign()}} is setting {{leaderId}} to -1. I'm thinking that we shouldn't set {{leaderId}} to -1 in the {{resign()}} call. {{leaderId}} will be set to -1 if {{getControllerID}} returns -1 when called inside {{elect}}. What do you think? > Clean up Controller Object on forced Resignation > > > Key: KAFKA-2818 > URL: https://issues.apache.org/jira/browse/KAFKA-2818 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 0.9.0.0 >Reporter: Matthew Bruce >Assignee: Neha Narkhede >Priority: Minor > Attachments: KAFKA-2818.patch > > > Currently if the controller does a forced resignation (if an exception is > caught during updateLeaderEpochAndSendRequest, SendUpdateMetadataRequest or > shutdownBroker), the Zookeeper resignation callback function > OnControllerResignation doesn't get a chance to execute, which leaves some > artifacts lying around. In particular the Sensors don't get cleaned up and > if the Kafka broker ever gets re-elected as Controller it will fail due to > some metrics already existing. An Error and stack trace of such an event is > below. 
> A forced resignation situation can be induced with a mis-config in > broker.properties fairly easily, by setting only SASL_PLAINTEXT listeners and > setting inter.broker.protocol.version=0.8.2.X > {code} > listeners=SASL_PLAINTEXT://:9092 > inter.broker.protocol.version=0.8.2.X > security.inter.broker.protocol=SASL_PLAINTEXT > {code} > {code} > [2015-11-09 16:33:47,510] ERROR Error while electing or becoming leader on > broker 182050300 (kafka.server.ZookeeperLeaderElector) > java.lang.IllegalArgumentException: A metric named 'MetricName > [name=connection-close-rate, group=controller-channel-metrics, > description=Connections closed per second in the window., > tags={broker-id=182050300}]' already exists, can't register another one. > at > org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:285) > at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:177) > at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:162) > at > org.apache.kafka.common.network.Selector$SelectorMetrics.(Selector.java:578) > at org.apache.kafka.common.network.Selector.(Selector.java:112) > at > kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$addNewBroker(ControllerChannelManager.scala:91) > at > kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:43) > at > kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:43) > at scala.collection.immutable.Set$Set1.foreach(Set.scala:74) > at > kafka.controller.ControllerChannelManager.(ControllerChannelManager.scala:43) > at > kafka.controller.KafkaController.startChannelManager(KafkaController.scala:819) > at > kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:747) > at > kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:330) > at > kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:163) > at > 
kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:84) > at > kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:146) > at > kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141) > at > kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262) > at > kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:141) > at org.I0Itec.zkclient.ZkClient$9.run(ZkClient.java:823) > at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
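The control flow being debated in the comment above — whether {{resign()}} clearing {{leaderId}} makes the subsequent delete notification skip {{onResigningAsLeader()}} — can be sketched as a toy model. This is purely illustrative: the names echo ZookeeperLeaderElector, but none of this is Kafka code.

```java
// Toy model of the leader-elector logic under discussion (hypothetical names).
public class ElectorModel {
    static final int NO_LEADER = -1;
    final int brokerId;
    int leaderId;
    boolean resignationCallbackRan = false;

    ElectorModel(int brokerId) { this.brokerId = brokerId; this.leaderId = brokerId; }

    boolean amILeader() { return leaderId == brokerId; }

    // Variant A (behavior described in the comment): resign() clears leaderId,
    // so the later handleDataDeleted() sees amILeader == false and skips cleanup.
    void resignClearingLeaderId() { leaderId = NO_LEADER; }

    // Variant B (proposed): resign() only deletes the znode; leaderId becomes -1
    // later, when getControllerID() returns -1 inside elect().
    void resignKeepingLeaderId() { /* delete znode only */ }

    void handleDataDeleted() {
        if (amILeader()) {
            // stands in for onResigningAsLeader() -> onControllerResignation()
            resignationCallbackRan = true;
        }
        // ... re-election would follow here ...
    }
}
```

Under variant A the cleanup callback never runs, which is exactly the sensor/metrics leak reported in the issue; under variant B it does.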
[jira] [Commented] (KAFKA-3197) Producer can send message out of order even when in flight request is set to 1.
[ https://issues.apache.org/jira/browse/KAFKA-3197?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15133265#comment-15133265 ] Flavio Junqueira commented on KAFKA-3197: - [~becket_qin] thanks for the clarification. bq. If leader moves, that does not necessarily mean the request to old leader failed. We can always send the unacknowledged message to new leader, but that probably introduces duplicates almost every time the leader moves. I agree that duplicates are inconvenient, but in this scenario we aren't promising no duplicates, so I'd rather treat the duplicates separately. bq. Currently after batches leave the record accumulator, we only track them in requests. The record accumulator point is a good one, and I'll have a closer look at that part of the code. However, bq. So while the idea of resending unacknowledged messages to both old and new leader is natural and makes sense, it seems much more complicated and error prone based on our current implementation and does not buy us much. True, from your description, it sounds like the change isn't trivial. But let me ask you this: don't we ever have to retransmit messages after a leader change? If we do, then the code path for retransmitting on a different connection must be there. I'm not super familiar with that part of the code, so I don't have any concrete suggestion right now, but I can have a look to see if I'm able to help out. > Producer can send message out of order even when in flight request is set to > 1. > --- > > Key: KAFKA-3197 > URL: https://issues.apache.org/jira/browse/KAFKA-3197 > Project: Kafka > Issue Type: Bug > Components: clients, producer >Affects Versions: 0.9.0.0 >Reporter: Jiangjie Qin >Assignee: Jiangjie Qin > Fix For: 0.9.0.1 > > > The issue we saw is the following: > 1. Producer sends message 0 to topic-partition-0 on broker A. The in-flight > request to broker A is 1. > 2. 
The request is somehow lost > 3. Producer refreshed its topic metadata and found that the leader of > topic-partition-0 migrated from broker A to broker B. > 4. Because there is no in-flight request to broker B, all the subsequent > messages to topic-partition-0 in the record accumulator are sent to broker B. > 5. Later on, when the request in step (1) times out, message 0 will be retried > and sent to broker B. At this point, all the later messages have already been > sent, so we have re-ordering. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3197) Producer can send message out of order even when in flight request is set to 1.
[ https://issues.apache.org/jira/browse/KAFKA-3197?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15132571#comment-15132571 ] Flavio Junqueira commented on KAFKA-3197: - Treating it as a bug sounds right. In the example given in the description, when the producer connects to broker B, shouldn't it resend unacknowledged messages (0 in the example) over the new connection (to broker B in the example)? It can produce duplicates, as has been pointed out, but eliminating duplicates is a separate matter. > Producer can send message out of order even when in flight request is set to > 1. > --- > > Key: KAFKA-3197 > URL: https://issues.apache.org/jira/browse/KAFKA-3197 > Project: Kafka > Issue Type: Bug > Components: clients, producer >Affects Versions: 0.9.0.0 >Reporter: Jiangjie Qin >Assignee: Jiangjie Qin > Fix For: 0.9.0.1 > > > The issue we saw is the following: > 1. Producer sends message 0 to topic-partition-0 on broker A. The in-flight > request to broker A is 1. > 2. The request is somehow lost > 3. Producer refreshed its topic metadata and found that the leader of > topic-partition-0 migrated from broker A to broker B. > 4. Because there is no in-flight request to broker B, all the subsequent > messages to topic-partition-0 in the record accumulator are sent to broker B. > 5. Later on, when the request in step (1) times out, message 0 will be retried > and sent to broker B. At this point, all the later messages have already been > sent, so we have re-ordering. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
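For context, the ordering guarantee being discussed in KAFKA-3197 is the one a user would expect from the strictest client settings. A sketch of such a producer configuration is below; `max.in.flight.requests.per.connection` and `acks` are real 0.9-era producer properties, while the particular values chosen (e.g. `retries=3`) are illustrative assumptions, not taken from the issue:

```properties
# One in-flight request per connection: the setting that is supposed to
# preserve ordering, yet the leader-migration scenario above still reorders,
# because retries after a leader move go over a different connection.
max.in.flight.requests.per.connection=1
retries=3
acks=all
```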
[jira] [Created] (KAFKA-3210) Using asynchronous calls through the raw ZK API in ZkUtils
Flavio Junqueira created KAFKA-3210: --- Summary: Using asynchronous calls through the raw ZK API in ZkUtils Key: KAFKA-3210 URL: https://issues.apache.org/jira/browse/KAFKA-3210 Project: Kafka Issue Type: Improvement Components: controller, zkclient Affects Versions: 0.9.0.0 Reporter: Flavio Junqueira Assignee: Neha Narkhede We have observed a number of issues with the controller interaction with ZooKeeper, mainly because ZkClient creates new sessions transparently under the hood. Creating sessions transparently enables, for example, an old controller to successfully update znodes in ZooKeeper even when it isn't the controller any longer (e.g., KAFKA-3083). To fix this, we need to bypass the ZkClient lib like we did with ZKCheckedEphemeral. In addition to fixing such races with the controller, it would improve performance significantly if we used the async API (see KAFKA-3038). The async API is more efficient because it pipelines the requests to ZooKeeper, and the number of requests upon controller recovery can be large. This jira proposes to make these two changes to the calls in ZkUtils. To do it, one path is to first replace the calls in ZkUtils with raw async ZK calls and block on them so that we don't have to change the controller code in this phase. Once this step is accomplished and it is stable, we can make changes to the controller to handle the asynchronous calls to ZooKeeper. Note that in the first step, we will need to introduce some logic for session management, which is currently handled entirely by ZkClient. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-3210) Using asynchronous calls through the raw ZK API in ZkUtils
[ https://issues.apache.org/jira/browse/KAFKA-3210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flavio Junqueira updated KAFKA-3210: Assignee: (was: Neha Narkhede) > Using asynchronous calls through the raw ZK API in ZkUtils > -- > > Key: KAFKA-3210 > URL: https://issues.apache.org/jira/browse/KAFKA-3210 > Project: Kafka > Issue Type: Improvement > Components: controller, zkclient >Affects Versions: 0.9.0.0 >Reporter: Flavio Junqueira > > We have observed a number of issues with the controller interaction with > ZooKeeper, mainly because ZkClient creates new sessions transparently under > the hood. Creating sessions transparently enables, for example, an old > controller to successfully update znodes in ZooKeeper even when it isn't > the controller any longer (e.g., KAFKA-3083). To fix this, we need to bypass > the ZkClient lib like we did with ZKCheckedEphemeral. > In addition to fixing such races with the controller, it would improve > performance significantly if we used the async API (see KAFKA-3038). The > async API is more efficient because it pipelines the requests to ZooKeeper, > and the number of requests upon controller recovery can be large. > This jira proposes to make these two changes to the calls in ZkUtils. To > do it, one path is to first replace the calls in ZkUtils with raw async ZK > calls and block on them so that we don't have to change the controller code in this > phase. Once this step is accomplished and it is stable, we can make changes to > the controller to handle the asynchronous calls to ZooKeeper. > Note that in the first step, we will need to introduce some logic for session > management, which is currently handled entirely by ZkClient. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-3173) Error while moving some partitions to OnlinePartition state
Flavio Junqueira created KAFKA-3173: --- Summary: Error while moving some partitions to OnlinePartition state Key: KAFKA-3173 URL: https://issues.apache.org/jira/browse/KAFKA-3173 Project: Kafka Issue Type: Bug Affects Versions: 0.9.0.0 Reporter: Flavio Junqueira Priority: Critical Fix For: 0.9.0.1 We observed another instance of the problem reported in KAFKA-2300, but this time the error appeared in the partition state machine. In KAFKA-2300, we haven't cleaned up the state in {{PartitionStateMachine}} and {{ReplicaStateMachine}} as we do in {{KafkaController}}. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-3173) Error while moving some partitions to OnlinePartition state
[ https://issues.apache.org/jira/browse/KAFKA-3173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flavio Junqueira updated KAFKA-3173: Description: We observed another instance of the problem reported in KAFKA-2300, but this time the error appeared in the partition state machine. In KAFKA-2300, we haven't cleaned up the state in {{PartitionStateMachine}} and {{ReplicaStateMachine}} as we do in {{KafkaController}}. Here is the stack trace: {noformat} 2016-01-29 15:26:51,393] ERROR [Partition state machine on Controller 0]: Error while moving some partitions to OnlinePartition state (kafka.controller.PartitionStateMachine)java.lang.IllegalStateException: Controller to broker state change requests batch is not empty while creating a new one. Some LeaderAndIsr state changes Map(0 -> Map(foo-0 -> (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:1),AllReplicas:0))) might be lostat kafka.controller.ControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:254) at kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:144) at kafka.controller.KafkaController.onNewPartitionCreation(KafkaController.scala:517) at kafka.controller.KafkaController.onNewTopicCreation(KafkaController.scala:504) at kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(PartitionStateMachine.scala:437) at kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419) at kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)at kafka.controller.PartitionStateMachine$TopicChangeListener.handleChildChange(PartitionStateMachine.scala:418) at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) {noformat} was:We 
observed another instance of the problem reported in KAFKA-2300, but this time the error appeared in the partition state machine. In KAFKA-2300, we haven't cleaned up the state in {{PartitionStateMachine}} and {{ReplicaStateMachine}} as we do in {{KafkaController}}. > Error while moving some partitions to OnlinePartition state > > > Key: KAFKA-3173 > URL: https://issues.apache.org/jira/browse/KAFKA-3173 > Project: Kafka > Issue Type: Bug >Affects Versions: 0.9.0.0 >Reporter: Flavio Junqueira >Priority: Critical > Fix For: 0.9.0.1 > > > We observed another instance of the problem reported in KAFKA-2300, but this > time the error appeared in the partition state machine. In KAFKA-2300, we > haven't cleaned up the state in {{PartitionStateMachine}} and > {{ReplicaStateMachine}} as we do in {{KafkaController}}. > Here is the stack trace: > {noformat} > 2016-01-29 15:26:51,393] ERROR [Partition state machine on Controller 0]: > Error while moving some partitions to OnlinePartition state > (kafka.controller.PartitionStateMachine)java.lang.IllegalStateException: > Controller to broker state change requests batch is not empty while creating > a new one. 
Some LeaderAndIsr state changes Map(0 -> Map(foo-0 -> > (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:1),AllReplicas:0))) > might be lostat > kafka.controller.ControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:254) > at > kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:144) > at > kafka.controller.KafkaController.onNewPartitionCreation(KafkaController.scala:517) > at > kafka.controller.KafkaController.onNewTopicCreation(KafkaController.scala:504) > at > kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(PartitionStateMachine.scala:437) > at > kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419) > at > kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)at > kafka.controller.PartitionStateMachine$TopicChangeListener.handleChildChange(PartitionStateMachine.scala:418) > at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)at > org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) > {noformat} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-3173) Error while moving some partitions to OnlinePartition state
[ https://issues.apache.org/jira/browse/KAFKA-3173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flavio Junqueira updated KAFKA-3173: Description: We observed another instance of the problem reported in KAFKA-2300, but this time the error appeared in the partition state machine. In KAFKA-2300, we haven't cleaned up the state in {{PartitionStateMachine}} and {{ReplicaStateMachine}} as we do in {{KafkaController}}. Here is the stack trace: {noformat} 2016-01-29 15:26:51,393] ERROR [Partition state machine on Controller 0]: Error while moving some partitions to OnlinePartition state (kafka.controller.PartitionStateMachine)java.lang.IllegalStateException: Controller to broker state change requests batch is not empty while creating a new one. Some LeaderAndIsr state changes Map(0 -> Map(foo-0 -> (LeaderAndIsrInfo: (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:1),AllReplicas:0))) might be lostat kafka.controller.ControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:254) at kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:144) at kafka.controller.KafkaController.onNewPartitionCreation(KafkaController.scala:517) at kafka.controller.KafkaController.onNewTopicCreation(KafkaController.scala:504) at kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(PartitionStateMachine.scala:437) at kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419) at kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)at kafka.controller.PartitionStateMachine$TopicChangeListener.handleChildChange(PartitionStateMachine.scala:418) at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) {noformat} was: 
We observed another instance of the problem reported in KAFKA-2300, but this time the error appeared in the partition state machine. In KAFKA-2300, we haven't cleaned up the state in {{PartitionStateMachine}} and {{ReplicaStateMachine}} as we do in {{KafkaController}}. Here is the stack trace: {noformat} 2016-01-29 15:26:51,393] ERROR [Partition state machine on Controller 0]: Error while moving some partitions to OnlinePartition state (kafka.controller.PartitionStateMachine)java.lang.IllegalStateException: Controller to broker state change requests batch is not empty while creating a new one. Some LeaderAndIsr state changes Map(0 -> Map(foo-0 -> (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:1),AllReplicas:0))) might be lostat kafka.controller.ControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:254) at kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:144) at kafka.controller.KafkaController.onNewPartitionCreation(KafkaController.scala:517) at kafka.controller.KafkaController.onNewTopicCreation(KafkaController.scala:504) at kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(PartitionStateMachine.scala:437) at kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419) at kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419) at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)at kafka.controller.PartitionStateMachine$TopicChangeListener.handleChildChange(PartitionStateMachine.scala:418) at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) {noformat} > Error while moving some partitions to OnlinePartition state > > > Key: KAFKA-3173 > URL: https://issues.apache.org/jira/browse/KAFKA-3173 > Project: Kafka > Issue Type: Bug 
>Affects Versions: 0.9.0.0 >Reporter: Flavio Junqueira >Priority: Critical > Fix For: 0.9.0.1 > > > We observed another instance of the problem reported in KAFKA-2300, but this > time the error appeared in the partition state machine. In KAFKA-2300, we > haven't cleaned up the state in {{PartitionStateMachine}} and > {{ReplicaStateMachine}} as we do in {{KafkaController}}. > Here is the stack trace: > {noformat} > 2016-01-29 15:26:51,393] ERROR [Partition state machine on Controller 0]: > Error while moving some partitions to OnlinePartition state >
[jira] [Commented] (KAFKA-3083) a soft failure in controller may leave a topic partition in an inconsistent state
[ https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15110680#comment-15110680 ] Flavio Junqueira commented on KAFKA-3083: - bq. 1) We need to use a multi-op that combines the update to the ISR and a znode check. The znode check verifies that the version of the controller leadership znode is still the same and if it passes, then the ISR data is updated. I was really just thinking out loud; the multiop is just a hack to get around the fact that the controller broker doesn't know if the underlying session has been recreated or not. The comment about using multiop was simply pointing out that you can check and update atomically with this multiop recipe. If we do this the right way, then we don't need to use a multiop call. bq. 2) The race condition that Jun Rao mentioned still exist above in 1). It still exists, but the multiop would fail to perform the update on ZK if you're checking a version. bq. 4) To do step 3), as Jun Rao suggested we have to detect the connection loss event. There are two parts. Detecting connection loss is one of them. If the controller isn't sure about its session when it receives connection loss, then it should stop. The second part is not to create a new session if the previous one expired. If the session of A has expired, which must happen by step 2) otherwise B can't be elected, then A isn't able to get requests completed on the expired session. Once B is elected, the session of A must have expired and no update coming from A will be executed. Of course, we want to bring broker A back up, and to do it, we need to start a new session. However, before starting a new session, we need to make sure to stop any controller work in A. bq. i) Broker A has connection loss and connects immediately in which case it gets a SyncConnected event. Now the session MIGHT NOT have expired since the connection happened immediately. 
Broker A is expected to continue since it is still the controller and the session has not expired. ii) Broker A has connection loss and connects back in which case it gets a SyncConnected event. Now the session MIGHT have expired. Broker A is expected to stop all the zk operations. The broker will only get SyncConnected if it connects and is able to validate the session. If the session is invalid, then it gets an Expired notification. Note that if we are using SASL to authenticate, then we could also be getting an authenticated event. > a soft failure in controller may leave a topic partition in an inconsistent > state > - > > Key: KAFKA-3083 > URL: https://issues.apache.org/jira/browse/KAFKA-3083 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.9.0.0 >Reporter: Jun Rao >Assignee: Mayuresh Gharat > > The following sequence can happen. > 1. Broker A is the controller and is in the middle of processing a broker > change event. As part of this process, let's say it's about to shrink the isr > of a partition. > 2. Then broker A's session expires and broker B takes over as the new > controller. Broker B sends the initial leaderAndIsr request to all brokers. > 3. Broker A continues by shrinking the isr of the partition in ZK and sends > the new leaderAndIsr request to the broker (say C) that leads the partition. > Broker C will reject this leaderAndIsr since the request comes from a > controller with an older epoch. Now we could be in a situation that Broker C > thinks the isr has all replicas, but the isr stored in ZK is different. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
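The check-then-update multi-op recipe mentioned in the comment can be modeled in memory so the atomicity argument is concrete. The real call would be ZooKeeper's `zk.multi(...)` with `Op.check` on the controller znode and `Op.setData` on the ISR znode; the versioned map below is a stand-in model, not the ZooKeeper API.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory model of a versioned znode store, illustrating the multi-op
// recipe: check the controller znode's version and apply the ISR update
// atomically, so a stale controller's write is rejected.
public class MultiOpModel {
    static final class Node { byte[] data; int version; Node(byte[] d) { data = d; } }
    final Map<String, Node> store = new HashMap<>();

    void create(String path, byte[] data) { store.put(path, new Node(data)); }

    // e.g., a new controller took over and rewrote the controller znode
    void bumpVersion(String path) { store.get(path).version++; }

    // Atomically: verify the expected version of checkPath, and only if it
    // matches apply the update to setPath. Either both happen or neither does.
    boolean checkAndSetData(String checkPath, int expectedVersion,
                            String setPath, byte[] newData) {
        if (store.get(checkPath).version != expectedVersion) return false; // stale controller
        Node n = store.get(setPath);
        n.data = newData;
        n.version++;
        return true;
    }
}
```

In the KAFKA-3083 sequence, broker A's ISR shrink in step 3 would fail the version check once broker B has taken over, instead of silently diverging from what broker C believes.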
[jira] [Commented] (KAFKA-3083) a soft failure in controller may leave a topic partition in an inconsistent state
[ https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15110748#comment-15110748 ] Flavio Junqueira commented on KAFKA-3083: - Sure, we need to transform all operations to look like what we currently have in ZKCheckedEphemeral. That particular class is a bit special because it performs checks and such, but essentially we need to change the current calls in ZkUtils to use asynchronous calls using the ZK handle directly and have a callback class that pairs up with the call. Related to this present issue, we will also need to implement session management, but this time it can't try to be transparent like ZkClient does. It is good to have a central point to get the current zk handle from, but we need to give the broker the ability to signal when to create a new session. As part of this signaling, we will need to implement some kind of listener to propagate events. Another option is to let the broker implement a Watcher directly to process event notifications. One simple way to start is to gradually replace the calls in ZkUtils with asynchronous calls, still using the handle ZkUtils provides. The calls would block to maintain the current behavior outside ZkUtils. Once that's done, we can make the calls non-blocking and do the necessary changes across broker/controller. Finally, as the last step, we can replace the session management with our own. If you guys want to do this, then we should probably create an umbrella jira. > a soft failure in controller may leave a topic partition in an inconsistent > state > - > > Key: KAFKA-3083 > URL: https://issues.apache.org/jira/browse/KAFKA-3083 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 0.9.0.0 >Reporter: Jun Rao >Assignee: Mayuresh Gharat > > The following sequence can happen. > 1. Broker A is the controller and is in the middle of processing a broker > change event. As part of this process, let's say it's about to shrink the isr > of a partition. 
> 2. Then broker A's session expires and broker B takes over as the new > controller. Broker B sends the initial leaderAndIsr request to all brokers. > 3. Broker A continues by shrinking the isr of the partition in ZK and sends > the new leaderAndIsr request to the broker (say C) that leads the partition. > Broker C will reject this leaderAndIsr since the request comes from a > controller with an older epoch. Now we could be in a situation that Broker C > thinks the isr has all replicas, but the isr stored in ZK is different. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
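The incremental path sketched in the comment above (issue the asynchronous call, but keep the blocking contract outside ZkUtils) can be illustrated as follows. This is a minimal plain-Java sketch: {{AsyncZkHandle}} and its callback shape are hypothetical stand-ins for ZooKeeper's async API (e.g. {{getData}} with a {{DataCallback}}), not Kafka code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;

// Sketch of the migration step described above: switch the underlying call
// to an asynchronous one while callers still see a blocking method.
public class BlockingOverAsync {
    // Hypothetical stand-in for ZooKeeper's async API; the callback
    // receives (resultCode, data), with 0 meaning OK.
    interface AsyncZkHandle {
        void getDataAsync(String path, BiConsumer<Integer, byte[]> cb);
    }

    // Blocking facade: issues the async call, then waits on a latch,
    // preserving the current behavior outside ZkUtils.
    static byte[] getDataBlocking(AsyncZkHandle zk, String path)
            throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        byte[][] result = new byte[1][];
        zk.getDataAsync(path, (rc, data) -> {
            if (rc == 0) result[0] = data;
            latch.countDown();
        });
        latch.await();
        return result[0];
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        // Fake handle that completes on another thread, like a ZK IO thread.
        AsyncZkHandle fake = (path, cb) ->
            pool.submit(() -> cb.accept(0, path.getBytes()));
        byte[] data = getDataBlocking(fake, "/brokers/ids/1");
        System.out.println(new String(data)); // prints /brokers/ids/1
        pool.shutdown();
    }
}
```

Once all call sites go through facades like this, dropping the latch makes them non-blocking without touching the callers again.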
[jira] [Updated] (KAFKA-3128) Add metrics for ZooKeeper events
[ https://issues.apache.org/jira/browse/KAFKA-3128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flavio Junqueira updated KAFKA-3128: Summary: Add metrics for ZooKeeper events (was: Add metric for ZooKeeper events) > Add metrics for ZooKeeper events > > > Key: KAFKA-3128 > URL: https://issues.apache.org/jira/browse/KAFKA-3128 > Project: Kafka > Issue Type: Improvement > Components: core, zkclient >Reporter: Flavio Junqueira > Fix For: 0.9.1.0 > > > It would be useful to report via Kafka metrics the number of ZK event > notifications, such as connection loss events, session expiration events, > etc., as a way of spotting potential issues with the communication with the > ZK ensemble. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-3128) Add metric for ZooKeeper events
Flavio Junqueira created KAFKA-3128: --- Summary: Add metric for ZooKeeper events Key: KAFKA-3128 URL: https://issues.apache.org/jira/browse/KAFKA-3128 Project: Kafka Issue Type: Improvement Components: core, zkclient Reporter: Flavio Junqueira Fix For: 0.9.1.0 It would be useful to report via Kafka metrics the number of ZK event notifications, such as connection loss events, session expiration events, etc., as a way of spotting potential issues with the communication with the ZK ensemble. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-3083) a soft failure in controller may leave a topic partition in an inconsistent state
[ https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flavio Junqueira updated KAFKA-3083: Summary: a soft failure in controller may leave a topic partition in an inconsistent state (was: a soft failure in controller may leader a topic partition in an inconsistent state) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3083) a soft failure in controller may leave a topic partition in an inconsistent state
[ https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15105587#comment-15105587 ] Flavio Junqueira commented on KAFKA-3083: - [~junrao] bq. does controller A just resume from where it's left off? Or does it ignore all outstanding events and re-read all subscribed ZK paths (since there could be missing events between the connection loss event and the SyncConnected event)? I don't see a reason for ignoring outstanding events and re-reading zk state. If the session hasn't expired, then the broker is still the controller, and I'd say it is safe to assume that no other controller work happened in parallel. bq. ZkClient actually hides the ZK ConnectionLoss event and only informs the application when the ZK session expires. To pursue this, we will have to access ZK directly. I think further down you noted that ZkClient actually exposes the connection loss event, but it does put a thread in the middle. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3083) a soft failure in controller may leave a topic partition in an inconsistent state
[ https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15105597#comment-15105597 ] Flavio Junqueira commented on KAFKA-3083: - [~junrao] bq. Even if it exists, it has the same problem--the session can expire after the check. So, you pretty much have to rely on the raw ZK client api that handles ConnectionLossException while reading/writing to ZK. Perhaps you meant to say this, but you can also learn about a connection loss via the Watcher you pass when creating a ZooKeeper object. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
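For illustration, the connection-state handling recommended in these comments can be modeled as a small state machine. The enum below mirrors the relevant {{Watcher.Event.KeeperState}} values with a local type so the sketch is self-contained; a real controller would receive these events through the {{Watcher}} passed to the {{ZooKeeper}} constructor.

```java
// Sketch of the recommended controller behavior: pause on Disconnected,
// resume on SyncConnected, resign on Expired. The enum stands in for
// org.apache.zookeeper.Watcher.Event.KeeperState.
public class ControllerSessionTracker {
    enum KeeperState { SYNC_CONNECTED, DISCONNECTED, EXPIRED }
    enum Mode { ACTIVE, PAUSED, RESIGNED }

    private Mode mode = Mode.ACTIVE;

    // On Disconnected: hold off controller work. On SyncConnected: resume.
    // On Expired: resign; only a new session (and re-election) can help.
    Mode onEvent(KeeperState state) {
        switch (state) {
            case DISCONNECTED:   if (mode == Mode.ACTIVE) mode = Mode.PAUSED; break;
            case SYNC_CONNECTED: if (mode == Mode.PAUSED) mode = Mode.ACTIVE; break;
            case EXPIRED:        mode = Mode.RESIGNED; break;
        }
        return mode;
    }

    public static void main(String[] args) {
        ControllerSessionTracker t = new ControllerSessionTracker();
        System.out.println(t.onEvent(KeeperState.DISCONNECTED));   // PAUSED
        System.out.println(t.onEvent(KeeperState.SYNC_CONNECTED)); // ACTIVE
        System.out.println(t.onEvent(KeeperState.EXPIRED));        // RESIGNED
    }
}
```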
[jira] [Commented] (KAFKA-3083) a soft failure in controller may leave a topic partition in an inconsistent state
[ https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15105612#comment-15105612 ] Flavio Junqueira commented on KAFKA-3083: - [~mgharat] bq. I was just thinking if we can modify the controller code to always check if it is the controller before it makes such changes to zookeeper. In principle, there is the race that [~junrao] mentioned, but I was thinking that one possibility would be to use a multi-op that combines the update to the ISR with a znode check. The znode check verifies that the version of the controller leadership znode is still the same, and if it passes, then the ISR data is updated. Using the scenario in the description to illustrate: when broker A tries to update the ISR state in ZK in step 3, the operation fails because the version of the controller leadership znode has changed. Handling the connection loss event is the typical solution, but we could consider adding a multi-op to be extra safe against these spurious writes. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
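A sketch of the multi-op idea above: in real code this would be {{zk.multi(...)}} combining {{Op.check}} on the controller leadership znode with {{Op.setData}} on the ISR path, which requires a live ensemble. The in-memory versioned store below just models the all-or-nothing semantics, so the stale-controller case can be seen directly.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory model of the proposed multi-op. The real call would be
// zk.multi(...) pairing Op.check(controllerPath, expectedVersion) with
// Op.setData(isrPath, data, -1): all-or-nothing, so a write from a
// controller whose leadership znode version has moved on fails cleanly.
public class ConditionalIsrUpdate {
    static class Znode { byte[] data; int version; }

    private final Map<String, Znode> store = new HashMap<>();

    void create(String path, byte[] data) {
        Znode z = new Znode();
        z.data = data;
        z.version = 0;
        store.put(path, z);
    }

    void set(String path, byte[] data) { // like setData: bumps the version
        Znode z = store.get(path);
        z.data = data;
        z.version++;
    }

    // Atomic check-and-write: verify the guard znode's version, then update
    // the target; if the check fails, nothing is written.
    boolean multiCheckAndSet(String checkPath, int expectedVersion,
                             String targetPath, byte[] data) {
        Znode guard = store.get(checkPath);
        if (guard == null || guard.version != expectedVersion) return false;
        set(targetPath, data);
        return true;
    }
}
```

Replaying the jira's scenario: broker A caches the leadership znode at version 0, broker B's takeover bumps it to 1, and A's conditional ISR write in step 3 then fails instead of landing.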
[jira] [Commented] (KAFKA-3083) a soft failure in controller may leave a topic partition in an inconsistent state
[ https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15105762#comment-15105762 ] Flavio Junqueira commented on KAFKA-3083: - Another thing to mention is that if ZkClient didn't create a new session transparently, then the update of broker A in step 3 would fail, because the session has expired and the ZK ensemble wouldn't take a request from an expired session. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3083) a soft failure in controller may leader a topic partition in an inconsistent state
[ https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15101993#comment-15101993 ] Flavio Junqueira commented on KAFKA-3083: - Hey [~mgharat], Best practice with ZK is to put the master (controller in this case) on hold upon a connection loss event and wait until the next event, which either flags a reconnection or indicates that the session has expired. It should call {{controllerResignation}} upon a session expiration, and resume if it reconnects. But we have to be careful, because we can't really control the speed of messages: even if A stops before B takes over in your example, we can't rule out that some message from A will hit some broker somewhere late. The description of this jira says that C correctly discards an old message, and it should be like that, so this part looks fine so far. The problem is the change in ZK happening at the wrong time. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-3107) Error when trying to shut down auto balancing scheduler of controller
Flavio Junqueira created KAFKA-3107: --- Summary: Error when trying to shut down auto balancing scheduler of controller Key: KAFKA-3107 URL: https://issues.apache.org/jira/browse/KAFKA-3107 Project: Kafka Issue Type: Bug Affects Versions: 0.8.2.1 Reporter: Flavio Junqueira We observed the following exception when a controller was shutting down: {noformat} [run] Error handling event ZkEvent[New session event sent to kafka.controller.KafkaController$SessionExpirationListener@3278c211] java.lang.IllegalStateException: Kafka scheduler has not been started at kafka.utils.KafkaScheduler.ensureStarted(KafkaScheduler.scala:114) at kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:86) at kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:350) at kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply$mcZ$sp(KafkaController.scala:1108) at kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1107) at kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1107) at kafka.utils.Utils$.inLock(Utils.scala:535) at kafka.controller.KafkaController$SessionExpirationListener.handleNewSession(KafkaController.scala:1107) at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472) at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) {noformat} The scheduler should have been started. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
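One way to avoid this class of error, sketched below: make shutdown idempotent, so resignation paths can call it even when startup never ran (or already resigned once), instead of asserting that the scheduler was started. This is an illustrative sketch, not KafkaScheduler's actual code.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;

// Sketch: an idempotent scheduler whose shutdown is a no-op when it was
// never started, avoiding the IllegalStateException in the trace above.
public class SafeScheduler {
    private final AtomicBoolean started = new AtomicBoolean(false);
    private ScheduledExecutorService executor;

    void startup() {
        if (started.compareAndSet(false, true)) {
            executor = Executors.newSingleThreadScheduledExecutor();
        }
    }

    // Safe to call from resignation/session-expiration paths regardless
    // of whether startup ever ran; double shutdowns are also harmless.
    void shutdown() {
        if (started.compareAndSet(true, false)) {
            executor.shutdownNow();
        }
    }

    boolean isStarted() { return started.get(); }
}
```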
[jira] [Commented] (KAFKA-3083) a soft failure in controller may leader a topic partition in an inconsistent state
[ https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15095920#comment-15095920 ] Flavio Junqueira commented on KAFKA-3083: - @mayuresh the fact that A kept going with a session expired makes me think that A ignored the connection loss event and kept doing controller work. What we recommend for mastership with ZooKeeper is that the master stops doing master work upon receiving a connection loss event, and either resumes if it reconnects or drops mastership altogether if the session expires. Talking to [~junrao] about this, it sounds like the controller isn't processing the event that ZkClient is passing up. Let me give you some more context on session semantics. At 2/3 of the session expiration, if the client hasn't heard from the current server it is connected to, then it will start looking for another server and will notify the application via connection loss events. At that point, the recommendation is that the client (broker in this case) stops doing any master work until it learns more about the session. I also need to add that I haven't verified this in the code, so it is possible that it is something else causing the problem, but it sounds wrong that a controller with a session expired keeps going. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-3083) a soft failure in controller may leader a topic partition in an inconsistent state
[ https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15095920#comment-15095920 ] Flavio Junqueira edited comment on KAFKA-3083 at 1/13/16 10:04 AM: --- [~mgharat] the fact that A kept going with a session expired makes me think that A ignored the connection loss event and kept doing controller work. What we recommend for mastership with ZooKeeper is that the master stops doing master work upon receiving a connection loss event, and either resumes if it reconnects or drops mastership altogether if the session expires. Talking to [~junrao] about this, it sounds like the controller isn't processing the event that ZkClient is passing up. Let me give you some more context on session semantics. At 2/3 of the session expiration, if the client hasn't heard from the current server it is connected to, then it will start looking for another server and will notify the application via connection loss events. At that point, the recommendation is that the client (broker in this case) stops doing any master work until it learns more about the session. I also need to add that I haven't verified this in the code, so it is possible that it is something else causing the problem, but it sounds wrong that a controller with a session expired keeps going. was (Author: fpj): @mayuresh the fact that A kept going with a session expired makes me think that A ignored the connection loss event and kept doing controller work. What we recommend for mastership with ZooKeeper is that the master stops doing master work upon receiving a connection loss event, and either resumes if it reconnects or drops mastership altogether if the session expires. Talking to [~junrao] about this, it sounds like the controller isn't processing the event that ZkClient is passing up. Let me give you some more context on session semantics. 
At 2/3 of the session expiration, if the client hasn't heard from the current server it is connected to, then it will start looking for another server and will notify the application via connection loss events. At that point, the recommendation is that the client (broker in this case) stops doing any master work until it learns more about the session. I also need to add that I haven't verified this in the code, so it is possible that it is something else causing the problem, but it sounds wrong that a controller with a session expired keeps going. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3083) a soft failure in controller may leader a topic partition in an inconsistent state
[ https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15097302#comment-15097302 ] Flavio Junqueira commented on KAFKA-3083: - It sounds like this comment from [~junrao] is extending the description of the jira. It assumes that the replica that was removed from the ISR in step 1 eventually came back, but its coming back isn't reflected in the state in ZK. However, the replica would be in the cache of controller B, so would it be elected in this case? Would it be an actual problem if B is demoted and another controller comes up? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-3083) a soft failure in controller may leader a topic partition in an inconsistent state
[ https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15097330#comment-15097330 ] Flavio Junqueira commented on KAFKA-3083: - One clarification: if broker A shrinks the partition ISR in ZK before step 2 but notifies C after step 2 as in the description, does C eventually learn the ISR stored in ZK? If it doesn't, then the observation about connection loss might not be sufficient to fix it, or might not even matter at all in this case. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2260) Allow specifying expected offset on produce
[ https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flavio Junqueira updated KAFKA-2260: Patch is stale, I'm uploading an updated version. > Allow specifying expected offset on produce > --- > > Key: KAFKA-2260 > URL: https://issues.apache.org/jira/browse/KAFKA-2260 > Project: Kafka > Issue Type: Improvement >Reporter: Ben Kirwin >Assignee: Ewen Cheslack-Postava >Priority: Minor > Attachments: expected-offsets.patch > > > I'd like to propose a change that adds a simple CAS-like mechanism to the > Kafka producer. This update has a small footprint, but enables a bunch of > interesting uses in stream processing or as a commit log for process state. > h4. Proposed Change > In short: > - Allow the user to attach a specific offset to each message produced. > - The server assigns offsets to messages in the usual way. However, if the > expected offset doesn't match the actual offset, the server should fail the > produce request instead of completing the write. > This is a form of optimistic concurrency control, like the ubiquitous > check-and-set -- but instead of checking the current value of some state, it > checks the current offset of the log. > h4. Motivation > Much like check-and-set, this feature is only useful when there's very low > contention. Happily, when Kafka is used as a commit log or as a > stream-processing transport, it's common to have just one producer (or a > small number) for a given partition -- and in many of these cases, predicting > offsets turns out to be quite useful. > - We get the same benefits as the 'idempotent producer' proposal: a producer > can retry a write indefinitely and be sure that at most one of those attempts > will succeed; and if two producers accidentally write to the end of the > partition at once, we can be certain that at least one of them will fail. 
> - It's possible to 'bulk load' Kafka this way -- you can write a list of n > messages consecutively to a partition, even if the list is much larger than > the buffer size or the producer has to be restarted. > - If a process is using Kafka as a commit log -- reading from a partition to > bootstrap, then writing any updates to that same partition -- it can be sure > that it's seen all of the messages in that partition at the moment it does > its first (successful) write. > There's a bunch of other similar use-cases here, but they all have roughly > the same flavour. > h4. Implementation > The major advantage of this proposal over other suggested transaction / > idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a > currently-unused field, adds no new APIs, and requires very little new code > or additional work from the server. > - Produced messages already carry an offset field, which is currently ignored > by the server. This field could be used for the 'expected offset', with a > sigil value for the current behaviour. (-1 is a natural choice, since it's > already used to mean 'next available offset'.) > - We'd need a new error and error code for a 'CAS failure'. > - The server assigns offsets to produced messages in > {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this > change, this method would assign offsets in the same way -- but if they > don't match the offset in the message, we'd return an error instead of > completing the write. > - To avoid breaking existing clients, this behaviour would need to live > behind some config flag. (Possibly global, but probably more useful > per-topic?) > I understand all this is unsolicited and possibly strange: happy to answer > questions, and if this seems interesting, I'd be glad to flesh this out into > a full KIP or patch. (And apologies if this is the wrong venue for this sort > of thing!) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2260) Allow specifying expected offset on produce
[ https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flavio Junqueira updated KAFKA-2260: Attachment: KAFKA-2260.patch I understand we are doing patchers through github push requests, but here I'm just uploading an updated version of what was already here. > Allow specifying expected offset on produce > --- > > Key: KAFKA-2260 > URL: https://issues.apache.org/jira/browse/KAFKA-2260 > Project: Kafka > Issue Type: Improvement >Reporter: Ben Kirwin >Assignee: Ewen Cheslack-Postava >Priority: Minor > Attachments: KAFKA-2260.patch, expected-offsets.patch > > > I'd like to propose a change that adds a simple CAS-like mechanism to the > Kafka producer. This update has a small footprint, but enables a bunch of > interesting uses in stream processing or as a commit log for process state. > h4. Proposed Change > In short: > - Allow the user to attach a specific offset to each message produced. > - The server assigns offsets to messages in the usual way. However, if the > expected offset doesn't match the actual offset, the server should fail the > produce request instead of completing the write. > This is a form of optimistic concurrency control, like the ubiquitous > check-and-set -- but instead of checking the current value of some state, it > checks the current offset of the log. > h4. Motivation > Much like check-and-set, this feature is only useful when there's very low > contention. Happily, when Kafka is used as a commit log or as a > stream-processing transport, it's common to have just one producer (or a > small number) for a given partition -- and in many of these cases, predicting > offsets turns out to be quite useful. 
> - We get the same benefits as the 'idempotent producer' proposal: a producer > can retry a write indefinitely and be sure that at most one of those attempts > will succeed; and if two producers accidentally write to the end of the > partition at once, we can be certain that at least one of them will fail. > - It's possible to 'bulk load' Kafka this way -- you can write a list of n > messages consecutively to a partition, even if the list is much larger than > the buffer size or the producer has to be restarted. > - If a process is using Kafka as a commit log -- reading from a partition to > bootstrap, then writing any updates to that same partition -- it can be sure > that it's seen all of the messages in that partition at the moment it does > its first (successful) write. > There's a bunch of other similar use-cases here, but they all have roughly > the same flavour. > h4. Implementation > The major advantage of this proposal over other suggested transaction / > idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a > currently-unused field, adds no new APIs, and requires very little new code > or additional work from the server. > - Produced messages already carry an offset field, which is currently ignored > by the server. This field could be used for the 'expected offset', with a > sigil value for the current behaviour. (-1 is a natural choice, since it's > already used to mean 'next available offset'.) > - We'd need a new error and error code for a 'CAS failure'. > - The server assigns offsets to produced messages in > {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this > changed, this method would assign offsets in the same way -- but if they > don't match the offset in the message, we'd return an error instead of > completing the write. > - To avoid breaking existing clients, this behaviour would need to live > behind some config flag. (Possibly global, but probably more useful > per-topic?) 
> I understand all this is unsolicited and possibly strange: happy to answer > questions, and if this seems interesting, I'd be glad to flesh this out into > a full KIP or patch. (And apologies if this is the wrong venue for this sort > of thing!) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
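The expected-offset check proposed in KAFKA-2260 can be sketched as a toy in-memory log. This is a minimal illustration of the CAS semantics only; `ConditionalLog`, `ANY_OFFSET`, and the method names are invented for this sketch and are not Kafka APIs, though the -1 sigil follows the proposal's suggestion.

```python
class ConditionalLog:
    """Toy in-memory partition illustrating the proposed expected-offset
    (CAS-style) check. All names are illustrative, not Kafka APIs."""
    ANY_OFFSET = -1  # sigil meaning 'next available offset' (current behaviour)

    def __init__(self):
        self.messages = []

    def append(self, message, expected_offset=ANY_OFFSET):
        next_offset = len(self.messages)
        if expected_offset != self.ANY_OFFSET and expected_offset != next_offset:
            # stands in for the proposed 'CAS failure' error code
            raise ValueError("CAS failure: expected offset %d, next offset is %d"
                             % (expected_offset, next_offset))
        self.messages.append(message)
        return next_offset
```

If two producers both try to write at the same expected offset, only one append succeeds; the loser can re-read the log and retry, which is what gives the at-most-once retry property listed in the benefits above.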
[jira] [Commented] (KAFKA-3038) Speeding up partition reassignment after broker failure
[ https://issues.apache.org/jira/browse/KAFKA-3038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15085300#comment-15085300 ] Flavio Junqueira commented on KAFKA-3038: - You don't really need to batch with multi; you just need to make the calls asynchronous. In fact, unless you really need to make multiple updates transactional, the preferred way is to push updates asynchronously to keep the pipeline full. > Speeding up partition reassignment after broker failure > --- > > Key: KAFKA-3038 > URL: https://issues.apache.org/jira/browse/KAFKA-3038 > Project: Kafka > Issue Type: Improvement > Components: controller, core >Affects Versions: 0.9.0.0 >Reporter: Eno Thereska >Assignee: Eno Thereska > Fix For: 0.9.0.0 > > > After a broker failure the controller does several writes to Zookeeper for > each partition on the failed broker. Writes are done one at a time, in a closed > loop, which is slow, especially over high-latency networks. Zookeeper has > support for batching operations (the "multi" API). It is expected that > replacing serial writes with batched ones should reduce failure handling > time by an order of magnitude. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
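The pipelining pattern suggested in the comment above (issue every write before waiting on any ack, rather than one blocking round trip per write) can be sketched with a stand-in client. `FakeAsyncClient` and `pipeline_updates` are hypothetical names for illustration only, not the ZooKeeper client API; the point is just that request latencies overlap.

```python
import threading

class FakeAsyncClient:
    """Stand-in for an asynchronous ZK-style client: acks arrive off-thread."""
    def set_data(self, path, value, callback):
        threading.Thread(target=callback, args=(path, value)).start()

def pipeline_updates(client, updates):
    """Issue every update before waiting for any ack: keeps the pipe full."""
    done = threading.Semaphore(0)
    acked, lock = [], threading.Lock()

    def on_ack(path, value):
        with lock:
            acked.append(path)
        done.release()

    for path, value in updates:
        client.set_data(path, value, on_ack)  # no blocking between calls
    for _ in updates:
        done.acquire()                        # wait for all acks at the end
    return acked
```

With a real client the same shape applies: N updates cost roughly one round trip plus server processing, instead of N round trips, without needing the transactional semantics of multi.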
[jira] [Created] (KAFKA-3069) Fix recursion in ZkSecurityMigrator
Flavio Junqueira created KAFKA-3069: --- Summary: Fix recursion in ZkSecurityMigrator Key: KAFKA-3069 URL: https://issues.apache.org/jira/browse/KAFKA-3069 Project: Kafka Issue Type: Bug Components: security Affects Versions: 0.9.0.0 Reporter: Flavio Junqueira Fix For: 0.9.0.1 The zk migrator tool recursively sets ACLs starting with the root, which we initially assumed was either the root of a dedicated ensemble or a chroot. However, there are at least two reasons for not doing it this way. First, shared ensembles might not really follow the practice of separating applications into branches, essentially creating a chroot for each. Second, there are paths we don't want to secure, like the ConsumersPath. To fix this, we simply need to set the root ACL separately and start the recursion on each of the persistent paths to secure. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (KAFKA-3069) Fix recursion in ZkSecurityMigrator
[ https://issues.apache.org/jira/browse/KAFKA-3069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flavio Junqueira reassigned KAFKA-3069: --- Assignee: Flavio Junqueira > Fix recursion in ZkSecurityMigrator > --- > > Key: KAFKA-3069 > URL: https://issues.apache.org/jira/browse/KAFKA-3069 > Project: Kafka > Issue Type: Bug > Components: security >Affects Versions: 0.9.0.0 >Reporter: Flavio Junqueira >Assignee: Flavio Junqueira > Fix For: 0.9.0.1 > > > The zk migrator tool recursively sets ACLs starting with the root, which we > initially assumed was either the root of a dedicated ensemble or a chroot. > However, there are at least two reasons for not doing it this way. First, > shared ensembles might not really follow the practice of separating > applications into branches, essentially creating a chroot for each. Second, > there are paths we don't want to secure, like the ConsumersPath. > To fix this, we simply need to set the root ACL separately and start the > recursion on each of the persistent paths to secure. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
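The fix described in KAFKA-3069 can be sketched as follows: set the root ACL once, without recursion, and start the recursive traversal only at the persistent paths that should be secured, leaving branches like the consumers path untouched. The path names and the dict-based tree below are simplified stand-ins, not the migrator's actual code.

```python
# Illustrative subset of paths to secure; e.g. the ConsumersPath
# ("/consumers") is deliberately not listed, so it is never visited.
SECURE_ROOTS = ["/brokers", "/config", "/admin"]

def paths_to_secure(children):
    """children: dict mapping a znode path to its list of child paths."""
    secured = ["/"]                 # root ACL is set separately, once
    for base in SECURE_ROOTS:
        stack = [base]
        while stack:                # recurse only under the listed roots
            path = stack.pop()
            secured.append(path)
            stack.extend(children.get(path, []))
    return secured
```

Contrast with the original behavior, which recursed from "/" and therefore swept up every branch of a shared ensemble, including paths that must stay open.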
[jira] [Commented] (KAFKA-2997) Synchronous write to disk
[ https://issues.apache.org/jira/browse/KAFKA-2997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15060260#comment-15060260 ] Flavio Junqueira commented on KAFKA-2997: - [~afirus] I'm not sure what you're trying to achieve here. It sounds like you don't want to count unflushed messages and instead you want to have a timer thread to trigger the flush. I'm wondering if you really need to have that thread to achieve your goal or if you can simply run a thread in a tight loop that will flush, accumulate while flushing, flush, accumulate while flushing, and so on. What I'm proposing is essentially what we do in the SyncRequestProcessor of ZooKeeper: https://github.com/apache/zookeeper/blob/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java > Synchronous write to disk > - > > Key: KAFKA-2997 > URL: https://issues.apache.org/jira/browse/KAFKA-2997 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 0.9.0.0 >Reporter: Arkadiusz Firus >Priority: Minor > Labels: features, patch > > Hi All, > I am currently working on a mechanism which allows efficient > synchronous writing to the file system. My idea is to gather a few write > requests for one partition and after that call fsync. > As I read the code I found out that the best place to do it is to modify the > kafka.log.Log.append > method. Currently at the end of the method (line 368) there is a check whether > the number of unflushed messages is greater than the flush interval > (configuration parameter). > I am thinking of extending this condition. I want to add an additional boolean > configuration parameter (sync write or something like this). If this > parameter is set to true at the end of this method the thread should hang on > a lock. On the other hand there will be another timer thread (for every > partition) which will be invoked every 10ms (configuration parameter). 
During > invocation the thread will call the flush method and after that will release > all blocked threads. > I am writing here because I would like to know your opinion about such an > approach. Do you think this one is good or maybe someone has a better (more > permanent) one? I would also like to know if such an approach is in line with the > general Kafka architecture. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
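The tight-loop alternative in the comment above (flush, accumulate while flushing, flush again, the same pattern as ZooKeeper's SyncRequestProcessor) can be sketched like this. The queue-based shape is an illustrative assumption for the sketch, not Kafka's or ZooKeeper's actual code.

```python
import queue
import threading

def sync_processor(requests, flushes, stop):
    """Drain whatever is queued, 'flush' it as one batch, repeat.
    Writes arriving while a flush is in flight simply wait in the queue
    and ride the next batch, so no per-partition timer thread is needed."""
    while not (stop.is_set() and requests.empty()):
        try:
            batch = [requests.get(timeout=0.05)]
        except queue.Empty:
            continue
        while not requests.empty():   # accumulate everything pending
            batch.append(requests.get_nowait())
        flushes.append(batch)         # stands in for one fsync of the batch
```

Under load, batch size grows naturally with flush latency, so throughput adapts without a configured 10ms tick; when the log is idle, the loop just waits on the queue.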
[jira] [Created] (KAFKA-2979) Enable authorizer and ACLs in ducktape tests
Flavio Junqueira created KAFKA-2979: --- Summary: Enable authorizer and ACLs in ducktape tests Key: KAFKA-2979 URL: https://issues.apache.org/jira/browse/KAFKA-2979 Project: Kafka Issue Type: Test Components: system tests Affects Versions: 0.9.0.0 Reporter: Flavio Junqueira Assignee: Flavio Junqueira Fix For: 0.9.1.0 Add some support to test ACLs with ducktape tests and enable some test cases to use it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-2951) Add test cases to EndToEndAuthorizationTest
Flavio Junqueira created KAFKA-2951: --- Summary: Add test cases to EndToEndAuthorizationTest Key: KAFKA-2951 URL: https://issues.apache.org/jira/browse/KAFKA-2951 Project: Kafka Issue Type: Test Affects Versions: 0.9.0.0 Reporter: Flavio Junqueira Assignee: Flavio Junqueira Fix For: 0.9.1.0 There are a few test cases that are worth adding. I've run them manually, but it sounds like a good idea to have them in: # Test incorrect topic name (authorization failure) # Test topic wildcard -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2952) Add ducktape test for secure->unsecure ZK migration
[ https://issues.apache.org/jira/browse/KAFKA-2952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flavio Junqueira updated KAFKA-2952: Affects Version/s: 0.9.0.0 > Add ducktape test for secure->unsecure ZK migration > > > Key: KAFKA-2952 > URL: https://issues.apache.org/jira/browse/KAFKA-2952 > Project: Kafka > Issue Type: Test >Affects Versions: 0.9.0.0 >Reporter: Flavio Junqueira >Assignee: Flavio Junqueira > Fix For: 0.9.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2952) Add ducktape test for secure->unsecure ZK migration
[ https://issues.apache.org/jira/browse/KAFKA-2952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flavio Junqueira updated KAFKA-2952: Fix Version/s: 0.9.1.0 > Add ducktape test for secure->unsecure ZK migration > > > Key: KAFKA-2952 > URL: https://issues.apache.org/jira/browse/KAFKA-2952 > Project: Kafka > Issue Type: Test >Affects Versions: 0.9.0.0 >Reporter: Flavio Junqueira >Assignee: Flavio Junqueira > Fix For: 0.9.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-2952) Add ducktape test for secure->unsecure ZK migration
Flavio Junqueira created KAFKA-2952: --- Summary: Add ducktape test for secure->unsecure ZK migration Key: KAFKA-2952 URL: https://issues.apache.org/jira/browse/KAFKA-2952 Project: Kafka Issue Type: Test Reporter: Flavio Junqueira -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Assigned] (KAFKA-2952) Add ducktape test for secure->unsecure ZK migration
[ https://issues.apache.org/jira/browse/KAFKA-2952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flavio Junqueira reassigned KAFKA-2952: --- Assignee: Flavio Junqueira > Add ducktape test for secure->unsecure ZK migration > > > Key: KAFKA-2952 > URL: https://issues.apache.org/jira/browse/KAFKA-2952 > Project: Kafka > Issue Type: Test >Affects Versions: 0.9.0.0 >Reporter: Flavio Junqueira >Assignee: Flavio Junqueira > Fix For: 0.9.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2952) Add ducktape test for secure->unsecure ZK migration
[ https://issues.apache.org/jira/browse/KAFKA-2952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Flavio Junqueira updated KAFKA-2952: Description: We have test cases for the unsecure -> secure path, but not the other way around. We should add it. > Add ducktape test for secure->unsecure ZK migration > > > Key: KAFKA-2952 > URL: https://issues.apache.org/jira/browse/KAFKA-2952 > Project: Kafka > Issue Type: Test >Affects Versions: 0.9.0.0 >Reporter: Flavio Junqueira >Assignee: Flavio Junqueira > Fix For: 0.9.1.0 > > > We have test cases for the unsecure -> secure path, but not the other way > around. We should add it. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2910) Failure in kafka.api.SslEndToEndAuthorizationTest.testNoGroupAcl
[ https://issues.apache.org/jira/browse/KAFKA-2910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15044814#comment-15044814 ] Flavio Junqueira commented on KAFKA-2910: - Based on this WARN: {noformat} [2015-11-30 01:35:03,481] WARN SASL configuration failed: javax.security.auth.login.LoginException: No JAAS configuration section named 'Client' was found in specified JAAS configuration file: '/tmp/jaas6536686531650477656.conf'. Will continue connection to Zookeeper server without SASL authentication, if Zookeeper server allows it. (org.apache.zookeeper.ClientCnxn:957) {noformat} It looks like another instance of the configuration reset problem. > Failure in kafka.api.SslEndToEndAuthorizationTest.testNoGroupAcl > > > Key: KAFKA-2910 > URL: https://issues.apache.org/jira/browse/KAFKA-2910 > Project: Kafka > Issue Type: Sub-task >Reporter: Guozhang Wang > Fix For: 0.9.1.0 > > > {code} > java.lang.SecurityException: zkEnableSecureAcls is true, but the verification > of the JAAS login file failed. 
> at kafka.server.KafkaServer.initZk(KafkaServer.scala:265) > at kafka.server.KafkaServer.startup(KafkaServer.scala:168) > at kafka.utils.TestUtils$.createServer(TestUtils.scala:143) > at > kafka.integration.KafkaServerTestHarness$$anonfun$setUp$1.apply(KafkaServerTestHarness.scala:66) > at > kafka.integration.KafkaServerTestHarness$$anonfun$setUp$1.apply(KafkaServerTestHarness.scala:66) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) > at > scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245) > at scala.collection.Iterator$class.foreach(Iterator.scala:742) > at scala.collection.AbstractIterator.foreach(Iterator.scala:1194) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) > at scala.collection.AbstractIterable.foreach(Iterable.scala:54) > at scala.collection.TraversableLike$class.map(TraversableLike.scala:245) > at scala.collection.AbstractTraversable.map(Traversable.scala:104) > at > kafka.integration.KafkaServerTestHarness$class.setUp(KafkaServerTestHarness.scala:66) > at > kafka.api.SslEndToEndAuthorizationTest.kafka$api$IntegrationTestHarness$$super$setUp(SslEndToEndAuthorizationTest.scala:24) > at > kafka.api.IntegrationTestHarness$class.setUp(IntegrationTestHarness.scala:58) > at > kafka.api.SslEndToEndAuthorizationTest.kafka$api$EndToEndAuthorizationTest$$super$setUp(SslEndToEndAuthorizationTest.scala:24) > at > kafka.api.EndToEndAuthorizationTest$class.setUp(EndToEndAuthorizationTest.scala:141) > at > kafka.api.SslEndToEndAuthorizationTest.setUp(SslEndToEndAuthorizationTest.scala:24) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > 
at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:105) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:56) > at > org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:64) > at > org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:50) > at