[jira] [Assigned] (KAFKA-4261) Provide debug option in vagrant-up.sh

2016-10-06 Thread Flavio Junqueira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-4261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flavio Junqueira reassigned KAFKA-4261:
---

Assignee: Flavio Junqueira

> Provide debug option in vagrant-up.sh
> -
>
> Key: KAFKA-4261
> URL: https://issues.apache.org/jira/browse/KAFKA-4261
> Project: Kafka
>  Issue Type: Improvement
>  Components: system tests
>Affects Versions: 0.10.1.0
>Reporter: Flavio Junqueira
>Assignee: Flavio Junqueira
>Priority: Minor
> Fix For: 0.11.0.0
>
>
> I found it useful to use the debug flag for vagrant, but I had to edit the 
> script to make it happen. Here I'm just proposing a simple change to provide 
> a debug command-line option instead.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-4261) Provide debug option in vagrant-up.sh

2016-10-06 Thread Flavio Junqueira (JIRA)
Flavio Junqueira created KAFKA-4261:
---

 Summary: Provide debug option in vagrant-up.sh
 Key: KAFKA-4261
 URL: https://issues.apache.org/jira/browse/KAFKA-4261
 Project: Kafka
  Issue Type: Improvement
  Components: system tests
Affects Versions: 0.10.1.0
Reporter: Flavio Junqueira
Priority: Minor
 Fix For: 0.11.0.0


I found it useful to use the debug flag for vagrant, but I had to edit the 
script to make it happen. Here I'm just proposing a simple change to provide a 
debug command-line option instead.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3985) Transient system test failure ZooKeeperSecurityUpgradeTest.test_zk_security_upgrade.security_protocol

2016-10-05 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15549221#comment-15549221
 ] 

Flavio Junqueira commented on KAFKA-3985:
-

I've created a pull request for this, focused on this issue. KAFKA-4140 is a 
broader set of changes, and it could incorporate the changes I'm proposing in 
the PR. In fact, it seems to change this code in a similar way.

KAFKA-4140 is still WIP, though, so for the sake of seeing more green in our 
builds it might be better to check this one in while we wait for it. I'm happy 
with whatever folks prefer.

> Transient system test failure 
> ZooKeeperSecurityUpgradeTest.test_zk_security_upgrade.security_protocol
> -
>
> Key: KAFKA-3985
> URL: https://issues.apache.org/jira/browse/KAFKA-3985
> Project: Kafka
>  Issue Type: Test
>  Components: system tests
>Affects Versions: 0.10.0.0
>Reporter: Jason Gustafson
>Assignee: Flavio Junqueira
>
> Found this in the nightly build on the 0.10.0 branch. Full details here: 
> http://testing.confluent.io/confluent-kafka-0-10-0-system-test-results/?prefix=2016-07-22--001.1469199875--apache--0.10.0--71a598a/.
>   
> {code}
> test_id:
> 2016-07-22--001.kafkatest.tests.core.zookeeper_security_upgrade_test.ZooKeeperSecurityUpgradeTest.test_zk_security_upgrade.security_protocol=SSL
> status: FAIL
> run time:   5 minutes 14.067 seconds
> 292 acked message did not make it to the Consumer. They are: 11264, 
> 11265, 11266, 11267, 11268, 11269, 11270, 11271, 11272, 11273, 11274, 11275, 
> 11276, 11277, 11278, 11279, 11280, 11281, 11282, 11283, ...plus 252 more. 
> Total Acked: 11343, Total Consumed: 11054. We validated that the first 272 of 
> these missing messages correctly made it into Kafka's data files. This 
> suggests they were lost on their way to the consumer.
> Traceback (most recent call last):
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/venv/local/lib/python2.7/site-packages/ducktape/tests/runner.py",
>  line 106, in run_all_tests
> data = self.run_single_test()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/venv/local/lib/python2.7/site-packages/ducktape/tests/runner.py",
>  line 162, in run_single_test
> return self.current_test_context.function(self.current_test)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/venv/local/lib/python2.7/site-packages/ducktape/mark/_mark.py",
>  line 331, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py",
>  line 115, in test_zk_security_upgrade
> self.run_produce_consume_validate(self.run_zk_migration)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/tests/kafkatest/tests/produce_consume_validate.py",
>  line 79, in run_produce_consume_validate
> raise e
> AssertionError: 292 acked message did not make it to the Consumer. They are: 
> 11264, 11265, 11266, 11267, 11268, 11269, 11270, 11271, 11272, 11273, 11274, 
> 11275, 11276, 11277, 11278, 11279, 11280, 11281, 11282, 11283, ...plus 252 
> more. Total Acked: 11343, Total Consumed: 11054. We validated that the first 
> 272 of these missing messages correctly made it into Kafka's data files. This 
> suggests they were lost on their way to the consumer.
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-3985) Transient system test failure ZooKeeperSecurityUpgradeTest.test_zk_security_upgrade.security_protocol

2016-10-05 Thread Flavio Junqueira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flavio Junqueira reassigned KAFKA-3985:
---

Assignee: Flavio Junqueira

> Transient system test failure 
> ZooKeeperSecurityUpgradeTest.test_zk_security_upgrade.security_protocol
> -
>
> Key: KAFKA-3985
> URL: https://issues.apache.org/jira/browse/KAFKA-3985
> Project: Kafka
>  Issue Type: Test
>  Components: system tests
>Affects Versions: 0.10.0.0
>Reporter: Jason Gustafson
>Assignee: Flavio Junqueira
>
> Found this in the nightly build on the 0.10.0 branch. Full details here: 
> http://testing.confluent.io/confluent-kafka-0-10-0-system-test-results/?prefix=2016-07-22--001.1469199875--apache--0.10.0--71a598a/.
>   
> {code}
> test_id:
> 2016-07-22--001.kafkatest.tests.core.zookeeper_security_upgrade_test.ZooKeeperSecurityUpgradeTest.test_zk_security_upgrade.security_protocol=SSL
> status: FAIL
> run time:   5 minutes 14.067 seconds
> 292 acked message did not make it to the Consumer. They are: 11264, 
> 11265, 11266, 11267, 11268, 11269, 11270, 11271, 11272, 11273, 11274, 11275, 
> 11276, 11277, 11278, 11279, 11280, 11281, 11282, 11283, ...plus 252 more. 
> Total Acked: 11343, Total Consumed: 11054. We validated that the first 272 of 
> these missing messages correctly made it into Kafka's data files. This 
> suggests they were lost on their way to the consumer.
> Traceback (most recent call last):
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/venv/local/lib/python2.7/site-packages/ducktape/tests/runner.py",
>  line 106, in run_all_tests
> data = self.run_single_test()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/venv/local/lib/python2.7/site-packages/ducktape/tests/runner.py",
>  line 162, in run_single_test
> return self.current_test_context.function(self.current_test)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/venv/local/lib/python2.7/site-packages/ducktape/mark/_mark.py",
>  line 331, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py",
>  line 115, in test_zk_security_upgrade
> self.run_produce_consume_validate(self.run_zk_migration)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/tests/kafkatest/tests/produce_consume_validate.py",
>  line 79, in run_produce_consume_validate
> raise e
> AssertionError: 292 acked message did not make it to the Consumer. They are: 
> 11264, 11265, 11266, 11267, 11268, 11269, 11270, 11271, 11272, 11273, 11274, 
> 11275, 11276, 11277, 11278, 11279, 11280, 11281, 11282, 11283, ...plus 252 
> more. Total Acked: 11343, Total Consumed: 11054. We validated that the first 
> 272 of these missing messages correctly made it into Kafka's data files. This 
> suggests they were lost on their way to the consumer.
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-3985) Transient system test failure ZooKeeperSecurityUpgradeTest.test_zk_security_upgrade.security_protocol

2016-10-05 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15548125#comment-15548125
 ] 

Flavio Junqueira edited comment on KAFKA-3985 at 10/5/16 9:02 AM:
--

[~rsivaram]

bq. I wouldn't have expected to see a CA with start time 09:55:15 at all. Is it 
possible that there was another system test running on that same host (the 
tests use a fixed file name for CA and truststore, so the tests would fail if 
there are multiple instances of system tests or any command that loads the 
security config class that is run on that host).

That's a very good point, I'm thinking that the problem is that the CA files 
are in {{/tmp}}: 

{noformat}
self.ca_crt_path = "/tmp/test.ca.crt"
self.ca_jks_path = "/tmp/test.ca.jks"
{noformat}

and perhaps we should use {{mkdtemp}} like in {{generate_and_copy_keystore}}. 
Does it make sense?
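For concreteness, a minimal sketch of the direction I have in mind (the function 
name and prefix are illustrative; in the security config class the two paths 
would be assigned to {{self.ca_crt_path}} and {{self.ca_jks_path}}):

{code}
# Sketch only: per-run CA file locations via tempfile.mkdtemp, mirroring what
# generate_and_copy_keystore does, instead of fixed names under /tmp.
import os
import tempfile

def make_ca_paths():
    ca_dir = tempfile.mkdtemp(prefix="kafka-system-test-ca-")
    return (os.path.join(ca_dir, "test.ca.crt"),
            os.path.join(ca_dir, "test.ca.jks"))
{code}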



was (Author: fpj):
[~rsivaram]

.bq I wouldn't have expected to see a CA with start time 09:55:15 at all. Is it 
possible that there was another system test running on that same host (the 
tests use a fixed file name for CA and truststore, so the tests would fail if 
there are multiple instances of system tests or any command that loads the 
security config class that is run on that host).

That's a very good point, I'm thinking that the problem is that the CA files 
are in {{/tmp}}: 

{noformat}
self.ca_crt_path = "/tmp/test.ca.crt"
self.ca_jks_path = "/tmp/test.ca.jks"
{noformat}

and perhaps we should use {{mkdtemp}} like in {{generate_and_copy_keystore}}. 
Does it make sense?


> Transient system test failure 
> ZooKeeperSecurityUpgradeTest.test_zk_security_upgrade.security_protocol
> -
>
> Key: KAFKA-3985
> URL: https://issues.apache.org/jira/browse/KAFKA-3985
> Project: Kafka
>  Issue Type: Test
>  Components: system tests
>Affects Versions: 0.10.0.0
>Reporter: Jason Gustafson
>
> Found this in the nightly build on the 0.10.0 branch. Full details here: 
> http://testing.confluent.io/confluent-kafka-0-10-0-system-test-results/?prefix=2016-07-22--001.1469199875--apache--0.10.0--71a598a/.
>   
> {code}
> test_id:
> 2016-07-22--001.kafkatest.tests.core.zookeeper_security_upgrade_test.ZooKeeperSecurityUpgradeTest.test_zk_security_upgrade.security_protocol=SSL
> status: FAIL
> run time:   5 minutes 14.067 seconds
> 292 acked message did not make it to the Consumer. They are: 11264, 
> 11265, 11266, 11267, 11268, 11269, 11270, 11271, 11272, 11273, 11274, 11275, 
> 11276, 11277, 11278, 11279, 11280, 11281, 11282, 11283, ...plus 252 more. 
> Total Acked: 11343, Total Consumed: 11054. We validated that the first 272 of 
> these missing messages correctly made it into Kafka's data files. This 
> suggests they were lost on their way to the consumer.
> Traceback (most recent call last):
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/venv/local/lib/python2.7/site-packages/ducktape/tests/runner.py",
>  line 106, in run_all_tests
> data = self.run_single_test()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/venv/local/lib/python2.7/site-packages/ducktape/tests/runner.py",
>  line 162, in run_single_test
> return self.current_test_context.function(self.current_test)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/venv/local/lib/python2.7/site-packages/ducktape/mark/_mark.py",
>  line 331, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py",
>  line 115, in test_zk_security_upgrade
> self.run_produce_consume_validate(self.run_zk_migration)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/tests/kafkatest/tests/produce_consume_validate.py",
>  line 79, in run_produce_consume_validate
> raise e
> AssertionError: 292 acked message did not make it to the Consumer. They are: 
> 11264, 11265, 11266, 11267, 11268, 11269, 11270, 11271, 11272, 11273, 11274, 
> 11275, 11276, 11277, 11278, 11279, 11280, 11281, 11282, 11283, ...plus 252 
> more. Total Acked: 11343, Total Consumed: 11054. We validated that the first 
> 272 of these missing messages correctly made it into Kafka's data files. This 
> suggests they were lost on their way to the consumer.
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3985) Transient system test failure ZooKeeperSecurityUpgradeTest.test_zk_security_upgrade.security_protocol

2016-10-05 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15548125#comment-15548125
 ] 

Flavio Junqueira commented on KAFKA-3985:
-

[~rsivaram]

.bq I wouldn't have expected to see a CA with start time 09:55:15 at all. Is it 
possible that there was another system test running on that same host (the 
tests use a fixed file name for CA and truststore, so the tests would fail if 
there are multiple instances of system tests or any command that loads the 
security config class that is run on that host).

That's a very good point, I'm thinking that the problem is that the CA files 
are in {{/tmp}}: 

{noformat}
self.ca_crt_path = "/tmp/test.ca.crt"
self.ca_jks_path = "/tmp/test.ca.jks"
{noformat}

and perhaps we should use {{mkdtemp}} like in {{generate_and_copy_keystore}}. 
Does it make sense?


> Transient system test failure 
> ZooKeeperSecurityUpgradeTest.test_zk_security_upgrade.security_protocol
> -
>
> Key: KAFKA-3985
> URL: https://issues.apache.org/jira/browse/KAFKA-3985
> Project: Kafka
>  Issue Type: Test
>  Components: system tests
>Affects Versions: 0.10.0.0
>Reporter: Jason Gustafson
>
> Found this in the nightly build on the 0.10.0 branch. Full details here: 
> http://testing.confluent.io/confluent-kafka-0-10-0-system-test-results/?prefix=2016-07-22--001.1469199875--apache--0.10.0--71a598a/.
>   
> {code}
> test_id:
> 2016-07-22--001.kafkatest.tests.core.zookeeper_security_upgrade_test.ZooKeeperSecurityUpgradeTest.test_zk_security_upgrade.security_protocol=SSL
> status: FAIL
> run time:   5 minutes 14.067 seconds
> 292 acked message did not make it to the Consumer. They are: 11264, 
> 11265, 11266, 11267, 11268, 11269, 11270, 11271, 11272, 11273, 11274, 11275, 
> 11276, 11277, 11278, 11279, 11280, 11281, 11282, 11283, ...plus 252 more. 
> Total Acked: 11343, Total Consumed: 11054. We validated that the first 272 of 
> these missing messages correctly made it into Kafka's data files. This 
> suggests they were lost on their way to the consumer.
> Traceback (most recent call last):
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/venv/local/lib/python2.7/site-packages/ducktape/tests/runner.py",
>  line 106, in run_all_tests
> data = self.run_single_test()
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/venv/local/lib/python2.7/site-packages/ducktape/tests/runner.py",
>  line 162, in run_single_test
> return self.current_test_context.function(self.current_test)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/venv/local/lib/python2.7/site-packages/ducktape/mark/_mark.py",
>  line 331, in wrapper
> return functools.partial(f, *args, **kwargs)(*w_args, **w_kwargs)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/tests/kafkatest/tests/core/zookeeper_security_upgrade_test.py",
>  line 115, in test_zk_security_upgrade
> self.run_produce_consume_validate(self.run_zk_migration)
>   File 
> "/var/lib/jenkins/workspace/system-test-kafka-0.10.0/kafka/tests/kafkatest/tests/produce_consume_validate.py",
>  line 79, in run_produce_consume_validate
> raise e
> AssertionError: 292 acked message did not make it to the Consumer. They are: 
> 11264, 11265, 11266, 11267, 11268, 11269, 11270, 11271, 11272, 11273, 11274, 
> 11275, 11276, 11277, 11278, 11279, 11280, 11281, 11282, 11283, ...plus 252 
> more. Total Acked: 11343, Total Consumed: 11054. We validated that the first 
> 272 of these missing messages correctly made it into Kafka's data files. This 
> suggests they were lost on their way to the consumer.
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-4196) Transient test failure: DeleteConsumerGroupTest.testConsumptionOnRecreatedTopicAfterTopicWideDeleteInZK

2016-09-20 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15507443#comment-15507443
 ] 

Flavio Junqueira commented on KAFKA-4196:
-

After a cursory look, it seems we could see this behavior if:

# There is an event that triggers {{TopicsChangeListener.handleChildChange}}.
# The previous event is followed by a broker change: {{BrokerChangeListener}}.

According to the description, we do have the broker change event. The topics 
event only happens after the topic has been deleted from under 
{{/brokers/topics}} in zk, though. If the controller instance that handles the 
first event is the same one that deletes the topic, then it doesn't look like we 
can have the behavior above, because: 1) all those events are processed under 
the controller context lock; 2) the controller deletes the topic znodes and 
updates {{ControllerContext.partitionLeadershipInfo}} and 
{{controllerContext.partitionReplicaAssignment}}. Consequently, one possibility 
is a race between two controllers. One puzzling point is that the delete znode 
for the topic isn't going away, which indicates that no controller instance is 
completing the delete operation successfully.

I'd need to investigate some more to find the culprit. If it happens again and 
you have a chance, please upload the logs. I'll see if I can repro locally.

> Transient test failure: 
> DeleteConsumerGroupTest.testConsumptionOnRecreatedTopicAfterTopicWideDeleteInZK
> ---
>
> Key: KAFKA-4196
> URL: https://issues.apache.org/jira/browse/KAFKA-4196
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>  Labels: transient-unit-test-failure
>
> The error:
> {code}
> java.lang.AssertionError: Admin path /admin/delete_topic/topic path not 
> deleted even after a replica is restarted
>   at org.junit.Assert.fail(Assert.java:88)
>   at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:752)
>   at kafka.utils.TestUtils$.verifyTopicDeletion(TestUtils.scala:1017)
>   at 
> kafka.admin.DeleteConsumerGroupTest.testConsumptionOnRecreatedTopicAfterTopicWideDeleteInZK(DeleteConsumerGroupTest.scala:156)
> {code}
> Caused by a broken invariant in the Controller: a partition exists in 
> `ControllerContext.partitionLeadershipInfo`, but not 
> `controllerContext.partitionReplicaAssignment`.
> {code}
> [2016-09-20 06:45:13,967] ERROR [BrokerChangeListener on Controller 1]: Error 
> while handling broker changes 
> (kafka.controller.ReplicaStateMachine$BrokerChangeListener:103)
> java.util.NoSuchElementException: key not found: [topic,0]
>   at scala.collection.MapLike$class.default(MapLike.scala:228)
>   at scala.collection.AbstractMap.default(Map.scala:58)
>   at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
>   at 
> kafka.controller.ControllerBrokerRequestBatch.kafka$controller$ControllerBrokerRequestBatch$$updateMetadataRequestMapFor$1(ControllerChannelManager.scala:310)
>   at 
> kafka.controller.ControllerBrokerRequestBatch$$anonfun$addUpdateMetadataRequestForBrokers$4.apply(ControllerChannelManager.scala:343)
>   at 
> kafka.controller.ControllerBrokerRequestBatch$$anonfun$addUpdateMetadataRequestForBrokers$4.apply(ControllerChannelManager.scala:343)
>   at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
>   at 
> kafka.controller.ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers(ControllerChannelManager.scala:343)
>   at 
> kafka.controller.KafkaController.sendUpdateMetadataRequest(KafkaController.scala:1030)
>   at 
> kafka.controller.KafkaController.onBrokerFailure(KafkaController.scala:492)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:376)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:358)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:358)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:357)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:356)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:356)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>   at 
> 

[jira] [Commented] (KAFKA-4196) Transient test failure: DeleteConsumerGroupTest.testConsumptionOnRecreatedTopicAfterTopicWideDeleteInZK

2016-09-20 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-4196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15507287#comment-15507287
 ] 

Flavio Junqueira commented on KAFKA-4196:
-

[~ijuma] is this from trunk?

> Transient test failure: 
> DeleteConsumerGroupTest.testConsumptionOnRecreatedTopicAfterTopicWideDeleteInZK
> ---
>
> Key: KAFKA-4196
> URL: https://issues.apache.org/jira/browse/KAFKA-4196
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>  Labels: transient-unit-test-failure
>
> The error:
> {code}
> java.lang.AssertionError: Admin path /admin/delete_topic/topic path not 
> deleted even after a replica is restarted
>   at org.junit.Assert.fail(Assert.java:88)
>   at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:752)
>   at kafka.utils.TestUtils$.verifyTopicDeletion(TestUtils.scala:1017)
>   at 
> kafka.admin.DeleteConsumerGroupTest.testConsumptionOnRecreatedTopicAfterTopicWideDeleteInZK(DeleteConsumerGroupTest.scala:156)
> {code}
> Caused by a broken invariant in the Controller: a partition exists in 
> `ControllerContext.partitionLeadershipInfo`, but not 
> `controllerContext.partitionReplicaAssignment`.
> {code}
> [2016-09-20 06:45:13,967] ERROR [BrokerChangeListener on Controller 1]: Error 
> while handling broker changes 
> (kafka.controller.ReplicaStateMachine$BrokerChangeListener:103)
> java.util.NoSuchElementException: key not found: [topic,0]
>   at scala.collection.MapLike$class.default(MapLike.scala:228)
>   at scala.collection.AbstractMap.default(Map.scala:58)
>   at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
>   at 
> kafka.controller.ControllerBrokerRequestBatch.kafka$controller$ControllerBrokerRequestBatch$$updateMetadataRequestMapFor$1(ControllerChannelManager.scala:310)
>   at 
> kafka.controller.ControllerBrokerRequestBatch$$anonfun$addUpdateMetadataRequestForBrokers$4.apply(ControllerChannelManager.scala:343)
>   at 
> kafka.controller.ControllerBrokerRequestBatch$$anonfun$addUpdateMetadataRequestForBrokers$4.apply(ControllerChannelManager.scala:343)
>   at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
>   at 
> kafka.controller.ControllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers(ControllerChannelManager.scala:343)
>   at 
> kafka.controller.KafkaController.sendUpdateMetadataRequest(KafkaController.scala:1030)
>   at 
> kafka.controller.KafkaController.onBrokerFailure(KafkaController.scala:492)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:376)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:358)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:358)
>   at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:357)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:356)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:356)
>   at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:234)
>   at 
> kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:355)
>   at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:843)
>   at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1211) Hold the produce request with ack > 1 in purgatory until replicas' HW has larger than the produce offset

2016-08-13 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15419984#comment-15419984
 ] 

Flavio Junqueira commented on KAFKA-1211:
-

[~junrao]

bq. the leader needs to first wait for the follower to receive a message before 
it can advance the last committed offset.

makes sense

bq. it can propagate the last committed offset to the follower

makes sense

bq. the last committed offset in the follower is always behind that in the 
leader

makes sense, it is either equal or behind, never ahead.

bq. Since the follower truncates based on the local last committed offset, it's 
possible for the follower to truncate messages that are already committed by 
the leader.

I'm not sure why we are doing this. A follower shouldn't truncate based on its 
local last committed offset; it can't truncate until it hears from the leader 
upon recovery.

> Hold the produce request with ack > 1 in purgatory until replicas' HW has 
> larger than the produce offset
> 
>
> Key: KAFKA-1211
> URL: https://issues.apache.org/jira/browse/KAFKA-1211
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
> Fix For: 0.11.0.0
>
>
> Today during leader failover we will have a window of weakness when the 
> followers truncate their data before fetching from the new leader, i.e., the 
> number of in-sync replicas is just 1. If during this time the leader has also 
> failed, then produce requests with ack > 1 that have already been responded to 
> will still be lost. To avoid this scenario we would prefer to hold the produce 
> request in purgatory until the replicas' HW is larger than the produce offset 
> instead of just their end-of-log offsets.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1211) Hold the produce request with ack > 1 in purgatory until replicas' HW has larger than the produce offset

2016-08-06 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15410693#comment-15410693
 ] 

Flavio Junqueira commented on KAFKA-1211:
-

[~junrao] I was reading point (a) in your answer again, and there is something 
I don't understand. You say that the follower truncates and then become leader. 
This is fine, I understand it can happen. The bit I don't understand is how it 
can truncate committed messages. 

Let's say that we are talking about servers A and B, min ISR is 2 (the replica 
set can be larger than 2, but it doesn't really matter for this example):

# A leads initially and B follows A.
# B truncates
# B becomes leader

If A leads, then it means that it was previously in the ISR (assuming unclean 
leader election is disabled) and it contains all committed messages. If B was 
also part of the previous ISR, then both A and B will have all committed 
messages and B won't truncate committed messages.

The situation you describe can only happen if either A or B loses committed 
messages on its own and not because of the truncation, e.g., if the messages 
didn't make it from the page cache to disk before a crash.

Is my understanding correct?

> Hold the produce request with ack > 1 in purgatory until replicas' HW has 
> larger than the produce offset
> 
>
> Key: KAFKA-1211
> URL: https://issues.apache.org/jira/browse/KAFKA-1211
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
> Fix For: 0.11.0.0
>
>
> Today during leader failover we will have a window of weakness when the 
> followers truncate their data before fetching from the new leader, i.e., the 
> number of in-sync replicas is just 1. If during this time the leader has also 
> failed, then produce requests with ack > 1 that have already been responded to 
> will still be lost. To avoid this scenario we would prefer to hold the produce 
> request in purgatory until the replicas' HW is larger than the produce offset 
> instead of just their end-of-log offsets.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1211) Hold the produce request with ack > 1 in purgatory until replicas' HW has larger than the produce offset

2016-08-06 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15410682#comment-15410682
 ] 

Flavio Junqueira commented on KAFKA-1211:
-

[~junrao] I get (2), but (1) might need a bit more tweaking because of the 
following. Say the follower executes the steps in the order you describe:

# Truncate log
# Update LGS to reflect the truncated log (flush the prefix of LGS whose start 
offset is up to the log end offset)
# Fetch and update the LGS as messages cross leader boundaries

If the follower crashes in the middle of fetching and updating the LGS, it may 
leave the LGS in an inconsistent state. For example, let's say that it crosses a 
generation boundary, writes to the log, and crashes before updating the LGS. I'm 
actually thinking that it might be better to update the LGS first, because in 
the worst case we point to an offset that is not in the log, so we know that the 
LGS entry is invalid. In any case, it sounds like there is some work to be done 
to make sure the LGS and the log are consistent.
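To make the ordering concern concrete, here is a deliberately simplified sketch 
(not broker code; in-memory lists stand in for the persisted checkpoint and log):

{code}
# Simplified sketch of the "LGS first" ordering discussed above; not actual broker code.
# lgs is a list of (generation, start_offset) entries, log is a list of messages.

def append_generation(lgs, log, generation, messages):
    lgs.append((generation, len(log)))  # 1. record the new generation entry first
    # A crash here leaves an LGS entry whose start offset is at (or past) the
    # log end offset, which can be detected as invalid and dropped on restart.
    log.extend(messages)                # 2. then append the fetched messages
{code}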



> Hold the produce request with ack > 1 in purgatory until replicas' HW has 
> larger than the produce offset
> 
>
> Key: KAFKA-1211
> URL: https://issues.apache.org/jira/browse/KAFKA-1211
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
> Fix For: 0.11.0.0
>
>
> Today during leader failover we will have a window of weakness when the 
> followers truncate their data before fetching from the new leader, i.e., the 
> number of in-sync replicas is just 1. If during this time the leader has also 
> failed, then produce requests with ack > 1 that have already been responded to 
> will still be lost. To avoid this scenario we would prefer to hold the produce 
> request in purgatory until the replicas' HW is larger than the produce offset 
> instead of just their end-of-log offsets.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1211) Hold the produce request with ack > 1 in purgatory until replicas' HW has larger than the produce offset

2016-08-03 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15406063#comment-15406063
 ] 

Flavio Junqueira commented on KAFKA-1211:
-

Thanks for the clarification, [~junrao]. There are a couple of specific points 
that still aren't entirely clear to me:

# We are trying to preserve the generation when we copy messages to a follower, 
correct? In step 3.4, when we say that the follower flushes the LGS, we are 
more specifically trying to replicate the leader LGS, is that right? What 
happens if either the follower crashes or the leader changes between persisting 
the new LGS and fetching the new messages from the leader? I'm concerned that 
we will leave the LGS and the log of the broker in an inconsistent state.
# When we say in step 3.4 that the follower needs to remember the LLG, I 
suppose this is just during this recovery period. Otherwise, once we have 
completed the sync up, the follower knows that the latest generation is the 
LLG. During sync up, there is the question I'm raising above, but it is also 
not super clear whether we need to persist the LLG independently to make sure 
that we don't have a situation in which the follower crashes, comes back, and 
accepts messages from a different generation.

> Hold the produce request with ack > 1 in purgatory until replicas' HW has 
> larger than the produce offset
> 
>
> Key: KAFKA-1211
> URL: https://issues.apache.org/jira/browse/KAFKA-1211
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
> Fix For: 0.11.0.0
>
>
> Today during leader failover we will have a window of weakness when the 
> followers truncate their data before fetching from the new leader, i.e., the 
> number of in-sync replicas is just 1. If during this time the leader has also 
> failed, then produce requests with ack > 1 that have already been responded to 
> will still be lost. To avoid this scenario we would prefer to hold the produce 
> request in purgatory until the replicas' HW is larger than the produce offset 
> instead of just their end-of-log offsets.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-1211) Hold the produce request with ack > 1 in purgatory until replicas' HW has larger than the produce offset

2016-08-02 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404245#comment-15404245
 ] 

Flavio Junqueira edited comment on KAFKA-1211 at 8/2/16 4:14 PM:
-

[~junrao] let me ask a few clarification questions.

# Is it right that the scenarios described here do not affect the cases in 
which min isr > 1 and unclean leader election is disabled? If min isr is 
greater than 1 and the leader is always coming from the latest isr, then the 
leader can either truncate the followers or have them fetch the missing log 
suffix.
# The main goal of the proposal is to have a leader and a follower in a lossy 
configuration (e.g., min isr = 1, unclean leader election enabled) converge to a 
common prefix by choosing an offset based on a common generation. The chosen 
generation is the largest generation in common between the two replicas. Is that 
right?
# How do we guarantee that the generation id is unique, by using zookeeper 
versions?
# I think there is a potential race between updating the 
leader-generation-checkpoint file and appending the first message of the 
generation. We might be better off rolling the log segment file and having the 
generation be part of the log segment file name. This way, when we start a 
new generation, we also start a new file, and we know precisely when a message 
from that generation has been appended.
# Let's consider a scenario with 3 servers A B C. I'm again assuming that it is 
ok to have a single server up to ack requests. Say we have the following 
execution:

||Generation||A||B||C||
|1| |m1|m1|
| | |m2|m2|
|2|m3| | |
| |m4| | |

Say that now A and B start generation 3. They have no generation in common, so 
they start from zero, dropping m1 and m2. Is that right? If later on C joins A 
and B, then it will also drop m1 and m2, right? Given that the configuration is 
lossy, it doesn't seem wrong to do so, as all we are trying to do is converge to 
a consistent state. 
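To check my reading of the rule, a toy sketch (illustrative only; it ignores 
offsets within a generation and just keys messages by the generation that 
appended them):

{code}
# Toy sketch of the convergence rule as I understand it; illustrative only.
# A history maps generation -> messages appended in that generation.

def follower_prefix_to_keep(follower_gens, leader_gens):
    common = set(follower_gens) & set(leader_gens)
    if not common:
        return {}                       # nothing in common: start from zero
    cut = max(common)                   # largest generation in common
    return {g: msgs for g, msgs in follower_gens.items() if g <= cut}

# Scenario from the table: B wrote m1, m2 in generation 1; A wrote m3, m4 in generation 2.
print(follower_prefix_to_keep({1: ["m1", "m2"]}, {2: ["m3", "m4"]}))  # {} -> m1, m2 dropped
{code}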


was (Author: fpj):
[~junrao] let me ask a few clarification questions.

# Is it right that the scenarios described here do not affect the cases in 
which min isr > 1 and unclean leader election is disabled? If min isr is 
greater than 1 and the leader is always coming from the latest isr, then the 
leader can either truncate the followers or have them fetch the missing log 
suffix.
# The main goal of the proposal is to have replicas in a lossy configuration 
(e.g. min isr = 1, unclean leader election enabled) a leader and a follower 
converging to a common prefix by choosing an offset based on a common 
generation. The chosen generation is the largest generation in common between 
the two replicas. Is it right?
# How do we guarantee that the generation id is unique, by using zookeeper 
versions?
# I think there is a potential race between updating the 
leader-generation-checkpoint file and appending the first message of the 
generation. We might be better off rolling the log segment file and having the 
generation being part of the log segment file name. This way when we start a 
new generation, we also start a new file and we know precisely when a message 
from that generation has been appended.
# Let's consider a scenario with 3 servers A B C. I'm again assuming that it is 
ok to have a single server up to ack requests. Say we have the following 
execution:

{noformat}
Generation    A     B     C
1                   m1    m1
                    m2    m2
2             m3
              m4
{noformat}

Say that now A and B start generation 3. They have no generation in common, so 
the start from zero, dropping m1 and m2. Is that right? If later on C joins A 
and B, then it will also drop m1 and m2, right? Given that the configuration is 
lossy, it doesn't wrong to do it as all we are trying to do is to converge to a 
consistent state. 

> Hold the produce request with ack > 1 in purgatory until replicas' HW has 
> larger than the produce offset
> 
>
> Key: KAFKA-1211
> URL: https://issues.apache.org/jira/browse/KAFKA-1211
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
> Fix For: 0.11.0.0
>
>
> Today during leader failover we will have a window of weakness when the 
> followers truncate their data before fetching from the new leader, i.e., the 
> number of in-sync replicas is just 1. If during this time the leader has also 
> failed, then produce requests with ack > 1 that have already been responded to 
> will still be lost. To avoid this scenario we would prefer to hold the produce 
> request in purgatory until the replicas' HW is larger than the produce offset 
> instead of just their end-of-log offsets.

[jira] [Commented] (KAFKA-1211) Hold the produce request with ack > 1 in purgatory until replicas' HW has larger than the produce offset

2016-08-02 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15404245#comment-15404245
 ] 

Flavio Junqueira commented on KAFKA-1211:
-

[~junrao] let me ask a few clarification questions.

# Is it right that the scenarios described here do not affect the cases in 
which min isr > 1 and unclean leader election is disabled? If min isr is 
greater than 1 and the leader is always coming from the latest isr, then the 
leader can either truncate the followers or have them fetch the missing log 
suffix.
# The main goal of the proposal is to have replicas in a lossy configuration 
(e.g. min isr = 1, unclean leader election enabled) a leader and a follower 
converging to a common prefix by choosing an offset based on a common 
generation. The chosen generation is the largest generation in common between 
the two replicas. Is it right?
# How do we guarantee that the generation id is unique, by using zookeeper 
versions?
# I think there is a potential race between updating the 
leader-generation-checkpoint file and appending the first message of the 
generation. We might be better off rolling the log segment file and having the 
generation being part of the log segment file name. This way when we start a 
new generation, we also start a new file and we know precisely when a message 
from that generation has been appended.
# Let's consider a scenario with 3 servers A B C. I'm again assuming that it is 
ok to have a single server up to ack requests. Say we have the following 
execution:

{noformat}
Generation    A     B     C
1                   m1    m1
                    m2    m2
2             m3
              m4
{noformat}

Say that now A and B start generation 3. They have no generation in common, so 
they start from zero, dropping m1 and m2. Is that right? If later on C joins A 
and B, then it will also drop m1 and m2, right? Given that the configuration is 
lossy, it doesn't seem wrong to do so, as all we are trying to do is converge to 
a consistent state. 

> Hold the produce request with ack > 1 in purgatory until replicas' HW has 
> larger than the produce offset
> 
>
> Key: KAFKA-1211
> URL: https://issues.apache.org/jira/browse/KAFKA-1211
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
> Fix For: 0.11.0.0
>
>
> Today during leader failover we will have a window of weakness when the 
> followers truncate their data before fetching from the new leader, i.e., the 
> number of in-sync replicas is just 1. If during this time the leader has also 
> failed, then produce requests with ack > 1 that have already been responded to 
> will still be lost. To avoid this scenario we would prefer to hold the produce 
> request in purgatory until the replicas' HW is larger than the produce offset 
> instead of just their end-of-log offsets.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-873) Consider replacing zkclient with curator (with zkclient-bridge)

2016-07-04 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15361397#comment-15361397
 ] 

Flavio Junqueira commented on KAFKA-873:


I see what you're saying. That support in zkClient was added specifically for 
our needs in Kafka. It is correct that it is more flexible to do it the Curator 
way, but I'm not sure how we would be able to benefit from it in Kafka given 
that this security configuration is internal to the brokers. If you have any 
specific idea, let me know and we can see if it is feasible. 

> Consider replacing zkclient with curator (with zkclient-bridge)
> ---
>
> Key: KAFKA-873
> URL: https://issues.apache.org/jira/browse/KAFKA-873
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.8.0
>Reporter: Scott Clasen
>Assignee: Grant Henke
>
> If zkclient were replaced with curator and curator-x-zkclient-bridge, it would 
> initially be a drop-in replacement:
> https://github.com/Netflix/curator/wiki/ZKClient-Bridge
> With the addition of a few more props to ZkConfig, and a bit of code, this 
> would open up the possibility of using ACLs in zookeeper (which aren't 
> supported directly by zkclient), as well as integrating with netflix 
> exhibitor for those of us using that.
> Looks like KafkaZookeeperClient needs some love anyhow...



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-873) Consider replacing zkclient with curator (with zkclient-bridge)

2016-07-04 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-873?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15361236#comment-15361236
 ] 

Flavio Junqueira commented on KAFKA-873:


[~mtomcat_sslavic] could you be more specific about what you're missing?

> Consider replacing zkclient with curator (with zkclient-bridge)
> ---
>
> Key: KAFKA-873
> URL: https://issues.apache.org/jira/browse/KAFKA-873
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 0.8.0
>Reporter: Scott Clasen
>Assignee: Grant Henke
>
> If zkclient were replaced with curator and curator-x-zkclient-bridge, it would 
> initially be a drop-in replacement:
> https://github.com/Netflix/curator/wiki/ZKClient-Bridge
> With the addition of a few more props to ZkConfig, and a bit of code, this 
> would open up the possibility of using ACLs in zookeeper (which aren't 
> supported directly by zkclient), as well as integrating with netflix 
> exhibitor for those of us using that.
> Looks like KafkaZookeeperClient needs some love anyhow...



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3709) Create project security page

2016-05-13 Thread Flavio Junqueira (JIRA)
Flavio Junqueira created KAFKA-3709:
---

 Summary: Create project security page
 Key: KAFKA-3709
 URL: https://issues.apache.org/jira/browse/KAFKA-3709
 Project: Kafka
  Issue Type: Improvement
  Components: website
Reporter: Flavio Junqueira
Assignee: Flavio Junqueira


We are creating a security@k.a.o mailing list to receive reports of potential 
vulnerabilities. Now that Kafka has security in place, the community might 
start receiving vulnerability reports, and we need to follow the guidelines 
here:

http://www.apache.org/security/

Specifically, security issues are better handled in a project-specific list. 
This jira is to create a web page that informs users and contributors of how we 
are supposed to handle security issues. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3686) Kafka producer is not fault tolerant

2016-05-12 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15281508#comment-15281508
 ] 

Flavio Junqueira commented on KAFKA-3686:
-

(on behalf of [~lethalman])

[~fpj] yes, I want the first. In the case of network partitions I want to 
ensure my messages are stored. If the libraries don't do that, it means I have 
to reimplement them. Or otherwise, postpone sending such messages until the 
network partition resolves (which means implementing some kind of backlog on 
disk of the producer, which should instead be the kafka purpose after all). In 
both cases, it's something that is not documented and it's very inconvenient.

> Kafka producer is not fault tolerant
> 
>
> Key: KAFKA-3686
> URL: https://issues.apache.org/jira/browse/KAFKA-3686
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
>Reporter: Luca Bruno
>
> *Setup*
> I have a cluster of 3 kafka servers, a topic with 12 partitions and 
> replication factor 2, and a zookeeper cluster of 3 nodes.
> Producer config:
> {code}
>  props.put("bootstrap.servers", "k1:9092,k2:9092,k3:9092");
>  props.put("acks", "1");
>  props.put("batch.size", 16384);
>  props.put("retries", 3);
>  props.put("buffer.memory", 33554432);
>  props.put("key.serializer", 
> "org.apache.kafka.common.serialization.StringSerializer");
>  props.put("value.serializer", 
> "org.apache.kafka.common.serialization.StringSerializer");
> {code}
> Producer code:
> {code}
>  Producer<String, String> producer = new KafkaProducer<>(props);
>  for (int i = 0; i < 10; i++) {
>  Future<RecordMetadata> f = producer.send(new ProducerRecord<String, String>("topic", null, Integer.toString(i)));
>  f.get();
>  }
> {code}
> *Problem*
> Cut the network between the producer (p1) and one of the kafka servers (say 
> k1).
> The cluster is healthy, hence the kafka bootstrap tells the producer that 
> there are 3 kafka servers (as I understood it), and the leaders of the 
> partitions of the topic.
> So the producer will send messages to all of the 3 leaders for each 
> partition. If the leader happens to be k1 for a message, the producer raises 
> the following exception after request.timeout.ms:
> {code}
> Exception in thread "main" java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Batch Expired
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:56)
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:43)
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
>   at Test.main(Test.java:25)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Batch Expired
> {code}
> In theory, the application should handle the failure. In practice, messages 
> are getting lost, even though there are other 2 leaders available for writing.
> I tried with values of acks both 1 and -1.
> *What I expected*
> Given the client library is automatically deciding the hashing / round robin 
> schema for the partition, I would say it's not very important which partition 
> the message is being sent to.
> I expect the client library to handle the failure by sending the message to a 
> partition of a different leader.
> Neither kafka-clients nor rdkafka handle this failure. Given those are the 
> main client libraries being used for kafka as far as I know, I find it a 
> serious problem in terms of fault tolerance.
> EDIT: I cannot add comments to this issue, don't understand why. To answer 
> [~fpj] yes, I want the first. In the case of network partitions I want to 
> ensure my messages are stored. If the libraries don't do that, it means I 
> have to reimplement them. Or otherwise, postpone sending such messages until 
> the network partition resolves (which means implementing some kind of backlog 
> on disk of the producer, which should instead be the kafka purpose after 
> all). In both cases, it's something that is not documented and it's very 
> inconvenient.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3686) Kafka producer is not fault tolerant

2016-05-12 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15281505#comment-15281505
 ] 

Flavio Junqueira commented on KAFKA-3686:
-

[~lethalman] Jira is on lockdown because of spam; someone needs to add you as a 
contributor to jira.

> Kafka producer is not fault tolerant
> 
>
> Key: KAFKA-3686
> URL: https://issues.apache.org/jira/browse/KAFKA-3686
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
>Reporter: Luca Bruno
>
> *Setup*
> I have a cluster of 3 kafka servers, a topic with 12 partitions and 
> replication factor 2, and a zookeeper cluster of 3 nodes.
> Producer config:
> {code}
>  props.put("bootstrap.servers", "k1:9092,k2:9092,k3:9092");
>  props.put("acks", "1");
>  props.put("batch.size", 16384);
>  props.put("retries", 3);
>  props.put("buffer.memory", 33554432);
>  props.put("key.serializer", 
> "org.apache.kafka.common.serialization.StringSerializer");
>  props.put("value.serializer", 
> "org.apache.kafka.common.serialization.StringSerializer");
> {code}
> Producer code:
> {code}
>  Producer<String, String> producer = new KafkaProducer<>(props);
>  for (int i = 0; i < 10; i++) {
>  Future<RecordMetadata> f = producer.send(new ProducerRecord<String, String>("topic", null, Integer.toString(i)));
>  f.get();
>  }
> {code}
> *Problem*
> Cut the network between the producer (p1) and one of the kafka servers (say 
> k1).
> The cluster is healthy, hence the kafka bootstrap tells the producer that 
> there are 3 kafka servers (as I understood it), and the leaders of the 
> partitions of the topic.
> So the producer will send messages to all of the 3 leaders for each 
> partition. If the leader happens to be k1 for a message, the producer raises 
> the following exception after request.timeout.ms:
> {code}
> Exception in thread "main" java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Batch Expired
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:56)
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:43)
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
>   at Test.main(Test.java:25)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Batch Expired
> {code}
> In theory, the application should handle the failure. In practice, messages 
> are getting lost, even though there are other 2 leaders available for writing.
> I tried with values of acks both 1 and -1.
> *What I expected*
> Given the client library is automatically deciding the hashing / round robin 
> schema for the partition, I would say it's not very important which partition 
> the message is being sent to.
> I expect the client library to handle the failure by sending the message to a 
> partition of a different leader.
> Neither kafka-clients nor rdkafka handle this failure. Given those are the 
> main client libraries being used for kafka as far as I know, I find it a 
> serious problem in terms of fault tolerance.
> EDIT: I cannot add comments to this issue, don't understand why. To answer 
> [~fpj] yes, I want the first. In the case of network partitions I want to 
> ensure my messages are stored. If the libraries don't do that, it means I 
> have to reimplement them. Or otherwise, postpone sending such messages until 
> the network partition resolves (which means implementing some kind of backlog 
> on disk of the producer, which should instead be the kafka purpose after 
> all). In both cases, it's something that is not documented and it's very 
> inconvenient.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3686) Kafka producer is not fault tolerant

2016-05-12 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3686?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15281490#comment-15281490
 ] 

Flavio Junqueira commented on KAFKA-3686:
-

[~lethalman] thanks for bringing this up. There are at least two possible ways 
of interpreting the round-robin behavior when producing:

# I want to send my message to any partition available, so I don't care which 
partition takes it.
# I want to split my messages evenly across partitions to balance the 
consumption load.

One issue with the first is that it can cause load imbalance in the presence of 
network partitions, but they both sound like sensible choices to me, with their 
own pros and cons. I believe you're suggesting that you'd rather have the first, 
or at least the choice of having the first option. 
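To make the two options concrete, here is a small illustrative sketch (not tied 
to any particular client library; the names are made up):

{code}
# Illustrative only: the two round-robin interpretations above, with made-up names.

def next_partition_any_available(partitions, leader_reachable, counter):
    # Option 1: rotate only over partitions whose leader is currently reachable.
    candidates = [p for p in partitions if leader_reachable[p]]
    if not candidates:
        raise RuntimeError("no partition with a reachable leader")
    return candidates[counter % len(candidates)]

def next_partition_strict_round_robin(partitions, counter):
    # Option 2: rotate over all partitions to balance consumption, reachable or not.
    return partitions[counter % len(partitions)]
{code}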

> Kafka producer is not fault tolerant
> 
>
> Key: KAFKA-3686
> URL: https://issues.apache.org/jira/browse/KAFKA-3686
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.1
>Reporter: Luca Bruno
>
> *Setup*
> I have a cluster of 3 kafka servers, a topic with 12 partitions and 
> replication factor 2, and a zookeeper cluster of 3 nodes.
> Producer config:
> {code}
>  props.put("bootstrap.servers", "k1:9092,k2:9092,k3:9092");
>  props.put("acks", "1");
>  props.put("batch.size", 16384);
>  props.put("retries", 3);
>  props.put("buffer.memory", 33554432);
>  props.put("key.serializer", 
> "org.apache.kafka.common.serialization.StringSerializer");
>  props.put("value.serializer", 
> "org.apache.kafka.common.serialization.StringSerializer");
> {code}
> Producer code:
> {code}
>  Producer<String, String> producer = new KafkaProducer<>(props);
>  for (int i = 0; i < 10; i++) {
>  Future<RecordMetadata> f = producer.send(new ProducerRecord<String, String>("topic", null, Integer.toString(i)));
>  f.get();
>  }
> {code}
> *Problem*
> Cut the network between the producer (p1) and one of the kafka servers (say 
> k1).
> The cluster is healthy, hence the kafka bootstrap tells the producer that 
> there are 3 kafka servers (as I understood it), and the leaders of the 
> partitions of the topic.
> So the producer will send messages to all of the 3 leaders for each 
> partition. If the leader happens to be k1 for a message, the producer raises 
> the following exception after request.timeout.ms:
> {code}
> Exception in thread "main" java.util.concurrent.ExecutionException: 
> org.apache.kafka.common.errors.TimeoutException: Batch Expired
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:56)
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:43)
>   at 
> org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:25)
>   at Test.main(Test.java:25)
> Caused by: org.apache.kafka.common.errors.TimeoutException: Batch Expired
> {code}
> In theory, the application should handle the failure. In practice, messages 
> are getting lost, even though there are other 2 leaders available for writing.
> I tried with values of acks both 1 and -1.
> *What I expected*
> Given the client library is automatically deciding the hashing / round robin 
> schema for the partition, I would say it's not very important which partition 
> the message is being sent to.
> I expect the client library to handle the failure by sending the message to a 
> partition of a different leader.
> Neither kafka-clients nor rdkafka handle this failure. Given those are the 
> main client libraries being used for kafka as far as I know, I find it a 
> serious problem in terms of fault tolerance.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3525) max.reserved.broker.id off-by-one error

2016-05-05 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15273215#comment-15273215
 ] 

Flavio Junqueira commented on KAFKA-3525:
-

Got it, thank you both for the clarification. {{getSequenceId}} needs to start 
from 1, otherwise we can get a collision.
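
To spell out the collision with a small hedged example (the numbers are made up, 
and this is not the actual {{ZkUtils}}/{{KafkaConfig}} code):

{code}
public class BrokerIdOffByOne {
    public static void main(String[] args) {
        // Assume reserved.broker.max.id = 1000 and that an operator has manually
        // configured broker.id = 1000, which the <= check allows.
        int reservedMax = 1000;

        // Auto-generated ids are reservedMax + sequence number.
        int firstIdIfSequenceStartsAtZero = reservedMax;     // 1000: collides with the manual id
        int firstIdIfSequenceStartsAtOne = reservedMax + 1;  // 1001: first id outside the reserved range

        System.out.println(firstIdIfSequenceStartsAtZero + " vs " + firstIdIfSequenceStartsAtOne);
    }
}
{code}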

> max.reserved.broker.id off-by-one error
> ---
>
> Key: KAFKA-3525
> URL: https://issues.apache.org/jira/browse/KAFKA-3525
> Project: Kafka
>  Issue Type: Bug
>  Components: config
>Reporter: Alan Braithwaite
>Assignee: Manikumar Reddy
> Fix For: 0.10.1.0
>
>
> There's an off-by-one error in the config check / id generation for 
> max.reserved.broker.id setting.  The auto-generation will generate 
> max.reserved.broker.id as the initial broker id as it's currently written.
> Not sure what the consequences of this are if there's already a broker with 
> that id as I didn't test that behavior.
> This can return 0 + max.reserved.broker.id:
> https://github.com/apache/kafka/blob/8dbd688b1617968329087317fa6bde8b8df0392e/core/src/main/scala/kafka/utils/ZkUtils.scala#L213-L215
> However, this does a <= check, which is inclusive of max.reserved.broker.id:
> https://github.com/apache/kafka/blob/8dbd688b1617968329087317fa6bde8b8df0392e/core/src/main/scala/kafka/server/KafkaConfig.scala#L984-L986



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3660) Log exception message in ControllerBrokerRequestBatch

2016-05-05 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15272265#comment-15272265
 ] 

Flavio Junqueira commented on KAFKA-3660:
-

[~ijuma] could you have a look, pls?

> Log exception message in ControllerBrokerRequestBatch
> -
>
> Key: KAFKA-3660
> URL: https://issues.apache.org/jira/browse/KAFKA-3660
> Project: Kafka
>  Issue Type: Sub-task
>Affects Versions: 0.9.0.1
>Reporter: Flavio Junqueira
>Assignee: Flavio Junqueira
> Fix For: 0.10.0.0
>
>
> In the main task, we observed that the exception that is causing a dirty 
> batch isn't being logged. We add here to the current logging so that we can 
> see the exception message to help us debug the main issue.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3660) Log exception message in ControllerBrokerRequestBatch

2016-05-05 Thread Flavio Junqueira (JIRA)
Flavio Junqueira created KAFKA-3660:
---

 Summary: Log exception message in ControllerBrokerRequestBatch
 Key: KAFKA-3660
 URL: https://issues.apache.org/jira/browse/KAFKA-3660
 Project: Kafka
  Issue Type: Sub-task
Affects Versions: 0.9.0.1
Reporter: Flavio Junqueira
Assignee: Flavio Junqueira
 Fix For: 0.10.0.0


In the main task, we observed that the exception that is causing a dirty batch 
isn't being logged. We add here to the current logging so that we can see the 
exception message to help us debug the main issue.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3525) max.reserved.broker.id off-by-one error

2016-05-05 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3525?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15272234#comment-15272234
 ] 

Flavio Junqueira commented on KAFKA-3525:
-

I'm a bit confused by what this issue is trying to accomplish. The 
documentation for {{reserved.broker.max.id}} says "Max number that can be used 
for a broker.id", which in my interpretation says that the maximum id is the 
value of {{reserved.broker.max.id}}, and not that there are 
{{reserved.broker.max.id}} possible values reserved. Unless I'm missing 
something, the code is correct without the change proposed in the PR, but 
please clarify if I'm missing the point. 

> max.reserved.broker.id off-by-one error
> ---
>
> Key: KAFKA-3525
> URL: https://issues.apache.org/jira/browse/KAFKA-3525
> Project: Kafka
>  Issue Type: Bug
>  Components: config
>Reporter: Alan Braithwaite
>Assignee: Manikumar Reddy
> Fix For: 0.10.1.0
>
>
> There's an off-by-one error in the config check / id generation for 
> max.reserved.broker.id setting.  The auto-generation will generate 
> max.reserved.broker.id as the initial broker id as it's currently written.
> Not sure what the consequences of this are if there's already a broker with 
> that id as I didn't test that behavior.
> This can return 0 + max.reserved.broker.id:
> https://github.com/apache/kafka/blob/8dbd688b1617968329087317fa6bde8b8df0392e/core/src/main/scala/kafka/utils/ZkUtils.scala#L213-L215
> However, this does a <= check, which is inclusive of max.reserved.broker.id:
> https://github.com/apache/kafka/blob/8dbd688b1617968329087317fa6bde8b8df0392e/core/src/main/scala/kafka/server/KafkaConfig.scala#L984-L986



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3173) Error while moving some partitions to OnlinePartition state

2016-05-05 Thread Flavio Junqueira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flavio Junqueira updated KAFKA-3173:

Status: Open  (was: Patch Available)

> Error while moving some partitions to OnlinePartition state 
> 
>
> Key: KAFKA-3173
> URL: https://issues.apache.org/jira/browse/KAFKA-3173
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0
>Reporter: Flavio Junqueira
>Assignee: Flavio Junqueira
>Priority: Critical
> Fix For: 0.10.0.0
>
> Attachments: KAFKA-3173-race-repro.patch
>
>
> We observed another instance of the problem reported in KAFKA-2300, but this 
> time the error appeared in the partition state machine. In KAFKA-2300, we 
> haven't cleaned up the state in {{PartitionStateMachine}} and 
> {{ReplicaStateMachine}} as we do in {{KafkaController}}.
> Here is the stack trace:
> {noformat}
> 2016-01-29 15:26:51,393] ERROR [Partition state machine on Controller 0]: 
> Error while moving some partitions to OnlinePartition state 
> (kafka.controller.PartitionStateMachine)java.lang.IllegalStateException: 
> Controller to broker state change requests batch is not empty while creating 
> a new one. 
> Some LeaderAndIsr state changes Map(0 -> Map(foo-0 -> (LeaderAndIsrInfo:
> (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:1),AllReplicas:0)))
>  might be lostat 
> kafka.controller.ControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:254)
> at 
> kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:144)
> at 
> kafka.controller.KafkaController.onNewPartitionCreation(KafkaController.scala:517)
> at 
> kafka.controller.KafkaController.onNewTopicCreation(KafkaController.scala:504)
> at 
> kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(PartitionStateMachine.scala:437)
> at 
> kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419)
> at 
> kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419)
> at 
> kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)at 
> kafka.controller.PartitionStateMachine$TopicChangeListener.handleChildChange(PartitionStateMachine.scala:418)
> at 
> org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)at 
> org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3173) Error while moving some partitions to OnlinePartition state

2016-05-05 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15272070#comment-15272070
 ] 

Flavio Junqueira commented on KAFKA-3173:
-

[~junrao] True, {{onControllerFailover}} has the lock when it runs, and it is 
the only place where we call {{PartitionStateMachine.startup()}}. The confusing 
part is that the lock is acquired a few hops upwards in the call path, but it 
does look like the additional lock isn't necessary. Also, I'm wondering if we 
even need that controller lock. All the zk events are processed using the 
ZkClient event thread, and there is just one. The runs I was trying to put 
together had concurrent zk events being triggered, which was causing the 
potential problems I raised above. If there is any chance of threads other than 
the ZkClient event thread racing, then the lock is needed; otherwise it isn't.

I don't think we need the change I proposed, so I'll go ahead and close the PR, 
but we can't resolve this issue until we determine the cases in which we can 
get a dirty batch, preventing the controller from sending further requests. We 
need more info on this. One of the possibilities given what I've seen in other 
logs is simply that there is a transient error while sending a message to a 
broker in {{ControllerBrokerRequestBatch.sendRequestsToBrokers}}, but we are 
currently not logging the exception. I was hoping that the originator of the 
call would log it, but that isn't happening. Perhaps one thing we can do for the 
upcoming release is to log the exception in case we observe the problem again.
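
For reference, a minimal sketch of the kind of logging I mean, with a hypothetical 
wrapper around the send step (the real code is Scala; the SLF4J logger and the 
{{Runnable}} indirection are only for illustration):

{code}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class RequestBatchLogging {
    private static final Logger log = LoggerFactory.getLogger(RequestBatchLogging.class);

    // Run the send step and record the cause before rethrowing, so a dirty batch
    // can later be traced back to the original failure.
    public static void sendAndLog(Runnable sendRequestsToBrokers) {
        try {
            sendRequestsToBrokers.run();
        } catch (RuntimeException e) {
            log.error("Sending the controller-to-broker request batch failed", e);
            throw e;
        }
    }
}
{code}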

> Error while moving some partitions to OnlinePartition state 
> 
>
> Key: KAFKA-3173
> URL: https://issues.apache.org/jira/browse/KAFKA-3173
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0
>Reporter: Flavio Junqueira
>Assignee: Flavio Junqueira
>Priority: Critical
> Fix For: 0.10.0.0
>
> Attachments: KAFKA-3173-race-repro.patch
>
>
> We observed another instance of the problem reported in KAFKA-2300, but this 
> time the error appeared in the partition state machine. In KAFKA-2300, we 
> haven't cleaned up the state in {{PartitionStateMachine}} and 
> {{ReplicaStateMachine}} as we do in {{KafkaController}}.
> Here is the stack trace:
> {noformat}
> 2016-01-29 15:26:51,393] ERROR [Partition state machine on Controller 0]: 
> Error while moving some partitions to OnlinePartition state 
> (kafka.controller.PartitionStateMachine)java.lang.IllegalStateException: 
> Controller to broker state change requests batch is not empty while creating 
> a new one. 
> Some LeaderAndIsr state changes Map(0 -> Map(foo-0 -> (LeaderAndIsrInfo:
> (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:1),AllReplicas:0)))
>  might be lostat 
> kafka.controller.ControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:254)
> at 
> kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:144)
> at 
> kafka.controller.KafkaController.onNewPartitionCreation(KafkaController.scala:517)
> at 
> kafka.controller.KafkaController.onNewTopicCreation(KafkaController.scala:504)
> at 
> kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(PartitionStateMachine.scala:437)
> at 
> kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419)
> at 
> kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419)
> at 
> kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)at 
> kafka.controller.PartitionStateMachine$TopicChangeListener.handleChildChange(PartitionStateMachine.scala:418)
> at 
> org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)at 
> org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3640) Reduce the latency of topic metadata requests

2016-04-28 Thread Flavio Junqueira (JIRA)
Flavio Junqueira created KAFKA-3640:
---

 Summary: Reduce the latency of topic metadata requests
 Key: KAFKA-3640
 URL: https://issues.apache.org/jira/browse/KAFKA-3640
 Project: Kafka
  Issue Type: Improvement
Affects Versions: 0.9.0.1
Reporter: Flavio Junqueira
Assignee: Flavio Junqueira
 Fix For: 0.9.0.2


Changes to reduce the latency of topic metadata requests based on the PR of 
KAFKA-2073.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3128) Add metrics for ZooKeeper events

2016-04-22 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15254591#comment-15254591
 ] 

Flavio Junqueira commented on KAFKA-3128:
-

[~junrao] Perhaps we can do more than just count the session expiration events, 
and instead count all events that come through 
{{SessionExpireListener.handleStateChange}}. For example, it might be useful to 
know if there are several connection loss events even if they don't lead to 
session expiration. 
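
A rough sketch of that bookkeeping, assuming a hypothetical counter class that a 
{{handleStateChanged}} callback would update (this is not the actual 
{{SessionExpireListener}}):

{code}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

import org.apache.zookeeper.Watcher.Event.KeeperState;

// Counts every ZooKeeper state change (Disconnected, SyncConnected, Expired, ...)
// so that connection-loss spikes are visible even without session expirations.
public class ZkStateChangeCounters {
    private final Map<KeeperState, LongAdder> counts = new ConcurrentHashMap<>();

    public void record(KeeperState state) {
        counts.computeIfAbsent(state, s -> new LongAdder()).increment();
    }

    public long count(KeeperState state) {
        LongAdder adder = counts.get(state);
        return adder == null ? 0L : adder.sum();
    }
}
{code}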

> Add metrics for ZooKeeper events
> 
>
> Key: KAFKA-3128
> URL: https://issues.apache.org/jira/browse/KAFKA-3128
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, zkclient
>Reporter: Flavio Junqueira
>Assignee: Ismael Juma
> Fix For: 0.10.0.0
>
>
> It would be useful to report via Kafka metrics the number of ZK event 
> notifications, such as connection loss events, session expiration events, 
> etc., as a way of spotting potential issues with the communication with the 
> ZK ensemble.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue

2016-04-19 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15247802#comment-15247802
 ] 

Flavio Junqueira commented on KAFKA-3042:
-

[~junrao] It makes sense, thanks for the analysis. Trying to reconstruct the 
problem in steps, this is what's going on:

# Broker 5 thinks broker 4 is alive and sends a LeaderAndIsr request to broker 
1 with 4 as the leader.
# Broker 1 doesn't have 4 cached as a live broker, so it fails the request to 
make it a follower of the partition.

The LeaderAndIsr request has a list of live leaders, and I suppose 4 is in that 
list. 

To sort this out, I can see two options:

# We simply update the metadata cache upon receiving a LeaderAndIsr request 
using the list of live leaders. This update needs to be the union of the 
current set with the set of live leaders.
# You also suggested sending an UpdateMetadata request first to update the set 
of live brokers. 

I can't see any problem with 1, and I can't see any immediate problem with 2 
either, but I'm concerned about finding ourselves with another race condition 
if we send an update first. What do you think?  
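
For what it's worth, the update in option 1 is just a set union; here is a hedged 
sketch with hypothetical names (the real cache is the broker's Scala 
{{MetadataCache}}):

{code}
import java.util.HashSet;
import java.util.Set;

public final class LiveBrokerUnion {
    // Merge the cached live brokers with the live leaders carried by a
    // LeaderAndIsr request, never removing anything already cached.
    public static Set<Integer> merge(Set<Integer> cachedLiveBrokers,
                                     Set<Integer> liveLeadersFromRequest) {
        Set<Integer> merged = new HashSet<>(cachedLiveBrokers);
        merged.addAll(liveLeadersFromRequest);
        return merged;
    }
}
{code}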

> updateIsr should stop after failed several times due to zkVersion issue
> ---
>
> Key: KAFKA-3042
> URL: https://issues.apache.org/jira/browse/KAFKA-3042
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1
> Environment: jdk 1.7
> centos 6.4
>Reporter: Jiahongchao
> Fix For: 0.10.0.0
>
> Attachments: controller.log, server.log.2016-03-23-01, 
> state-change.log
>
>
> sometimes one broker may repeatedly log
> "Cached zkVersion 54 not equal to that in zookeeper, skip updating ISR"
> I think this is because the broker considers itself the leader when in fact it's 
> a follower.
> So after several failed tries, it needs to find out who the leader is



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue

2016-04-18 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15245725#comment-15245725
 ] 

Flavio Junqueira commented on KAFKA-3042:
-

I checked the broker change listener output of broker 5:

{noformat}
[2016-04-09 00:37:54,079] INFO [BrokerChangeListener on Controller 5]: Broker 
change listener fired for path /brokers/ids with children 1,2,3,5 
(kafka.controller.ReplicaStateMachine$BrokerChangeListener)
[2016-04-09 00:41:35,442] INFO [BrokerChangeListener on Controller 5]: Broker 
change listener fired for path /brokers/ids with children 1,4,5 
(kafka.controller.ReplicaStateMachine$BrokerChangeListener)
{noformat}

These are the two events I could find that determine a time interval including 
the request event. The LeaderAndIsrRequest from broker 5 comes in at 
{{[2016-04-09 00:40:58,144]}}. This is in this comment:

https://issues.apache.org/jira/browse/KAFKA-3042?focusedCommentId=15236055=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15236055

> updateIsr should stop after failed several times due to zkVersion issue
> ---
>
> Key: KAFKA-3042
> URL: https://issues.apache.org/jira/browse/KAFKA-3042
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1
> Environment: jdk 1.7
> centos 6.4
>Reporter: Jiahongchao
> Fix For: 0.10.0.0
>
> Attachments: controller.log, server.log.2016-03-23-01, 
> state-change.log
>
>
> sometimes one broker may repeatedly log
> "Cached zkVersion 54 not equal to that in zookeeper, skip updating ISR"
> I think this is because the broker considers itself the leader when in fact it's 
> a follower.
> So after several failed tries, it needs to find out who the leader is



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue

2016-04-14 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15241314#comment-15241314
 ] 

Flavio Junqueira commented on KAFKA-3042:
-

[~junrao] In this comment:

https://issues.apache.org/jira/browse/KAFKA-3042?focusedCommentId=15236055=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15236055

 I showed that broker 5 is the one that sent the LeaderAndIsr request to broker 
1, and in here:

https://issues.apache.org/jira/browse/KAFKA-3042?focusedCommentId=15237383=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15237383

that broker 5 also didn't have broker 4 as a live broker when it sent the 
request to broker 1. It does sound right that the controller on failover should 
update the list of live brokers on other brokers before sending requests that 
make them followers or at least the problem should be transient in the sense 
that it could be corrected with a later message. However, it sounds like for 
the partition we are analyzing, there is this additional problem that 
controller 5 also didn't have broker 4 in its list of live brokers.

Interestingly, I also caught an instance of this:

{noformat}
[2016-04-09 00:37:54,111] DEBUG Sending MetadataRequest to 
Brokers:ArrayBuffer(2, 5)...
[2016-04-09 00:37:54,111] ERROR Haven't been able to send metadata update 
requests...
[2016-04-09 00:37:54,112] ERROR [Controller 5]: Forcing the controller to 
resign (kafka.controller.KafkaController)
{noformat}

I don't think this is related, but we have been wondering in another issue 
about the possible causes of batches in {{ControllerBrokerRequestBatch}} not 
being empty, and there are a few occurrences of it in these logs. This is 
happening, however, right after the controller resigns, so I'm guessing this is 
related to the controller shutting down:

{noformat}
[2016-04-09 00:37:54,064] INFO [Controller 5]: Broker 5 resigned as the 
controller (kafka.controller.KafkaController)
{noformat}

In any case, for this last issue, I'll create a jira to make sure that we have 
enough info to identify this issue when it happens. Currently, the exception is 
being propagated, but we are not logging the cause anywhere.


> updateIsr should stop after failed several times due to zkVersion issue
> ---
>
> Key: KAFKA-3042
> URL: https://issues.apache.org/jira/browse/KAFKA-3042
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1
> Environment: jdk 1.7
> centos 6.4
>Reporter: Jiahongchao
> Fix For: 0.10.0.0
>
> Attachments: controller.log, server.log.2016-03-23-01, 
> state-change.log
>
>
> sometimes one broker may repeatedly log
> "Cached zkVersion 54 not equal to that in zookeeper, skip updating ISR"
> I think this is because the broker considers itself the leader when in fact it's 
> a follower.
> So after several failed tries, it needs to find out who the leader is



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue

2016-04-12 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15237824#comment-15237824
 ] 

Flavio Junqueira commented on KAFKA-3042:
-

hey [~wushujames]

bq. you said that broker 3 failed to release leadership to broker 4 because 
broker 4 was offline

it is actually broker 1 that failed to release leadership.

bq. What is the correct behavior for that scenario?

The behavior isn't incorrect in the following sense. We cannot completely 
prevent a single broker from being partitioned from the other replicas. If this 
broker is the leader before the partition, then it may remain in this state for 
some time. In the meanwhile, the other replicas may form a new ISR and make 
progress independently. But, very important, the partitioned broker won't be 
able to commit anything on its own, assuming that the minimum ISR is at least 
two.

In the scenario we are discussing, we don't have a network partition, but the 
behavior is equivalent: broker 1 will remain the leader until it is able to 
follow successfully. The bad part is that broker 1 isn't partitioned away, it is 
talking to other controllers, and the broker should be brought back into a state 
in which it can make progress with that partition and others that are equally 
stuck. The bottom line is that it is safe, but we clearly want the broker up and 
making progress with those partitions.

Let me point out that from the logs, it looks like you have unclean leader 
election enabled because of this log message:

{noformat}
[2016-04-09 00:40:50,911] WARN [OfflinePartitionLeaderSelector]: No broker in 
ISR is alive for [tec1.en2.frontend.syncPing,7]. 
Elect leader 4 from live brokers 4. There's potential data loss. 
(kafka.controller.OfflinePartitionLeaderSelector)
{noformat} 

and no minimum ISR set:

{noformat}
[2016-04-09 00:56:53,009] WARN [Controller 5]: Cannot remove replica 1 from ISR 
of partition [tec1.en2.frontend.syncPing,7]
since it is not in the ISR. Leader = 4 ; ISR = List(4) 
{noformat}

Those options can cause some data loss.
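
For reference, the usual way to trade some availability for durability here is a 
pair of broker/topic settings along these lines (just a sketch, not a 
recommendation for this particular cluster; the minimum ISR value depends on the 
replication factor and only matters to producers using acks=all):

{noformat}
unclean.leader.election.enable=false
min.insync.replicas=2
{noformat}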

> updateIsr should stop after failed several times due to zkVersion issue
> ---
>
> Key: KAFKA-3042
> URL: https://issues.apache.org/jira/browse/KAFKA-3042
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1
> Environment: jdk 1.7
> centos 6.4
>Reporter: Jiahongchao
> Attachments: controller.log, server.log.2016-03-23-01, 
> state-change.log
>
>
> sometimes one broker may repeatedly log
> "Cached zkVersion 54 not equal to that in zookeeper, skip updating ISR"
> I think this is because the broker considers itself the leader when in fact it's 
> a follower.
> So after several failed tries, it needs to find out who the leader is



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue

2016-04-12 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15237383#comment-15237383
 ] 

Flavio Junqueira commented on KAFKA-3042:
-

I had a look at the zookeeper logs, and I couldn’t see anything unusual. There 
are session expirations, but it is expected that sessions expire.

Using the same topic-partition I used in my last comment, 
[tec1.en2.frontend.syncPing,7], I found that the reason seems to be that 
controller 5 is telling broker 1 that the partition leader is 4, but neither 5 
nor 1 thinks that broker 4 is up. Here are some relevant log lines from broker 5:

{noformat} 
[2016-04-09 00:37:54,079] INFO [BrokerChangeListener on Controller 5]: Broker 
change listener fired for path /brokers/ids with children 1,2,3,5 
(kafka.controller.ReplicaStateMachine$BrokerChangeListener)
[2016-04-09 00:37:53,709] DEBUG [Partition state machine on Controller 5]: 
After leader election, leader cache is updated to Map…. 
[tec1.en2.frontend.syncPing,7] -> 
(Leader:2,ISR:2,LeaderEpoch:361,ControllerEpoch:410)
[2016-04-09 00:37:53,765] INFO [Partition state machine on Controller 5]: 
Started partition state machine with initial state -> Map… 
[tec1.en2.frontend.syncPing,7] -> OnlinePartition

[2016-04-09 00:40:58,415] DEBUG [Partition state machine on Controller 5]: 
After leader election, leader cache is updated to Map… 
[tec1.en2.frontend.syncPing,7] -> 
(Leader:4,ISR:4,LeaderEpoch:364,ControllerEpoch:415)

[2016-04-09 00:41:35,442] INFO [BrokerChangeListener on Controller 5]: Broker 
change listener fired for path /brokers/ids with children 1,4,5 
(kafka.controller.ReplicaStateMachine$BrokerChangeListener)
{noformat}

Interestingly, broker 3 is the controller for epoch 415 (see the last leader 
cache update), and this is the information that broker 1 receives from broker 5 
(see the previous comment). It looks like broker 5 ignored the fact that broker 
4 is down, or at least not in its list of live brokers.  

Broker 3 seems to behave correctly with respect to the partition, here are some 
relevant log lines:

{noformat}
[2016-04-09 00:39:57,004] INFO [Controller 3]: Controller 3 incremented epoch 
to 415 (kafka.controller.KafkaController)

[2016-04-09 00:40:46,633] INFO [BrokerChangeListener on Controller 3]: Broker 
change listener fired for path /brokers/ids with children 3,4,5 
(kafka.controller.ReplicaStateMachine$BrokerChangeListener)
[2016-04-09 00:40:46,638] INFO [BrokerChangeListener on Controller 3]: Newly 
added brokers: 4, deleted brokers: , all live brokers: 3,4,5 
(kafka.controller.ReplicaStateMachine

[2016-04-09 00:40:50,911] DEBUG [OfflinePartitionLeaderSelector]: No broker in 
ISR is alive for [tec1.en2.frontend.syncPing,7]. Pick the leader from the alive 
assigned replicas: 4 (kafka.controller.OfflinePartitionLeaderSelector)
[2016-04-09 00:40:50,911] WARN [OfflinePartitionLeaderSelector]: No broker in 
ISR is alive for [tec1.en2.frontend.syncPing,7]. Elect leader 4 from live 
brokers 4. There's potential data loss. 
(kafka.controller.OfflinePartitionLeaderSelector)
[2016-04-09 00:40:50,911] INFO [OfflinePartitionLeaderSelector]: Selected new 
leader and ISR {"leader":4,"leader_epoch":364,"isr":[4]} for offline partition 
[tec1.en2.frontend.syncPing,7] (kafka.controller.OfflinePartitionLeaderSelector)

State-change log
[2016-04-09 00:40:50,909] TRACE Controller 3 epoch 415 started leader election 
for partition [tec1.en2.frontend.syncPing,7] (state.change.logger)
[2016-04-09 00:40:50,911] TRACE Controller 3 epoch 415 elected leader 4 for 
Offline partition [tec1.en2.frontend.syncPing,7] (state.change.logger)
[2016-04-09 00:40:50,930] TRACE Controller 3 epoch 415 changed partition 
[tec1.en2.frontend.syncPing,7] from OfflinePartition to OnlinePartition with 
leader 4 (state.change.logger)
{noformat}

To summarize, the problem seems to be that controller 5 tells broker 1 that 
the partition leader is an unavailable broker, and broker 1 fails to change the 
partition leader. As it fails to update the leader to broker 4, broker 1 
remains the leader, which causes it to keep trying to update the ISR and 
printing out the “Cached zkVersion…” messages. Broker 1 does not receive any 
later controller update that would enable it to correct the problem, and 
consequently it incorrectly remains stuck as the partition leader.  

> updateIsr should stop after failed several times due to zkVersion issue
> ---
>
> Key: KAFKA-3042
> URL: https://issues.apache.org/jira/browse/KAFKA-3042
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1
> Environment: jdk 1.7
> centos 6.4
>Reporter: Jiahongchao
> Attachments: controller.log, server.log.2016-03-23-01, 
> state-change.log
>
>
> sometimes one broker may repeatedly log
> "Cached zkVersion 54 not 

[jira] [Comment Edited] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue

2016-04-12 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236055#comment-15236055
 ] 

Flavio Junqueira edited comment on KAFKA-3042 at 4/12/16 2:58 PM:
--

Thanks for the logs. I still don’t have an answer, but I wanted to report some 
progress.

I tracked one of the partitions that I’ve chosen arbitrarily for which broker 1 
was complaining about the zk version: [tec1.en2.frontend.syncPing,7]. Here are 
a few things that I’ve observed:

Broker 1 is the leader of the partition:

{noformat}
[2016-04-09 00:40:52,883] TRACE Broker 1 cached leader info 
(LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:363,ControllerEpoch:414),ReplicationFactor:3),AllReplicas:1,2,4)
 for partition [tec1.en2.frontend.syncPing,7] in response to UpdateMetadata 
request sent by controller 1 epoch 414 with correlation id 876 
(state.change.logger)
{noformat}

but soon after it fails to release leadership to broker 4:

{noformat}
[2016-04-09 00:40:58,106] TRACE Broker 1 received LeaderAndIsr request 
(LeaderAndIsrInfo:(Leader:4,ISR:4,LeaderEpoch:364,ControllerEpoch:415),ReplicationFactor:3),AllReplicas:1,2,4)
 correlation id 0 from controller 5 epoch 416 for partition 
[tec1.en2.frontend.syncPing,7] (state.change.logger)

[2016-04-09 00:40:58,139] TRACE Broker 1 handling LeaderAndIsr request 
correlationId 0 from controller 5 epoch 416 starting the become-follower 
transition for partition [tec1.en2.frontend.syncPing,7] (state.change.logger)

[2016-04-09 00:40:58,144] ERROR Broker 1 received LeaderAndIsrRequest with 
correlation id 0 from controller 5 epoch 415 for partition 
[tec1.en2.frontend.syncPing,7] but cannot become follower since the new leader 
4 is unavailable. (state.change.logger)
{noformat}

Now, a bit later in the log, the broker says that it is caching the leader info 
for the partition:

{noformat}
[2016-04-09 00:42:02,456] TRACE Broker 1 cached leader info 
(LeaderAndIsrInfo:(Leader:4,ISR:4,LeaderEpoch:364,ControllerEpoch:415),ReplicationFactor:3),AllReplicas:1,2,4)
 for partition [tec1.en2.frontend.syncPing,7] in response to UpdateMetadata 
request sent by controller 5 epoch 416 with correlation id 1473 
(state.change.logger)
{noformat}

but it keeps printing the “Cached zkVersion…” errors, which indicate that the 
broker still believes it is the leader of the partition, or at least the 
variable {{leaderReplicaIdOpt}} is set this way.

I also inspected other partitions, and the behavior doesn’t seem to be 
consistent. I’ve seen at least one partition in broker 2 for which the broker 
made the appropriate transition:

{noformat}
[2016-04-09 00:39:23,840] TRACE Broker 2 received LeaderAndIsr request 
(LeaderAndIsrInfo:(Leader:3,ISR:2,3,LeaderEpoch:305,ControllerEpoch:414),ReplicationFactor:3),AllReplicas:3,2,4)
 correlation id 535 from controller 1 epoch 414 for partition 
[tec1.ono_qe1.bodydata.recordings,20] (state.change.logger)
[2016-04-09 00:39:23,841] TRACE Broker 2 handling LeaderAndIsr request 
correlationId 535 from controller 1 epoch 414 starting the become-follower 
transition for partition [tec1.ono_qe1.bodydata.recordings,20] 
(state.change.logger)
[2016-04-09 00:39:23,841] TRACE Broker 2 stopped fetchers as part of 
become-follower request from controller 1 epoch 414 with correlation id 535 for 
partition [tec1.ono_qe1.bodydata.recordings,20] (state.change.logger)
[2016-04-09 00:39:23,856] TRACE Broker 2 truncated logs and checkpointed 
recovery boundaries for partition [tec1.ono_qe1.bodydata.recordings,20] as part 
of become-follower request with correlation id 535 from controller 1 epoch 414 
(state.change.logger)
[2016-04-09 00:39:23,856] TRACE Broker 2 started fetcher to new leader as part 
of become-follower request from controller 1 epoch 414 with correlation id 535 
for partition [tec1.ono_qe1.bodydata.recordings,20] (state.change.logger)
[2016-04-09 00:39:23,856] TRACE Broker 2 completed LeaderAndIsr request 
correlationId 535 from controller 1 epoch 414 for the become-follower 
transition for partition [tec1.ono_qe1.bodydata.recordings,20] 
(state.change.logger)
{noformat}

Actually, the state-change log of broker 2 seems to have a gap starting at 
{{[2016-04-09 00:39:46,246]}}. Is that when you restarted the broker? 


was (Author: fpj):
Thanks for the logs. I still don’t have an answer, but I wanted to report some 
progress.

I tracked one of the partitions that I’ve chosen arbitrarily for which broker 1 
was complaining about the zk version: [tec1.en2.frontend.syncPing,7]. Here are 
a few things that I’ve observed:

Broker 1 is the leader of the partition:

{noformat}
[2016-04-09 00:40:52,883] TRACE Broker 1 cached leader info 
(LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:363,ControllerEpoch:414),ReplicationFactor:3),AllReplicas:1,2,4)
 for partition [tec1.en2.frontend.syncPing,7] in response to UpdateMetadata 
request sent by controller 1 epoch 414 with 

[jira] [Resolved] (KAFKA-3205) Error in I/O with host (java.io.EOFException) raised in producer

2016-04-12 Thread Flavio Junqueira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3205?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flavio Junqueira resolved KAFKA-3205.
-
   Resolution: Won't Fix
 Reviewer: Flavio Junqueira
Fix Version/s: (was: 0.10.0.0)

The code currently in 0.9+ doesn't print as many of these messages because 
both ends, client and server, enforce the connection timeout. The change 
discussed in the pull request suppresses the message in the case of a passive 
close initiated by the server (in 0.9 the timeout is enforced), which is 
desirable only because the message otherwise pollutes the logs. It is better to 
keep these messages in 0.9 and later so that we are informed of connections 
being closed. They are not supposed to happen very often, but if they turn out 
to be a problem, we can revisit this issue.   

> Error in I/O with host (java.io.EOFException) raised in producer
> 
>
> Key: KAFKA-3205
> URL: https://issues.apache.org/jira/browse/KAFKA-3205
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.8.2.1, 0.9.0.0
>Reporter: Jonathan Raffre
>
> In a situation with a Kafka broker in 0.9 and producers still in 0.8.2.x, 
> producers seems to raise the following after a variable amount of time since 
> start :
> {noformat}
> 2016-01-29 14:33:13,066 WARN [] o.a.k.c.n.Selector: Error in I/O with 
> 172.22.2.170
> java.io.EOFException: null
> at 
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
>  ~[org.apache.kafka.kafka-clients-0.8.2.0.jar:na]
> at org.apache.kafka.common.network.Selector.poll(Selector.java:248) 
> ~[org.apache.kafka.kafka-clients-0.8.2.0.jar:na]
> at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) 
> [org.apache.kafka.kafka-clients-0.8.2.0.jar:na]
> at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) 
> [org.apache.kafka.kafka-clients-0.8.2.0.jar:na]
> at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) 
> [org.apache.kafka.kafka-clients-0.8.2.0.jar:na]
> at java.lang.Thread.run(Thread.java:745) [na:1.8.0_66-internal]
> {noformat}
> This can be reproduced successfully by doing the following :
>  * Start a 0.8.2 producer connected to the 0.9 broker
>  * Wait 15 minutes, exactly
>  * See the error in the producer logs.
> Oddly, this also shows up in an active producer but after 10 minutes of 
> activity.
> Kafka's server.properties :
> {noformat}
> broker.id=1
> listeners=PLAINTEXT://:9092
> port=9092
> num.network.threads=2
> num.io.threads=2
> socket.send.buffer.bytes=1048576
> socket.receive.buffer.bytes=1048576
> socket.request.max.bytes=104857600
> log.dirs=/mnt/data/kafka
> num.partitions=4
> auto.create.topics.enable=false
> delete.topic.enable=true
> num.recovery.threads.per.data.dir=1
> log.retention.hours=48
> log.retention.bytes=524288000
> log.segment.bytes=52428800
> log.retention.check.interval.ms=6
> log.roll.hours=24
> log.cleanup.policy=delete
> log.cleaner.enable=true
> zookeeper.connect=127.0.0.1:2181
> zookeeper.connection.timeout.ms=100
> {noformat}
> Producer's configuration :
> {noformat}
>   compression.type = none
>   metric.reporters = []
>   metadata.max.age.ms = 30
>   metadata.fetch.timeout.ms = 6
>   acks = all
>   batch.size = 16384
>   reconnect.backoff.ms = 10
>   bootstrap.servers = [127.0.0.1:9092]
>   receive.buffer.bytes = 32768
>   retry.backoff.ms = 500
>   buffer.memory = 33554432
>   timeout.ms = 3
>   key.serializer = class 
> org.apache.kafka.common.serialization.StringSerializer
>   retries = 3
>   max.request.size = 500
>   block.on.buffer.full = true
>   value.serializer = class 
> org.apache.kafka.common.serialization.StringSerializer
>   metrics.sample.window.ms = 3
>   send.buffer.bytes = 131072
>   max.in.flight.requests.per.connection = 5
>   metrics.num.samples = 2
>   linger.ms = 0
>   client.id = 
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue

2016-04-11 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236055#comment-15236055
 ] 

Flavio Junqueira edited comment on KAFKA-3042 at 4/11/16 9:58 PM:
--

Thanks for the logs. I still don’t have an answer, but I wanted to report some 
progress.

I tracked one of the partitions that I’ve chosen arbitrarily for which broker 1 
was complaining about the zk version: [tec1.en2.frontend.syncPing,7]. Here are 
a few things that I’ve observed:

Broker 1 is the leader of the partition:

{noformat}
[2016-04-09 00:40:52,883] TRACE Broker 1 cached leader info 
(LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:363,ControllerEpoch:414),ReplicationFactor:3),AllReplicas:1,2,4)
 for partition [tec1.en2.frontend.syncPing,7] in response to UpdateMetadata 
request sent by controller 1 epoch 414 with correlation id 876 
(state.change.logger)
{noformat}

but soon after it fails to release leadership to broker 4:

{noformat}
[2016-04-09 00:40:58,139] TRACE Broker 1 handling LeaderAndIsr request 
correlationId 0 from controller 5 epoch 416 starting the become-follower 
transition for partition [tec1.en2.frontend.syncPing,7] (state.change.logger)

[2016-04-09 00:40:58,144] ERROR Broker 1 received LeaderAndIsrRequest with 
correlation id 0 from controller 5 epoch 415 for partition 
[tec1.en2.frontend.syncPing,7] but cannot become follower since the new leader 
4 is unavailable. (state.change.logger)
{noformat}

Now, a bit later in the log, the broker says that it is caching the leader info 
for the partition:

{noformat}
[2016-04-09 00:42:02,456] TRACE Broker 1 cached leader info 
(LeaderAndIsrInfo:(Leader:4,ISR:4,LeaderEpoch:364,ControllerEpoch:415),ReplicationFactor:3),AllReplicas:1,2,4)
 for partition [tec1.en2.frontend.syncPing,7] in response to UpdateMetadata 
request sent by controller 5 epoch 416 with correlation id 1473 
(state.change.logger)
{noformat}

but it keeps printing the “Cached zkVersion…” errors, which indicate that the 
broker still believes it is the leader of the partition, or at least the 
variable {{leaderReplicaIdOpt}} is set this way.

I also inspected other partitions, and the behavior doesn’t seem to be 
consistent. I’ve seen at least one partition in broker 2 for which the broker 
made the appropriate transition:

{noformat}
[2016-04-09 00:39:23,840] TRACE Broker 2 received LeaderAndIsr request 
(LeaderAndIsrInfo:(Leader:3,ISR:2,3,LeaderEpoch:305,ControllerEpoch:414),ReplicationFactor:3),AllReplicas:3,2,4)
 correlation id 535 from controller 1 epoch 414 for partition 
[tec1.ono_qe1.bodydata.recordings,20] (state.change.logger)
[2016-04-09 00:39:23,841] TRACE Broker 2 handling LeaderAndIsr request 
correlationId 535 from controller 1 epoch 414 starting the become-follower 
transition for partition [tec1.ono_qe1.bodydata.recordings,20] 
(state.change.logger)
[2016-04-09 00:39:23,841] TRACE Broker 2 stopped fetchers as part of 
become-follower request from controller 1 epoch 414 with correlation id 535 for 
partition [tec1.ono_qe1.bodydata.recordings,20] (state.change.logger)
[2016-04-09 00:39:23,856] TRACE Broker 2 truncated logs and checkpointed 
recovery boundaries for partition [tec1.ono_qe1.bodydata.recordings,20] as part 
of become-follower request with correlation id 535 from controller 1 epoch 414 
(state.change.logger)
[2016-04-09 00:39:23,856] TRACE Broker 2 started fetcher to new leader as part 
of become-follower request from controller 1 epoch 414 with correlation id 535 
for partition [tec1.ono_qe1.bodydata.recordings,20] (state.change.logger)
[2016-04-09 00:39:23,856] TRACE Broker 2 completed LeaderAndIsr request 
correlationId 535 from controller 1 epoch 414 for the become-follower 
transition for partition [tec1.ono_qe1.bodydata.recordings,20] 
(state.change.logger)
{noformat}

Actually, the state-change log of broker 2 seems to have a gap starting at 
{{[2016-04-09 00:39:46,246]}}. Is that when you restarted the broker? 


was (Author: fpj):
Thanks for the logs. I still don’t have an answer, but I wanted to report some 
progress.

I tracked one of the partitions that I’ve chosen arbitrarily for which broker 1 
was complaining about the zk version: [tec1.en2.frontend.syncPing,7]. Here are 
a few things that I’ve observed:

Broker 1 is the leader of the partition:

{noformat}
[2016-04-09 00:40:52,883] TRACE Broker 1 cached leader info 
(LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:363,ControllerEpoch:414),ReplicationFactor:3),AllReplicas:1,2,4)
 for partition [tec1.en2.frontend.syncPing,7] in response to UpdateMetadata 
request sent by controller 1 epoch 414 with correlation id 876 
(state.change.logger)
{noformat}

but soon after it fails to release leadership to broker 4:

{noformat}
[2016-04-09 00:40:58,139] TRACE Broker 1 handling LeaderAndIsr request 
correlationId 0 from controller 5 epoch 416 starting the become-follower 
transition for partition 

[jira] [Commented] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue

2016-04-11 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15236055#comment-15236055
 ] 

Flavio Junqueira commented on KAFKA-3042:
-

Thanks for the logs. I still don’t have an answer, but I wanted to report some 
progress.

I tracked one of the partitions that I’ve chosen arbitrarily for which broker 1 
was complaining about the zk version: [tec1.en2.frontend.syncPing,7]. Here are 
a few things that I’ve observed:

Broker 1 is the leader of the partition:

{noformat}
[2016-04-09 00:40:52,883] TRACE Broker 1 cached leader info 
(LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:363,ControllerEpoch:414),ReplicationFactor:3),AllReplicas:1,2,4)
 for partition [tec1.en2.frontend.syncPing,7] in response to UpdateMetadata 
request sent by controller 1 epoch 414 with correlation id 876 
(state.change.logger)
{noformat}

but soon after it fails to release leadership to broker 4:

{noformat}
[2016-04-09 00:40:58,139] TRACE Broker 1 handling LeaderAndIsr request 
correlationId 0 from controller 5 epoch 416 starting the become-follower 
transition for partition [tec1.en2.frontend.syncPing,7] (state.change.logger)

[2016-04-09 00:40:58,144] ERROR Broker 1 received LeaderAndIsrRequest with 
correlation id 0 from controller 5 epoch 415 for partition 
[tec1.en2.frontend.syncPing,7] but cannot become follower since the new leader 
4 is unavailable. (state.change.logger)
{noformat}

Now, a bit later in the log, the broker says that it is caching the leader info 
for the partition:

{noformat}
[2016-04-09 00:42:02,456] TRACE Broker 1 cached leader info 
(LeaderAndIsrInfo:(Leader:4,ISR:4,LeaderEpoch:364,ControllerEpoch:415),ReplicationFactor:3),AllReplicas:1,2,4)
 for partition [tec1.en2.frontend.syncPing,7] in response to UpdateMetadata 
request sent by controller 5 epoch 416 with correlation id 1473 
(state.change.logger)
{noformat}

but it keeps printing the “Cached zkVersion…” errors, which indicate that the 
broker still believes it is the leader of the partition, or at least the 
variable {{leaderReplicaIdOpt}} is set this way.

I also inspected other partitions, and the behavior doesn’t seem to be 
consistent. I’ve seen at least one partition in broker 2 for which the broker 
made the appropriate transition:

{noformat}
[2016-04-09 00:39:23,840] TRACE Broker 2 received LeaderAndIsr request 
(LeaderAndIsrInfo:(Leader:3,ISR:2,3,LeaderEpoch:305,ControllerEpoch:414),ReplicationFactor:3),AllReplicas:3,2,4)
 correlation id 535 from controller 1 epoch 414 for partition 
[tec1.ono_qe1.bodydata.recordings,20] (state.change.logger)
[2016-04-09 00:39:23,841] TRACE Broker 2 handling LeaderAndIsr request 
correlationId 535 from controller 1 epoch 414 starting the become-follower 
transition for partition [tec1.ono_qe1.bodydata.recordings,20] 
(state.change.logger)
[2016-04-09 00:39:23,841] TRACE Broker 2 stopped fetchers as part of 
become-follower request from controller 1 epoch 414 with correlation id 535 for 
partition [tec1.ono_qe1.bodydata.recordings,20] (state.change.logger)
[2016-04-09 00:39:23,856] TRACE Broker 2 truncated logs and checkpointed 
recovery boundaries for partition [tec1.ono_qe1.bodydata.recordings,20] as part 
of become-follower request with correlation id 535 from controller 1 epoch 414 
(state.change.logger)
[2016-04-09 00:39:23,856] TRACE Broker 2 started fetcher to new leader as part 
of become-follower request from controller 1 epoch 414 with correlation id 535 
for partition [tec1.ono_qe1.bodydata.recordings,20] (state.change.logger)
[2016-04-09 00:39:23,856] TRACE Broker 2 completed LeaderAndIsr request 
correlationId 535 from controller 1 epoch 414 for the become-follower 
transition for partition [tec1.ono_qe1.bodydata.recordings,20] 
(state.change.logger)
{noformat}

Actually, the state-change log of broker 2 seems to have a gap starting at 
{{[2016-04-09 00:39:46,246]}}. Is that when you restarted the broker? 

> updateIsr should stop after failed several times due to zkVersion issue
> ---
>
> Key: KAFKA-3042
> URL: https://issues.apache.org/jira/browse/KAFKA-3042
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1
> Environment: jdk 1.7
> centos 6.4
>Reporter: Jiahongchao
> Attachments: controller.log, server.log.2016-03-23-01, 
> state-change.log
>
>
> sometimes one broker may repeatedly log
> "Cached zkVersion 54 not equal to that in zookeeper, skip updating ISR"
> I think this is because the broker considers itself the leader when in fact it's 
> a follower.
> So after several failed tries, it needs to find out who the leader is



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue

2016-04-09 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15233564#comment-15233564
 ] 

Flavio Junqueira commented on KAFKA-3042:
-

[~delbaeth] thanks for all the information.

bq.  Broker 1 rolled correctly and rejoined and leadership rebalancing 
occurred. After broker 2 rolled and came back up it now has an inconsistent 
view of the metadata. It thinks there are only 300 topics and all the other 
brokers believe there are 700. Should we file this as a separate issue?

I'm not aware of this issue, so it does sound better to report it in a 
different jira and describe the problem in as much detail as possible. If you 
have logs for this problem, then please share.

bq. We have managed to reproduce the problem and have a snapshot of the logs. 
The tarball is about a gigabyte. What should I do with it?

If you have a web server that can host it, then perhaps you can upload it 
there. A dropbox/box/onedrive public folder that we can read from would also do 
it.

bq. my zkCli.sh session which I was using to watch the controller exited here 
so I was disconnected for a minute

This is odd. Could you describe in more detail what happened with the zkCli 
session? You said that it disconnected for a minute, but has the session 
expired? It must have expired, unless your session timeout was at least one 
minute, which according to your description, it wasn't. If you have them, 
please include the zk logs in the bundle.



  

> updateIsr should stop after failed several times due to zkVersion issue
> ---
>
> Key: KAFKA-3042
> URL: https://issues.apache.org/jira/browse/KAFKA-3042
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1
> Environment: jdk 1.7
> centos 6.4
>Reporter: Jiahongchao
> Attachments: controller.log, server.log.2016-03-23-01, 
> state-change.log
>
>
> sometimes one broker may repeatedly log
> "Cached zkVersion 54 not equal to that in zookeeper, skip updating ISR"
> I think this is because the broker considers itself the leader when in fact it's 
> a follower.
> So after several failed tries, it needs to find out who the leader is



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue

2016-04-09 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15232535#comment-15232535
 ] 

Flavio Junqueira edited comment on KAFKA-3042 at 4/9/16 1:09 PM:
-

Here is what I've been able to find out so far based on the logs that 
[~wushujames] posted: 

# The "Cached zkVersion..." messages are printed in {{Partition.updateIsr}}. 
The {{updatedIsr}} in the logs seem to be called from 
{{Partition.maybeShrinkIsr}}.
# {{Partition.maybeShrinkIsr}} is called from 
{{ReplicaManager.maybeShrinkIsr}}, which is called periodically according to 
schedule call in {{ReplicaManager.startup}}. This is the main reason we see 
those messages periodically coming up.
# In {{Partition.maybeShrinkIsr}}, the ISR is only updated if the leader 
replica is the broker itself, which is determined by the variable 
{{leaderReplicaIdOpt}}.

It looks like {{leaderReplicaIdOpt}} isn't being updated correctly, and it is 
possible that it is due to a race with either the controllers or the execution 
of {{LeaderAndIsr}} requests.



was (Author: fpj):
Here is what I've been able to find out so far based on the logs that 
[~wushujames] posted: 

# The "Cached zkVersion..." messages are printed in {{Partition.updateIsr}}. 
The {{updatedIsr}} in the logs seem to be called from 
{{Partition.maybeShrinkIsr}}.
# {{Partition.maybeShrinkIsr}} is called from 
{{ReplicaManager.maybeShrinkIsr}}, which is called periodically according to 
schedule call in {{ReplicaManager.startup}}. This is the main reason we see 
those messages periodically coming up.
# In {{Partition.maybeShrinkIsr}}, the ISR is only updated if the leader 
replica is the broker itself, which is determined by the variable 
{{leaderReplicaIdOpt}.

It looks like {{leaderReplicaIdOpt}} isn't being updated correctly, and it is 
possible that it is due to a race with either the controllers or the execution 
of {{LeaderAndIsr} requests.


> updateIsr should stop after failed several times due to zkVersion issue
> ---
>
> Key: KAFKA-3042
> URL: https://issues.apache.org/jira/browse/KAFKA-3042
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1
> Environment: jdk 1.7
> centos 6.4
>Reporter: Jiahongchao
> Attachments: controller.log, server.log.2016-03-23-01, 
> state-change.log
>
>
> sometimes one broker may repeatedly log
> "Cached zkVersion 54 not equal to that in zookeeper, skip updating ISR"
> I think this is because the broker considers itself the leader when in fact it's 
> a follower.
> So after several failed tries, it needs to find out who the leader is



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue

2016-04-08 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15232535#comment-15232535
 ] 

Flavio Junqueira commented on KAFKA-3042:
-

Here is what I've been able to find out so far based on the logs that 
[~wushujames] posted: 

# The "Cached zkVersion..." messages are printed in {{Partition.updateIsr}}. 
The {{updatedIsr}} in the logs seem to be called from 
{{Partition.maybeShrinkIsr}}.
# {{Partition.maybeShrinkIsr}} is called from 
{{ReplicaManager.maybeShrinkIsr}}, which is called periodically according to 
schedule call in {{ReplicaManager.startup}}. This is the main reason we see 
those messages periodically coming up.
# In {{Partition.maybeShrinkIsr}}, the ISR is only updated if the leader 
replica is the broker itself, which is determined by the variable 
{{leaderReplicaIdOpt}.

It looks like {{leaderReplicaIdOpt}} isn't being updated correctly, and it is 
possible that it is due to a race with either the controllers or the execution 
of {{LeaderAndIsr} requests.


> updateIsr should stop after failed several times due to zkVersion issue
> ---
>
> Key: KAFKA-3042
> URL: https://issues.apache.org/jira/browse/KAFKA-3042
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1
> Environment: jdk 1.7
> centos 6.4
>Reporter: Jiahongchao
> Attachments: controller.log, server.log.2016-03-23-01, 
> state-change.log
>
>
> sometimes one broker may repeatedly log
> "Cached zkVersion 54 not equal to that in zookeeper, skip updating ISR"
> I think this is because the broker considers itself the leader when in fact it's 
> a follower.
> So after several failed tries, it needs to find out who the leader is



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3524) ConnectorTaskId should also include cluster id

2016-04-07 Thread Flavio Junqueira (JIRA)
Flavio Junqueira created KAFKA-3524:
---

 Summary: ConnectorTaskId should also include cluster id
 Key: KAFKA-3524
 URL: https://issues.apache.org/jira/browse/KAFKA-3524
 Project: Kafka
  Issue Type: Improvement
  Components: copycat
Reporter: Flavio Junqueira
Assignee: Ewen Cheslack-Postava


ConnectorTaskId currently uses the connector name and the task number to give a 
unique id to a task. This id is unique within a connect cluster, but it might 
not be unique across clusters. If I want to uniquely identify connector tasks 
across clusters, then I need to also use the cluster name as part of the id. 
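
A hedged sketch of what a cluster-scoped id could look like; the class and field 
names are illustrative and not part of the Connect API:

{code}
import java.util.Objects;

// Hypothetical id that carries the connect cluster name next to the connector
// name and the task number, so ids stay unique across clusters.
public final class ClusterConnectorTaskId {
    private final String cluster;
    private final String connector;
    private final int task;

    public ClusterConnectorTaskId(String cluster, String connector, int task) {
        this.cluster = cluster;
        this.connector = connector;
        this.task = task;
    }

    @Override
    public String toString() {
        return cluster + "-" + connector + "-" + task;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof ClusterConnectorTaskId)) return false;
        ClusterConnectorTaskId that = (ClusterConnectorTaskId) o;
        return task == that.task
                && cluster.equals(that.cluster)
                && connector.equals(that.connector);
    }

    @Override
    public int hashCode() {
        return Objects.hash(cluster, connector, task);
    }
}
{code}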



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3469) kafka-topics lock down znodes with user principal when zk security is enabled.

2016-04-01 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15222336#comment-15222336
 ] 

Flavio Junqueira commented on KAFKA-3469:
-

[~singhashish] right, you're assuming two different principals, while the current 
implementation assumes that there is only one, shared by all processes that 
access the znodes. During the discussion of KIP-38 we talked about enabling the 
addition of different principals so that we can support scenarios like the one 
you describe. Currently, that's not possible, but zk itself does support it. 
Perhaps this jira could be the implementation of that feature.
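
As a sketch of what that could look like with the plain ZooKeeper API (the sasl 
scheme and the extra principal are assumptions for illustration, not what ZkUtils 
does today):

{code}
import java.util.ArrayList;
import java.util.List;

import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Id;

public final class SecureAclsSketch {
    // Today: the creator gets ALL and everyone gets READ. The extra entry grants
    // ALL to a second principal, e.g. the user running admin tools such as
    // kafka-topics.
    public static List<ACL> withExtraPrincipal(String saslPrincipal) {
        List<ACL> list = new ArrayList<>();
        list.addAll(ZooDefs.Ids.CREATOR_ALL_ACL);
        list.addAll(ZooDefs.Ids.READ_ACL_UNSAFE);
        list.add(new ACL(ZooDefs.Perms.ALL, new Id("sasl", saslPrincipal)));
        return list;
    }
}
{code}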

> kafka-topics lock down znodes with user principal when zk security is enabled.
> --
>
> Key: KAFKA-3469
> URL: https://issues.apache.org/jira/browse/KAFKA-3469
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ashish K Singh
>Assignee: Ashish K Singh
>
> In envs where ZK is kerberized, if a user, other than user running kafka 
> processes, creates a topic, ZkUtils will lock down corresponding znodes for 
> the user. Kafka will not be able to modify those znodes and that leaves the 
> topic unusable.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3469) kafka-topics lock down znodes with user principal when zk security is enabled.

2016-04-01 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3469?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15221262#comment-15221262
 ] 

Flavio Junqueira commented on KAFKA-3469:
-

[~singhashish] I'm sorry, I missed the notification. Let me see if I understand 
this right:

bq. ZkUtils will lock down corresponding znodes for the user

You're saying that it sets the ACL with the wrong principal and consequently 
the brokers cannot use it? Currently, this is what we use:

{noformat}
list.addAll(ZooDefs.Ids.CREATOR_ALL_ACL)
list.addAll(ZooDefs.Ids.READ_ACL_UNSAFE)
{noformat}

For context, we did talk about having different credentials for admin tools 
when we released 0.9 if needed, so maybe we should do it, but let me try to 
understand the scenario a bit better first.
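
For reference, this is roughly how that default secure ACL list is assembled 
(the method name below is made up for the sketch): the creating principal gets 
full permissions and everyone else gets read-only access, which is why a znode 
created under a different admin principal becomes unwritable for the brokers.

{code}
import java.util
import org.apache.zookeeper.ZooDefs
import org.apache.zookeeper.data.ACL

// Sketch only: CREATOR_ALL_ACL grants everything to the creating principal,
// READ_ACL_UNSAFE grants world read, so only the creator can modify the znode.
def defaultSecureAcls: util.List[ACL] = {
  val list = new util.ArrayList[ACL]()
  list.addAll(ZooDefs.Ids.CREATOR_ALL_ACL)
  list.addAll(ZooDefs.Ids.READ_ACL_UNSAFE)
  list
}
{code}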

> kafka-topics lock down znodes with user principal when zk security is enabled.
> --
>
> Key: KAFKA-3469
> URL: https://issues.apache.org/jira/browse/KAFKA-3469
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ashish K Singh
>Assignee: Ashish K Singh
>
> In envs where ZK is kerberized, if a user, other than user running kafka 
> processes, creates a topic, ZkUtils will lock down corresponding znodes for 
> the user. Kafka will not be able to modify those znodes and that leaves the 
> topic unusable.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3205) Error in I/O with host (java.io.EOFException) raised in producer

2016-03-31 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3205?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15220339#comment-15220339
 ] 

Flavio Junqueira commented on KAFKA-3205:
-

[~bondj] I've commented on the pull request.

> Error in I/O with host (java.io.EOFException) raised in producer
> 
>
> Key: KAFKA-3205
> URL: https://issues.apache.org/jira/browse/KAFKA-3205
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 0.8.2.1, 0.9.0.0
>Reporter: Jonathan Raffre
> Fix For: 0.10.0.0
>
>
> In a situation with a Kafka broker in 0.9 and producers still in 0.8.2.x, 
> producers seems to raise the following after a variable amount of time since 
> start :
> {noformat}
> 2016-01-29 14:33:13,066 WARN [] o.a.k.c.n.Selector: Error in I/O with 
> 172.22.2.170
> java.io.EOFException: null
> at 
> org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62)
>  ~[org.apache.kafka.kafka-clients-0.8.2.0.jar:na]
> at org.apache.kafka.common.network.Selector.poll(Selector.java:248) 
> ~[org.apache.kafka.kafka-clients-0.8.2.0.jar:na]
> at 
> org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) 
> [org.apache.kafka.kafka-clients-0.8.2.0.jar:na]
> at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) 
> [org.apache.kafka.kafka-clients-0.8.2.0.jar:na]
> at 
> org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) 
> [org.apache.kafka.kafka-clients-0.8.2.0.jar:na]
> at java.lang.Thread.run(Thread.java:745) [na:1.8.0_66-internal]
> {noformat}
> This can be reproduced successfully by doing the following :
>  * Start a 0.8.2 producer connected to the 0.9 broker
>  * Wait 15 minutes, exactly
>  * See the error in the producer logs.
> Oddly, this also shows up in an active producer but after 10 minutes of 
> activity.
> Kafka's server.properties :
> {noformat}
> broker.id=1
> listeners=PLAINTEXT://:9092
> port=9092
> num.network.threads=2
> num.io.threads=2
> socket.send.buffer.bytes=1048576
> socket.receive.buffer.bytes=1048576
> socket.request.max.bytes=104857600
> log.dirs=/mnt/data/kafka
> num.partitions=4
> auto.create.topics.enable=false
> delete.topic.enable=true
> num.recovery.threads.per.data.dir=1
> log.retention.hours=48
> log.retention.bytes=524288000
> log.segment.bytes=52428800
> log.retention.check.interval.ms=6
> log.roll.hours=24
> log.cleanup.policy=delete
> log.cleaner.enable=true
> zookeeper.connect=127.0.0.1:2181
> zookeeper.connection.timeout.ms=100
> {noformat}
> Producer's configuration :
> {noformat}
>   compression.type = none
>   metric.reporters = []
>   metadata.max.age.ms = 30
>   metadata.fetch.timeout.ms = 6
>   acks = all
>   batch.size = 16384
>   reconnect.backoff.ms = 10
>   bootstrap.servers = [127.0.0.1:9092]
>   receive.buffer.bytes = 32768
>   retry.backoff.ms = 500
>   buffer.memory = 33554432
>   timeout.ms = 3
>   key.serializer = class 
> org.apache.kafka.common.serialization.StringSerializer
>   retries = 3
>   max.request.size = 500
>   block.on.buffer.full = true
>   value.serializer = class 
> org.apache.kafka.common.serialization.StringSerializer
>   metrics.sample.window.ms = 3
>   send.buffer.bytes = 131072
>   max.in.flight.requests.per.connection = 5
>   metrics.num.samples = 2
>   linger.ms = 0
>   client.id = 
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3210) Using asynchronous calls through the raw ZK API in ZkUtils

2016-03-22 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15206935#comment-15206935
 ] 

Flavio Junqueira commented on KAFKA-3210:
-

[~granthenke] my main goal here is to have access to the async API and have 
more control over what happens between sessions. I'm more used to programming 
against zookeeper directly, so I'm more inclined to pursue that direction. 
Also, given how ZkUtils is structured, it is not entirely clear to me how much 
we would be able to benefit from the recipes that curator can offer. Having 
said that, I don't have any major concern with using a different wrapper if 
this community prefers that option as long as we are able to make use of 
asynchronous calls and have more control over session creation.
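
To illustrate the first step I have in mind, here is a minimal sketch (names 
and structure are assumptions, not actual ZkUtils code) of issuing a raw 
asynchronous read and blocking on it, so callers keep a synchronous signature 
while the requests themselves can be pipelined by the ZooKeeper client:

{code}
import java.util.concurrent.CountDownLatch
import org.apache.zookeeper.AsyncCallback.DataCallback
import org.apache.zookeeper.KeeperException.Code
import org.apache.zookeeper.ZooKeeper
import org.apache.zookeeper.data.Stat

// Sketch only: async getData plus a latch gives a blocking facade over the
// pipelined ZooKeeper API.
def readDataBlocking(zk: ZooKeeper, path: String): (Code, Array[Byte]) = {
  val latch = new CountDownLatch(1)
  var result: (Code, Array[Byte]) = (Code.SYSTEMERROR, null)
  zk.getData(path, false, new DataCallback {
    override def processResult(rc: Int, p: String, ctx: AnyRef,
                               data: Array[Byte], stat: Stat): Unit = {
      result = (Code.get(rc), data)   // record the result code and payload
      latch.countDown()               // release the blocked caller
    }
  }, null)
  latch.await()                       // block until the callback fires
  result
}
{code}

Dropping the latch later, once the controller can handle callbacks directly, is 
what unlocks the pipelining benefit.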

> Using asynchronous calls through the raw ZK API in ZkUtils
> --
>
> Key: KAFKA-3210
> URL: https://issues.apache.org/jira/browse/KAFKA-3210
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller, zkclient
>Affects Versions: 0.9.0.0
>Reporter: Flavio Junqueira
>
> We have observed a number of issues with the controller interaction with 
> ZooKeeper mainly because ZkClient creates new sessions transparently under 
> the hood. Creating sessions transparently enables, for example, old 
> controller to successfully update znodes in ZooKeeper even when they aren't 
> the controller any longer (e.g., KAFKA-3083). To fix this, we need to bypass 
> the ZkClient lib like we did with ZKWatchedEphemeral.
> In addition to fixing such races with the controller, it would improve 
> performance significantly if we used the async API (see KAFKA-3038). The 
> async API is more efficient because it pipelines the requests to ZooKeeper, 
> and the number of requests upon controller recovery can be large.
> This jira proposes to make these two changes to the calls in ZkUtils and to 
> do it, one path is to first replace the calls in ZkUtils with raw async ZK 
> calls and block so that we don't have to change the controller code in this 
> phase. Once this step is accomplished and it is stable, we make changes to 
> the controller to handle the asynchronous calls to ZooKeeper.
> Note that in the first step, we will need to introduce some new logic for 
> session management, which is currently handled entirely by ZkClient. We will 
> also need to implement the subscription mechanism for event notifications 
> (see ZooKeeperLeaderElector as a an exemple).  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3403) Upgrade ZkClient to 0.8

2016-03-19 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15198172#comment-15198172
 ] 

Flavio Junqueira commented on KAFKA-3403:
-

[~granthenke] ZkClient 0.8 is already available. Are you going to do a pull 
request to upgrade it?

> Upgrade ZkClient to 0.8
> ---
>
> Key: KAFKA-3403
> URL: https://issues.apache.org/jira/browse/KAFKA-3403
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Grant Henke
>Assignee: Grant Henke
>Priority: Critical
>
> KAFKA-3328 requires a versioned delete that [~fpj] added to ZkClient and will 
> be available in the 0.8 release. When released, we should upgrade to zkclient 
> 0.8. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3403) Upgrade ZkClient to 0.8

2016-03-19 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15198173#comment-15198173
 ] 

Flavio Junqueira commented on KAFKA-3403:
-

http://mvnrepository.com/artifact/com.101tec/zkclient/0.8

> Upgrade ZkClient to 0.8
> ---
>
> Key: KAFKA-3403
> URL: https://issues.apache.org/jira/browse/KAFKA-3403
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Grant Henke
>Assignee: Grant Henke
>Priority: Critical
>
> KAFKA-3328 requires a versioned delete that [~fpj] added to ZkClient and will 
> be available in the 0.8 release. When released, we should upgrade to zkclient 
> 0.8. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-3215) controller may not be started when there are multiple ZK session expirations

2016-03-11 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15190794#comment-15190794
 ] 

Flavio Junqueira edited comment on KAFKA-3215 at 3/11/16 4:34 PM:
--

[~junrao] Let me see if I understand this issue correctly.

bq. broker 1 is the controller and it has 2 consecutive ZK session expirations

As I understand it, one possible run that reflects this is the following:

# zkclient creates a session S1
# S1 session expires
# zkclient queues the session expiration event to deliver to the kafka broker
# zkclient creates a new session S2
# S2 expires
# zkclient queues the session expiration for S2 and the event for S1 still 
hasn't been delivered
# zkclient creates a third session S3
# broker 1 processes the session expiration of S1
# broker 1 successfully elects itself leader/controller in session S3
# broker 1 processes session expiration for S2

After this last step, the broker is messed up because the replica state machine 
isn't properly initialized. Also, the broker won't give up leadership because 
the ephemeral has been created in the current session.

I think this was a problem in 0.8.2, but not a problem in 0.9 because we fixed 
it in KAFKA-1387. With ZKWatchedEphemeral, if the znode already exists when we 
try to create it, we check whether the existing znode has the same session 
owner, in which case the operation returns ok and the controller becomes 
leader. Does that make sense?
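
A rough sketch of that owner check (assumed names, not the actual 
ZKWatchedEphemeral code; the open ACL is only for the sketch): on NODEEXISTS we 
compare the ephemeral owner of the existing znode with our own session id, and 
a match counts as a successful create.

{code}
import org.apache.zookeeper.{CreateMode, KeeperException, ZooDefs, ZooKeeper}

// Sketch only: create the controller ephemeral znode; if it already exists,
// it still counts as ours when it was created in the current session.
def createControllerZnode(zk: ZooKeeper, path: String, data: Array[Byte]): Boolean = {
  try {
    zk.create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL)
    true
  } catch {
    case _: KeeperException.NodeExistsException =>
      val stat = zk.exists(path, false)
      stat != null && stat.getEphemeralOwner == zk.getSessionId
  }
}
{code}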


was (Author: fpj):
[~junrao] Let me see if I understand this issue correctly.

bq. broker 1 is the controller and it has 2 consecutive ZK session expirations

As I understand this, one possible run that reflects this is the following:

# zkclient creates a session S1
# S1 session expires
# zkclient queues the session expiration event to deliver to the kafka broker
# zkclient creates a new session S2
# S2 expires
# zkclient queues the session expiration for S2 and the event for S1 still 
hasn't been delivered
# zkclient creates a third session S3
# broker 1 processes the session expiration of S1
# broker 1 successfully elects itself leader/controller in session S3
# broker 1 processes session expiration for S2

After this last step, broker S2 is messed up because the replica state machine 
isn't properly initialized. Also, the broker won't give up leadership because 
the ephemeral has been created in the current session.

I think this was a problem in 0.8.2, but not a problem in 0.9 because we fixed 
it in KAFKA-1387. With ZKWatchedEphemeral, in the case we get that the znode 
exists while creating it, we check if the existing znode has the same session 
owner, in which case the operation returns ok and the controller becomes 
leader. Does it make sense?

> controller may not be started when there are multiple ZK session expirations
> 
>
> Key: KAFKA-3215
> URL: https://issues.apache.org/jira/browse/KAFKA-3215
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Jun Rao
>Assignee: Flavio Junqueira
>  Labels: controller
>
> Suppose that broker 1 is the controller and it has 2 consecutive ZK session 
> expirations. In this case, two ZK session expiration events will be fired.
> 1. When handling the first ZK session expiration event, 
> SessionExpirationListener.handleNewSession() can elect broker 1 itself as the 
> new controller and initialize the states properly.
> 2. When handling the second ZK session expiration event, 
> SessionExpirationListener.handleNewSession() first calls 
> onControllerResignation(), which will set ReplicaStateMachine.hasStarted to 
> false. It then continues to do controller election in 
> ZookeeperLeaderElector.elect() and try to create the controller node in ZK. 
> This will fail since broker 1 has already registered itself as the controller 
> node in ZK. In this case, we simply ignore the failure to create the 
> controller node since we assume the controller must be in another broker. 
> However, in this case, the controller is broker 1 itself, but the 
> ReplicaStateMachine.hasStarted is still false.
> 3. Now, if a new broker event is fired, we will be ignoring the event in 
> BrokerChangeListener.handleChildChange since ReplicaStateMachine.hasStarted 
> is false. Now, we are in a situation that a controller is alive, but won't 
> react to any broker change event.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-3215) controller may not be started when there are multiple ZK session expirations

2016-03-11 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15190794#comment-15190794
 ] 

Flavio Junqueira edited comment on KAFKA-3215 at 3/11/16 1:21 PM:
--

[~junrao] Let me see if I understand this issue correctly.

bq. broker 1 is the controller and it has 2 consecutive ZK session expirations

As I understand this, one possible run that reflects this is the following:

# zkclient creates a session S1
# S1 session expires
# zkclient queues the session expiration event to deliver to the kafka broker
# zkclient creates a new session S2
# S2 expires
# zkclient queues the session expiration for S2 and the event for S1 still 
hasn't been delivered
# zkclient creates a third session S3
# broker 1 processes the session expiration of S1
# broker 1 successfully elects itself leader/controller in session S3
# broker 1 processes session expiration for S2

After this last step, broker S2 is messed up because the replica state machine 
isn't properly initialized. Also, the broker won't give up leadership because 
the ephemeral has been created in the current session.

I think this was a problem in 0.8.2, but not a problem in 0.9 because we fixed 
it in KAFKA-1387. With ZKWatchedEphemeral, in the case we get that the znode 
exists while creating it, we check if the existing znode has the same session 
owner, in which case the operation returns ok and the controller becomes 
leader. Does it make sense?


was (Author: fpj):
[~junrao] Let me see if I understand this issue correctly.

bq. broker 1 is the controller and it has 2 consecutive ZK session expirations

As I understand this, one possible run that reflects this is the following:

# zkclient creates a session S1
# S1 session expires
# zkclient queues the session expiration event to deliver to the kafka broker
# zkclient creates a new session S2
# S2 expires
# zkclient queues the session expiration for S2 and the event for S1 still 
hasn't been delivered
# zkclient creates a third session S3
# broker 1 processes the session expiration of S1
# broker 1 successfully elects itself leader/controller in session S3
# broker 1 processes session expiration for S2

After this last step, broker S2 is messed up because it thinks the replica 
state machine isn't properly initialized. Also, the broker won't give up 
leadership because the ephemeral has been created in the current session.

I think this was a problem in 0.8.2, but not a problem in 0.9 because we fixed 
it in KAFKA-1387. With ZKWatchedEphemeral, in the case we get that the znode 
exists while creating it, we check if the existing znode has the same session 
owner, in which case the operation returns ok and the controller becomes 
leader. Does it make sense?

> controller may not be started when there are multiple ZK session expirations
> 
>
> Key: KAFKA-3215
> URL: https://issues.apache.org/jira/browse/KAFKA-3215
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Jun Rao
>Assignee: Flavio Junqueira
>  Labels: controller
>
> Suppose that broker 1 is the controller and it has 2 consecutive ZK session 
> expirations. In this case, two ZK session expiration events will be fired.
> 1. When handling the first ZK session expiration event, 
> SessionExpirationListener.handleNewSession() can elect broker 1 itself as the 
> new controller and initialize the states properly.
> 2. When handling the second ZK session expiration event, 
> SessionExpirationListener.handleNewSession() first calls 
> onControllerResignation(), which will set ReplicaStateMachine.hasStarted to 
> false. It then continues to do controller election in 
> ZookeeperLeaderElector.elect() and try to create the controller node in ZK. 
> This will fail since broker 1 has already registered itself as the controller 
> node in ZK. In this case, we simply ignore the failure to create the 
> controller node since we assume the controller must be in another broker. 
> However, in this case, the controller is broker 1 itself, but the 
> ReplicaStateMachine.hasStarted is still false.
> 3. Now, if a new broker event is fired, we will be ignoring the event in 
> BrokerChangeListener.handleChildChange since ReplicaStateMachine.hasStarted 
> is false. Now, we are in a situation that a controller is alive, but won't 
> react to any broker change event.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-3215) controller may not be started when there are multiple ZK session expirations

2016-03-11 Thread Flavio Junqueira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flavio Junqueira reassigned KAFKA-3215:
---

Assignee: Flavio Junqueira

> controller may not be started when there are multiple ZK session expirations
> 
>
> Key: KAFKA-3215
> URL: https://issues.apache.org/jira/browse/KAFKA-3215
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Jun Rao
>Assignee: Flavio Junqueira
>  Labels: controller
>
> Suppose that broker 1 is the controller and it has 2 consecutive ZK session 
> expirations. In this case, two ZK session expiration events will be fired.
> 1. When handling the first ZK session expiration event, 
> SessionExpirationListener.handleNewSession() can elect broker 1 itself as the 
> new controller and initialize the states properly.
> 2. When handling the second ZK session expiration event, 
> SessionExpirationListener.handleNewSession() first calls 
> onControllerResignation(), which will set ReplicaStateMachine.hasStarted to 
> false. It then continues to do controller election in 
> ZookeeperLeaderElector.elect() and try to create the controller node in ZK. 
> This will fail since broker 1 has already registered itself as the controller 
> node in ZK. In this case, we simply ignore the failure to create the 
> controller node since we assume the controller must be in another broker. 
> However, in this case, the controller is broker 1 itself, but the 
> ReplicaStateMachine.hasStarted is still false.
> 3. Now, if a new broker event is fired, we will be ignoring the event in 
> BrokerChangeListener.handleChildChange since ReplicaStateMachine.hasStarted 
> is false. Now, we are in a situation that a controller is alive, but won't 
> react to any broker change event.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3215) controller may not be started when there are multiple ZK session expirations

2016-03-11 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15190794#comment-15190794
 ] 

Flavio Junqueira commented on KAFKA-3215:
-

[~junrao] Let me see if I understand this issue correctly.

bq. broker 1 is the controller and it has 2 consecutive ZK session expirations

As I understand this, one possible run that reflects this is the following:

# zkclient creates a session S1
# S1 session expires
# zkclient queues the session expiration event to deliver to the kafka broker
# zkclient creates a new session S2
# S2 expires
# zkclient queues the session expiration for S2 and the event for S1 still 
hasn't been delivered
# zkclient creates a third session S3
# broker 1 processes the session expiration of S1
# broker 1 successfully elects itself leader/controller in session S3
# broker 1 processes session expiration for S2

After this last step, broker S2 is messed up because it thinks the replica 
state machine isn't properly initialized. Also, the broker won't give up 
leadership because the ephemeral has been created in the current session.

I think this was a problem in 0.8.2, but not a problem in 0.9 because we fixed 
it in KAFKA-1387. With ZKWatchedEphemeral, in the case we get that the znode 
exists while creating it, we check if the existing znode has the same session 
owner, in which case the operation returns ok and the controller becomes 
leader. Does it make sense?

> controller may not be started when there are multiple ZK session expirations
> 
>
> Key: KAFKA-3215
> URL: https://issues.apache.org/jira/browse/KAFKA-3215
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Jun Rao
>  Labels: controller
>
> Suppose that broker 1 is the controller and it has 2 consecutive ZK session 
> expirations. In this case, two ZK session expiration events will be fired.
> 1. When handling the first ZK session expiration event, 
> SessionExpirationListener.handleNewSession() can elect broker 1 itself as the 
> new controller and initialize the states properly.
> 2. When handling the second ZK session expiration event, 
> SessionExpirationListener.handleNewSession() first calls 
> onControllerResignation(), which will set ReplicaStateMachine.hasStarted to 
> false. It then continues to do controller election in 
> ZookeeperLeaderElector.elect() and try to create the controller node in ZK. 
> This will fail since broker 1 has already registered itself as the controller 
> node in ZK. In this case, we simply ignore the failure to create the 
> controller node since we assume the controller must be in another broker. 
> However, in this case, the controller is broker 1 itself, but the 
> ReplicaStateMachine.hasStarted is still false.
> 3. Now, if a new broker event is fired, we will be ignoring the event in 
> BrokerChangeListener.handleChildChange since ReplicaStateMachine.hasStarted 
> is false. Now, we are in a situation that a controller is alive, but won't 
> react to any broker change event.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3173) Error while moving some partitions to OnlinePartition state

2016-03-10 Thread Flavio Junqueira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flavio Junqueira updated KAFKA-3173:

Attachment: KAFKA-3173-race-repro.patch

> Error while moving some partitions to OnlinePartition state 
> 
>
> Key: KAFKA-3173
> URL: https://issues.apache.org/jira/browse/KAFKA-3173
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0
>Reporter: Flavio Junqueira
>Assignee: Flavio Junqueira
>Priority: Critical
> Fix For: 0.10.0.0
>
> Attachments: KAFKA-3173-race-repro.patch
>
>
> We observed another instance of the problem reported in KAFKA-2300, but this 
> time the error appeared in the partition state machine. In KAFKA-2300, we 
> haven't cleaned up the state in {{PartitionStateMachine}} and 
> {{ReplicaStateMachine}} as we do in {{KafkaController}}.
> Here is the stack trace:
> {noformat}
> 2016-01-29 15:26:51,393] ERROR [Partition state machine on Controller 0]: 
> Error while moving some partitions to OnlinePartition state 
> (kafka.controller.PartitionStateMachine)java.lang.IllegalStateException: 
> Controller to broker state change requests batch is not empty while creating 
> a new one. 
> Some LeaderAndIsr state changes Map(0 -> Map(foo-0 -> (LeaderAndIsrInfo:
> (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:1),AllReplicas:0)))
>  might be lostat 
> kafka.controller.ControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:254)
> at 
> kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:144)
> at 
> kafka.controller.KafkaController.onNewPartitionCreation(KafkaController.scala:517)
> at 
> kafka.controller.KafkaController.onNewTopicCreation(KafkaController.scala:504)
> at 
> kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(PartitionStateMachine.scala:437)
> at 
> kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419)
> at 
> kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419)
> at 
> kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)at 
> kafka.controller.PartitionStateMachine$TopicChangeListener.handleChildChange(PartitionStateMachine.scala:418)
> at 
> org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)at 
> org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3173) Error while moving some partitions to OnlinePartition state

2016-03-10 Thread Flavio Junqueira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flavio Junqueira updated KAFKA-3173:

Attachment: (was: KAFKA-3173-race-repo.patch)

> Error while moving some partitions to OnlinePartition state 
> 
>
> Key: KAFKA-3173
> URL: https://issues.apache.org/jira/browse/KAFKA-3173
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0
>Reporter: Flavio Junqueira
>Assignee: Flavio Junqueira
>Priority: Critical
> Fix For: 0.10.0.0
>
>
> We observed another instance of the problem reported in KAFKA-2300, but this 
> time the error appeared in the partition state machine. In KAFKA-2300, we 
> haven't cleaned up the state in {{PartitionStateMachine}} and 
> {{ReplicaStateMachine}} as we do in {{KafkaController}}.
> Here is the stack trace:
> {noformat}
> 2016-01-29 15:26:51,393] ERROR [Partition state machine on Controller 0]: 
> Error while moving some partitions to OnlinePartition state 
> (kafka.controller.PartitionStateMachine)java.lang.IllegalStateException: 
> Controller to broker state change requests batch is not empty while creating 
> a new one. 
> Some LeaderAndIsr state changes Map(0 -> Map(foo-0 -> (LeaderAndIsrInfo:
> (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:1),AllReplicas:0)))
>  might be lostat 
> kafka.controller.ControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:254)
> at 
> kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:144)
> at 
> kafka.controller.KafkaController.onNewPartitionCreation(KafkaController.scala:517)
> at 
> kafka.controller.KafkaController.onNewTopicCreation(KafkaController.scala:504)
> at 
> kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(PartitionStateMachine.scala:437)
> at 
> kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419)
> at 
> kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419)
> at 
> kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)at 
> kafka.controller.PartitionStateMachine$TopicChangeListener.handleChildChange(PartitionStateMachine.scala:418)
> at 
> org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)at 
> org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3173) Error while moving some partitions to OnlinePartition state

2016-03-10 Thread Flavio Junqueira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flavio Junqueira updated KAFKA-3173:

Attachment: KAFKA-3173-race-repo.patch

> Error while moving some partitions to OnlinePartition state 
> 
>
> Key: KAFKA-3173
> URL: https://issues.apache.org/jira/browse/KAFKA-3173
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0
>Reporter: Flavio Junqueira
>Assignee: Flavio Junqueira
>Priority: Critical
> Fix For: 0.10.0.0
>
>
> We observed another instance of the problem reported in KAFKA-2300, but this 
> time the error appeared in the partition state machine. In KAFKA-2300, we 
> haven't cleaned up the state in {{PartitionStateMachine}} and 
> {{ReplicaStateMachine}} as we do in {{KafkaController}}.
> Here is the stack trace:
> {noformat}
> 2016-01-29 15:26:51,393] ERROR [Partition state machine on Controller 0]: 
> Error while moving some partitions to OnlinePartition state 
> (kafka.controller.PartitionStateMachine)java.lang.IllegalStateException: 
> Controller to broker state change requests batch is not empty while creating 
> a new one. 
> Some LeaderAndIsr state changes Map(0 -> Map(foo-0 -> (LeaderAndIsrInfo:
> (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:1),AllReplicas:0)))
>  might be lostat 
> kafka.controller.ControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:254)
> at 
> kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:144)
> at 
> kafka.controller.KafkaController.onNewPartitionCreation(KafkaController.scala:517)
> at 
> kafka.controller.KafkaController.onNewTopicCreation(KafkaController.scala:504)
> at 
> kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(PartitionStateMachine.scala:437)
> at 
> kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419)
> at 
> kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419)
> at 
> kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)at 
> kafka.controller.PartitionStateMachine$TopicChangeListener.handleChildChange(PartitionStateMachine.scala:418)
> at 
> org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)at 
> org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3173) Error while moving some partitions to OnlinePartition state

2016-03-10 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15189024#comment-15189024
 ] 

Flavio Junqueira commented on KAFKA-3173:
-

I have investigated the two races further. The first one is there, but it turns 
out to be harmless because we check {{hasStarted}} before adding any message to 
the batch, so the batch is not left dirty. We should still fix it to avoid the 
ugly exception, but it is less critical.

The second race is a real problem. I have been able to reproduce it and it can 
cause either the startup to fail or the zk listener event to be skipped. Here 
is an output from the repro:

{noformat}
[2016-03-10 09:58:21,257] ERROR [Partition state machine on Controller 0]:  
(kafka.controller.PartitionStateMachine:100)
java.lang.Exception
at 
kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$3.apply(PartitionStateMachine.scala:158)
at 
kafka.controller.PartitionStateMachine$$anonfun$handleStateChanges$3.apply(PartitionStateMachine.scala:158)
at kafka.utils.Logging$class.error(Logging.scala:100)
at 
kafka.controller.PartitionStateMachine.error(PartitionStateMachine.scala:44)
at 
kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:158)
at 
kafka.controller.KafkaController.onNewPartitionCreation(KafkaController.scala:518)
at 
kafka.controller.KafkaController.onNewTopicCreation(KafkaController.scala:505)
at 
kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(PartitionStateMachine.scala:455)
at 
kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:437)
at 
kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:437)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:255)
at 
kafka.controller.PartitionStateMachine$TopicChangeListener.handleChildChange(PartitionStateMachine.scala:436)
at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)
at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
[2016-03-10 09:58:21,447] ERROR [Partition state machine on Controller 0]: 
Error while moving some partitions to the online state 
(kafka.controller.PartitionStateMachine:103)
java.lang.IllegalStateException: Controller to broker state change requests 
batch is not empty while creating a new one. Some LeaderAndIsr state changes 
Map(1 -> Map(topic1-0 -> 
(LeaderAndIsrInfo:(Leader:1,ISR:1,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:1),AllReplicas:1)))
 might be lost 
at 
kafka.controller.ControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:254)
at 
kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:126)
at 
kafka.controller.PartitionStateMachine.startup(PartitionStateMachine.scala:71)
at 
kafka.controller.ControllerFailoverTest.testStartupRace(ControllerFailoverTest.scala:119)
{noformat}

The first exception is induced, just to identify which call is concurrent with 
the call to startup. The second exception is due to the batch being dirty when I 
call startup on {{partitionStateMachine}}. It can also happen the other way 
around, in which case the topic update fails. Wrapping the call to 
{{triggerOnlinePartitionStateChange}} with the controller lock solves the issue.

Unfortunately, I had to instrument the code to trigger the race. It is hard to 
test these cases without being invasive, so I'm inclined to not add test cases 
for this. I'll post the changes I have used to repro the two issues I've 
mentioned. Note that they are test cases, but they don't actually fail because 
the current code catches the illegal state exception.  

> Error while moving some partitions to OnlinePartition state 
> 
>
> Key: KAFKA-3173
> URL: https://issues.apache.org/jira/browse/KAFKA-3173
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0
>Reporter: Flavio Junqueira
>Assignee: Flavio Junqueira
>Priority: Critical
> Fix For: 0.10.0.0
>
>
> We observed another instance of the problem reported in KAFKA-2300, but this 
> time the error appeared in the partition state machine. In KAFKA-2300, we 
> haven't cleaned up the state in {{PartitionStateMachine}} and 
> {{ReplicaStateMachine}} as we do in {{KafkaController}}.
> Here is the stack trace:
> {noformat}
> 2016-01-29 15:26:51,393] ERROR [Partition state machine on Controller 0]: 
> Error while moving some partitions to OnlinePartition state 
> (kafka.controller.PartitionStateMachine)java.lang.IllegalStateException: 
> Controller to broker state change 

[jira] [Commented] (KAFKA-3173) Error while moving some partitions to OnlinePartition state

2016-03-09 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15187230#comment-15187230
 ] 

Flavio Junqueira commented on KAFKA-3173:
-

I actually can't reorder it like that; it breaks the controller if I do. In 
fact, the race condition as I described it above can't happen, because the 
listener callbacks check whether the corresponding state machines have started 
or not.

There are two other possible races I found, though.

1- In {{onControllerFailover}}, we have the following:
{noformat}
  replicaStateMachine.startup()
  partitionStateMachine.startup()
{noformat}

By invoking {{replicaStateMachine.startup()}} first, we set {{hasStarted}} to 
true. If there is a broker change, then the listener callback will be executed 
({{ReplicaStateMachine.hasStarted}} is true), which will eventually invoke 
{{partitionStateMachine.triggerOnlinePartitionStateChange()}}. If 
{{partitionStateMachine.startup()}} hasn't run yet, then 
{{partitionStateMachine.hasStarted}} is false and we will end up with a dirty 
batch. We need to swap those calls. I also checked that there is no flow the 
other way around (partitionStateMachine -> replicaStateMachine), so it is safe 
to swap them.

2- If a zookeeper event is triggered while either state machine is starting up, 
then it is possible that either the startup call or the handling of the 
zookeeper event will find a dirty batch because of concurrent execution. In 
this case, we don't leave a dirty batch because it clears after the execution, 
but one of the calls might fail and we don't have code to recover.

We are currently wrapping many of these calls with the controller lock to 
prevent races, but apparently we forgot to do it for the startup calls. The fix 
is to wrap them like this:
{noformat}
inLock(controllerContext.controllerLock) {
  triggerOnlinePartitionStateChange()
}
{noformat}

I verified that this doesn't break anything, but I want to write a test case to 
see if I can repro the problem. 
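
To make the concurrency issue concrete, here is a simplified model (assumed 
names, not the actual {{ControllerBrokerRequestBatch}} code) of why two 
unsynchronized callers can observe each other's in-flight batch:

{code}
import scala.collection.mutable

// Sketch only: newBatch() requires an empty batch, and requests queued between
// newBatch() and send() make it "dirty". If the startup path and a ZK listener
// run this sequence concurrently without the controller lock, one of them can
// call newBatch() while the other's requests are still pending and hit the
// IllegalStateException quoted in the description.
class RequestBatchSketch {
  private val pending = mutable.ArrayBuffer[String]()

  def newBatch(): Unit =
    if (pending.nonEmpty)
      throw new IllegalStateException(
        s"batch is not empty while creating a new one, ${pending.size} request(s) might be lost")

  def queue(request: String): Unit = pending += request

  def send(): Unit = pending.clear()   // requests go out, the batch is clean again
}

// Unsafe unless serialized under the controller lock, as in the snippet above.
def triggerStateChangeSketch(batch: RequestBatchSketch): Unit = {
  batch.newBatch()
  batch.queue("LeaderAndIsr for topic1-0")
  batch.send()
}
{code}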

> Error while moving some partitions to OnlinePartition state 
> 
>
> Key: KAFKA-3173
> URL: https://issues.apache.org/jira/browse/KAFKA-3173
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0
>Reporter: Flavio Junqueira
>Assignee: Flavio Junqueira
>Priority: Critical
> Fix For: 0.10.0.0
>
>
> We observed another instance of the problem reported in KAFKA-2300, but this 
> time the error appeared in the partition state machine. In KAFKA-2300, we 
> haven't cleaned up the state in {{PartitionStateMachine}} and 
> {{ReplicaStateMachine}} as we do in {{KafkaController}}.
> Here is the stack trace:
> {noformat}
> 2016-01-29 15:26:51,393] ERROR [Partition state machine on Controller 0]: 
> Error while moving some partitions to OnlinePartition state 
> (kafka.controller.PartitionStateMachine)java.lang.IllegalStateException: 
> Controller to broker state change requests batch is not empty while creating 
> a new one. 
> Some LeaderAndIsr state changes Map(0 -> Map(foo-0 -> (LeaderAndIsrInfo:
> (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:1),AllReplicas:0)))
>  might be lostat 
> kafka.controller.ControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:254)
> at 
> kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:144)
> at 
> kafka.controller.KafkaController.onNewPartitionCreation(KafkaController.scala:517)
> at 
> kafka.controller.KafkaController.onNewTopicCreation(KafkaController.scala:504)
> at 
> kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(PartitionStateMachine.scala:437)
> at 
> kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419)
> at 
> kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419)
> at 
> kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)at 
> kafka.controller.PartitionStateMachine$TopicChangeListener.handleChildChange(PartitionStateMachine.scala:418)
> at 
> org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)at 
> org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3173) Error while moving some partitions to OnlinePartition state

2016-03-09 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15186928#comment-15186928
 ] 

Flavio Junqueira commented on KAFKA-3173:
-

There is a race in the controller failover that could be causing this. Here is 
what we have in {{KafkaController.onControllerFailover}}:

{noformat}
  partitionStateMachine.registerListeners()
  replicaStateMachine.registerListeners()
  initializeControllerContext()
  replicaStateMachine.startup()
  partitionStateMachine.startup()
{noformat}

Both the partition state machine and the replica state machine register zk 
listeners before they start up. In the partition state machine, the two calls 
that invoke {{ControllerBrokerRequestBatch.newBatch()}} are 
{{triggerOnlinePartitionStateChange}} and {{handleStateChanges}}. They both 
invoke a private method that checks {{hasStarted}} and throws an exception if 
the state machine hasn't started. The variable {{hasStarted}} is only set to 
true upon executing {{startup}}. Consequently, if we register the listener 
before starting the state machine and an event comes through, we leave the 
batch dirty, which causes the exception in the description.

In both {{triggerOnlinePartitionStateChange}} and {{handleStateChanges}}, we 
simply log the error and move on, leaving the batch dirty. Note that we have 
the same issue in the replica state machine.

I believe the right execution order in {{onControllerFailover}} should be:

{noformat}
  partitionStateMachine.startup()
  replicaStateMachine.startup()
  partitionStateMachine.registerListeners()
  replicaStateMachine.registerListeners()
  initializeControllerContext()
{noformat}

There could be other sources, but right now this looks like a clear one to me. 
Also, something that looks really bad in lots of places in the controller code 
is that if there is some error processing partition or replica changes, the 
code simply logs and moves on. Instead, I believe it should either recover from 
the error or resign as controller; we can't simply skip an update. This should 
be addressed when rewriting the controller, though.

As for this jira, I suggest we fix this race and revisit it in the case it 
reappears.


> Error while moving some partitions to OnlinePartition state 
> 
>
> Key: KAFKA-3173
> URL: https://issues.apache.org/jira/browse/KAFKA-3173
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0
>Reporter: Flavio Junqueira
>Assignee: Flavio Junqueira
>Priority: Critical
> Fix For: 0.10.0.0
>
>
> We observed another instance of the problem reported in KAFKA-2300, but this 
> time the error appeared in the partition state machine. In KAFKA-2300, we 
> haven't cleaned up the state in {{PartitionStateMachine}} and 
> {{ReplicaStateMachine}} as we do in {{KafkaController}}.
> Here is the stack trace:
> {noformat}
> 2016-01-29 15:26:51,393] ERROR [Partition state machine on Controller 0]: 
> Error while moving some partitions to OnlinePartition state 
> (kafka.controller.PartitionStateMachine)java.lang.IllegalStateException: 
> Controller to broker state change requests batch is not empty while creating 
> a new one. 
> Some LeaderAndIsr state changes Map(0 -> Map(foo-0 -> (LeaderAndIsrInfo:
> (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:1),AllReplicas:0)))
>  might be lostat 
> kafka.controller.ControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:254)
> at 
> kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:144)
> at 
> kafka.controller.KafkaController.onNewPartitionCreation(KafkaController.scala:517)
> at 
> kafka.controller.KafkaController.onNewTopicCreation(KafkaController.scala:504)
> at 
> kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(PartitionStateMachine.scala:437)
> at 
> kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419)
> at 
> kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419)
> at 
> kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)at 
> kafka.controller.PartitionStateMachine$TopicChangeListener.handleChildChange(PartitionStateMachine.scala:418)
> at 
> org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)at 
> org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3305) JMX in AWS instance

2016-02-29 Thread Flavio Junqueira (JIRA)
Flavio Junqueira created KAFKA-3305:
---

 Summary: JMX in AWS instance
 Key: KAFKA-3305
 URL: https://issues.apache.org/jira/browse/KAFKA-3305
 Project: Kafka
  Issue Type: Improvement
 Environment: AWS
Reporter: Flavio Junqueira


While connecting JConsole to a broker running on AWS, I needed to set the 
following properties to get it to work:

{noformat}
-Djava.net.preferIPv4Stack=true 
-Djava.rmi.server.hostname=xxx.compute.amazonaws.com 
-Dcom.sun.management.jmxremote.rmi.port=9996
{noformat}

in addition to setting the JMX_PORT variable. I suggest we at least document 
these options and possibly have them in {{kafka-run-class.sh}}.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3042) updateIsr should stop after failed several times due to zkVersion issue

2016-02-23 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3042?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15158807#comment-15158807
 ] 

Flavio Junqueira commented on KAFKA-3042:
-

bq. I think this is because the broker consider itself as the leader in fact 
it's a follower. So after several failed tries, it need to find out who is the 
leader.

But in this case, wouldn't the broker eventually recover? What I find 
problematic here is that the descriptions and comments in the related jiras 
mention the need to bounce brokers, which shouldn't be necessary.

If anyone has state change logs to share for the period in which one of these 
incidents occurred, it'd be nice to see them.

> updateIsr should stop after failed several times due to zkVersion issue
> ---
>
> Key: KAFKA-3042
> URL: https://issues.apache.org/jira/browse/KAFKA-3042
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.8.2.1
> Environment: jdk 1.7
> centos 6.4
>Reporter: Jiahongchao
>
> sometimes one broker may repeatly log
> "Cached zkVersion 54 not equal to that in zookeeper, skip updating ISR"
> I think this is because the broker consider itself as the leader in fact it's 
> a follower.
> So after several failed tries, it need to find out who is the leader



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-1382) Update zkVersion on partition state update failures

2016-02-23 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-1382?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15158799#comment-15158799
 ] 

Flavio Junqueira commented on KAFKA-1382:
-

Hi there, it isn't clear to me whether you've hit this bug or whether your 
broker simply got stale versions. Even with the fix proposed here, you can 
still get conflicting versions, which will lead to those messages. Was your 
cluster able to recover after you saw those messages? What's the precise 
behavior you observed?
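
For context, the retry approach suggested in the description would look roughly 
like the sketch below (the helper signatures are assumptions, not the actual 
Partition code): on a failed conditional update, refresh the cached zkVersion 
from ZooKeeper and retry instead of skipping the update.

{code}
// Sketch only: refresh the cached zkVersion on conflict and retry the
// conditional update a bounded number of times.
def conditionalUpdateWithRefresh(readCurrentVersion: () => Int,
                                 conditionalUpdate: Int => (Boolean, Int),
                                 maxRetries: Int = 3): Option[Int] = {
  var cachedVersion = readCurrentVersion()
  var attempt = 0
  while (attempt < maxRetries) {
    val (succeeded, newVersion) = conditionalUpdate(cachedVersion)
    if (succeeded) return Some(newVersion)
    cachedVersion = readCurrentVersion()   // pick up the version written by the other party
    attempt += 1
  }
  None
}
{code}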

> Update zkVersion on partition state update failures
> ---
>
> Key: KAFKA-1382
> URL: https://issues.apache.org/jira/browse/KAFKA-1382
> Project: Kafka
>  Issue Type: Bug
>Reporter: Joel Koshy
>Assignee: Sriharsha Chintalapani
> Fix For: 0.8.1.2, 0.8.2.0
>
> Attachments: KAFKA-1382.patch, KAFKA-1382_2014-05-30_21:19:21.patch, 
> KAFKA-1382_2014-05-31_15:50:25.patch, KAFKA-1382_2014-06-04_12:30:40.patch, 
> KAFKA-1382_2014-06-07_09:00:56.patch, KAFKA-1382_2014-06-09_18:23:42.patch, 
> KAFKA-1382_2014-06-11_09:37:22.patch, KAFKA-1382_2014-06-16_13:50:16.patch, 
> KAFKA-1382_2014-06-16_14:19:27.patch
>
>
> Our updateIsr code is currently:
>   private def updateIsr(newIsr: Set[Replica]) {
> debug("Updated ISR for partition [%s,%d] to %s".format(topic, 
> partitionId, newIsr.mkString(",")))
> val newLeaderAndIsr = new LeaderAndIsr(localBrokerId, leaderEpoch, 
> newIsr.map(r => r.brokerId).toList, zkVersion)
> // use the epoch of the controller that made the leadership decision, 
> instead of the current controller epoch
> val (updateSucceeded, newVersion) = 
> ZkUtils.conditionalUpdatePersistentPath(zkClient,
>   ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId),
>   ZkUtils.leaderAndIsrZkData(newLeaderAndIsr, controllerEpoch), zkVersion)
> if (updateSucceeded){
>   inSyncReplicas = newIsr
>   zkVersion = newVersion
>   trace("ISR updated to [%s] and zkVersion updated to 
> [%d]".format(newIsr.mkString(","), zkVersion))
> } else {
>   info("Cached zkVersion [%d] not equal to that in zookeeper, skip 
> updating ISR".format(zkVersion))
> }
> We encountered an interesting scenario recently when a large producer fully
> saturated the broker's NIC for over an hour. The large volume of data led to
> a number of ISR shrinks (and subsequent expands). The NIC saturation
> affected the zookeeper client heartbeats and led to a session timeout. The
> timeline was roughly as follows:
> - Attempt to expand ISR
> - Expansion written to zookeeper (confirmed in zookeeper transaction logs)
> - Session timeout after around 13 seconds (the configured timeout is 20
>   seconds) so that lines up.
> - zkclient reconnects to zookeeper (with the same session ID) and retries
>   the write - but uses the old zkVersion. This fails because the zkVersion
>   has already been updated (above).
> - The ISR expand keeps failing after that and the only way to get out of it
>   is to bounce the broker.
> In the above code, if the zkVersion is different we should probably update
> the cached version and even retry the expansion until it succeeds.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3167) Use local to the workspace Gradle cache and recreate it on every build

2016-02-11 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15142761#comment-15142761
 ] 

Flavio Junqueira commented on KAFKA-3167:
-

we currently have this:

{noformat}

$GRADLE_2_4_RC_2_HOME/bin/gradle

{noformat}

should I replace it with:

{noformat}
export GRADLE_USER_HOME=${WORKSPACE}/${JOB_NAME}/gradle
$GRADLE_USER_HOME/bin/gradle
{noformat}



> Use local to the workspace Gradle cache and recreate it on every build
> --
>
> Key: KAFKA-3167
> URL: https://issues.apache.org/jira/browse/KAFKA-3167
> Project: Kafka
>  Issue Type: Improvement
>  Components: build
>Reporter: Ismael Juma
>
> Kafka builds often fail with "Could not add entry 
> '/home/jenkins/.gradle/caches/modules-2/files-2.1/net.jpountz.lz4/lz4/1.3.0/c708bb2590c0652a642236ef45d9f99ff842a2ce/lz4-1.3.0.jar'
>  to cache fileHashes.bin"
> I filed INFRA-11083 and Andrew Bayer suggested:
> "Can you change your builds to use a local-to-the-workspace cache and then 
> nuke it/recreate it on every build?"
> This issue is about changing the Jenkins config for one of the trunk builds 
> to do the above to see if it helps.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3210) Using asynchronous calls through the raw ZK API in ZkUtils

2016-02-07 Thread Flavio Junqueira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flavio Junqueira updated KAFKA-3210:

Description: 
We have observed a number of issues with the controller interaction with 
ZooKeeper mainly because ZkClient creates new sessions transparently under the 
hood. Creating sessions transparently enables, for example, old controller to 
successfully update znodes in ZooKeeper even when they aren't the controller 
any longer (e.g., KAFKA-3083). To fix this, we need to bypass the ZkClient lib 
like we did with ZKWatchedEphemeral.

In addition to fixing such races with the controller, it would improve 
performance significantly if we used the async API (see KAFKA-3038). The async 
API is more efficient because it pipelines the requests to ZooKeeper, and the 
number of requests upon controller recovery can be large.

This jira proposes to make these two changes to the calls in ZkUtils and to do 
it, one path is to first replace the calls in ZkUtils with raw async ZK calls 
and block so that we don't have to change the controller code in this phase. 
Once this step is accomplished and it is stable, we make changes to the 
controller to handle the asynchronous calls to ZooKeeper.

Note that in the first step, we will need to introduce some new logic for 
session management, which is currently handled entirely by ZkClient. We will 
also need to implement the subscription mechanism for event notifications (see 
ZooKeeperLeaderElector as an example).

  was:
We have observed a number of issues with the controller interaction with 
ZooKeeper mainly because ZkClient creates new sessions transparently under the 
hood. Creating sessions transparently enables, for example, old controller to 
successfully update znodes in ZooKeeper even when they aren't the controller 
any longer (e.g., KAFKA-3083). To fix this, we need to bypass the ZkClient lib 
like we did with ZKWatchedEphemeral.

In addition to fixing such races with the controller, it would improve 
performance significantly if we used the async API (see KAFKA-3038). The async 
API is more efficient because it pipelines the requests to ZooKeeper, and the 
number of requests upon controller recovery can be large.

This jira proposes to make these two changes to the calls in ZkUtils and to do 
it, one path is to first replace the calls in ZkUtils with raw async ZK calls 
and block so that we don't have to change the controller code in this phase. 
Once this step is accomplished and it is stable, we make changes to the 
controller to handle the asynchronous calls to ZooKeeper.

Note that in the first step, we will need to introduce some logic for session 
management, which is currently handled entirely by ZkClient.  


> Using asynchronous calls through the raw ZK API in ZkUtils
> --
>
> Key: KAFKA-3210
> URL: https://issues.apache.org/jira/browse/KAFKA-3210
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller, zkclient
>Affects Versions: 0.9.0.0
>Reporter: Flavio Junqueira
>
> We have observed a number of issues with the controller interaction with 
> ZooKeeper mainly because ZkClient creates new sessions transparently under 
> the hood. Creating sessions transparently enables, for example, old 
> controller to successfully update znodes in ZooKeeper even when they aren't 
> the controller any longer (e.g., KAFKA-3083). To fix this, we need to bypass 
> the ZkClient lib like we did with ZKWatchedEphemeral.
> In addition to fixing such races with the controller, it would improve 
> performance significantly if we used the async API (see KAFKA-3038). The 
> async API is more efficient because it pipelines the requests to ZooKeeper, 
> and the number of requests upon controller recovery can be large.
> This jira proposes to make these two changes to the calls in ZkUtils. One 
> path is to first replace the calls in ZkUtils with raw async ZK calls and 
> block on them, so that we don't have to change the controller code in this 
> phase. Once this step is accomplished and it is stable, we make changes to 
> the controller to handle the asynchronous calls to ZooKeeper.
> Note that in the first step, we will need to introduce some new logic for 
> session management, which is currently handled entirely by ZkClient. We will 
> also need to implement the subscription mechanism for event notifications 
> (see ZooKeeperLeaderElector as an example).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2818) Clean up Controller Object on forced Resignation

2016-02-06 Thread Flavio Junqueira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flavio Junqueira updated KAFKA-2818:

Assignee: Flavio Junqueira  (was: Neha Narkhede)
  Status: Open  (was: Patch Available)

> Clean up Controller Object on forced Resignation
> 
>
> Key: KAFKA-2818
> URL: https://issues.apache.org/jira/browse/KAFKA-2818
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.9.0.0
>Reporter: Matthew Bruce
>Assignee: Flavio Junqueira
>Priority: Minor
> Attachments: KAFKA-2818.patch
>
>
> Currently if the controller does a forced resignation (if an exception is 
> caught during updateLeaderEpochAndSendRequest, SendUpdateMetadataRequest or 
> shutdownBroker), the Zookeeper resignation callback function 
> OnControllerResignation doesn't get a chance to execute which leaves some 
> artifacts lying around.  In particular the Sensors don't get cleaned up and 
> if the Kafka broker ever gets re-elected as Controller it will fail due to 
> some metrics already existing.  An Error and stack trace of such an event is 
> below.
> A forced resignation situation can be induced with a mis-config in 
> broker.properties fairly easily, by setting only SASL_PLAINTEXT listeners and 
> setting inter.broker.protocol.version=0.8.2.X
> {code}
> listeners=SASL_PLAINTEXT://:9092
> inter.broker.protocol.version=0.8.2.X
> security.inter.broker.protocol=SASL_PLAINTEXT
> {code}
> {code}
> [2015-11-09 16:33:47,510] ERROR Error while electing or becoming leader on 
> broker 182050300 (kafka.server.ZookeeperLeaderElector)
> java.lang.IllegalArgumentException: A metric named 'MetricName 
> [name=connection-close-rate, group=controller-channel-metrics, 
> description=Connections closed per second in the window., 
> tags={broker-id=182050300}]' already exists, can't register another one.
> at 
> org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:285)
> at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:177)
> at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:162)
> at 
> org.apache.kafka.common.network.Selector$SelectorMetrics.<init>(Selector.java:578)
> at org.apache.kafka.common.network.Selector.<init>(Selector.java:112)
> at 
> kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$addNewBroker(ControllerChannelManager.scala:91)
> at 
> kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:43)
> at 
> kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:43)
> at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
> at 
> kafka.controller.ControllerChannelManager.<init>(ControllerChannelManager.scala:43)
> at 
> kafka.controller.KafkaController.startChannelManager(KafkaController.scala:819)
> at 
> kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:747)
> at 
> kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:330)
> at 
> kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:163)
> at 
> kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:84)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:146)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:141)
> at org.I0Itec.zkclient.ZkClient$9.run(ZkClient.java:823)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2818) Clean up Controller Object on forced Resignation

2016-02-06 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15136147#comment-15136147
 ] 

Flavio Junqueira commented on KAFKA-2818:
-

[~mbr...@blackberry.com] You're right that {{onControllerResignation()}} isn't 
being called, but the problem is that 
{{ZooKeeperLeaderElector.handleDataDeleted()}} should be invoking it via 
{{onResigningAsLeader()}} and it isn't because {{amILeader}} is evaluating to 
false. It is evaluating to false because the call to {{resign()}} is setting 
{{leaderId}} to -1. I'm thinking that we shouldn't set {{leaderId}} to -1 in 
the {{resign()}} call. {{leaderId}} will be set to -1 if {{getControllerID}} 
returns -1 when called inside {{elect}}.

What do you think?
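
To illustrate the flow I mean, here is a simplified sketch (not the actual 
{{ZookeeperLeaderElector}} source; the class and callback here are stand-ins):

{code}
// resign() clears leaderId, so by the time handleDataDeleted() runs, amILeader is
// already false and onResigningAsLeader() is skipped.
class LeaderElectorSketch(brokerId: Int, onResigningAsLeader: () => Unit) {
  @volatile var leaderId: Int = brokerId // assume this broker is currently the leader

  def amILeader: Boolean = leaderId == brokerId

  def resign(): Unit = {
    leaderId = -1 // clearing this here is what hides the resignation from the callback below
    // deleting the election znode would go here
  }

  def handleDataDeleted(): Unit = {
    if (amILeader) // false after resign(), so the cleanup never runs
      onResigningAsLeader()
    // re-election would go here
  }
}
{code}

If {{resign()}} left {{leaderId}} alone and we relied on {{elect}} to reset it, the 
callback would still see {{amILeader}} as true and run the cleanup.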

> Clean up Controller Object on forced Resignation
> 
>
> Key: KAFKA-2818
> URL: https://issues.apache.org/jira/browse/KAFKA-2818
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 0.9.0.0
>Reporter: Matthew Bruce
>Assignee: Neha Narkhede
>Priority: Minor
> Attachments: KAFKA-2818.patch
>
>
> Currently if the controller does a forced resignation (if an exception is 
> caught during updateLeaderEpochAndSendRequest, SendUpdateMetadataRequest or 
> shutdownBroker), the Zookeeper resignation callback function 
> OnControllerResignation doesn't get a chance to execute which leaves some 
> artifacts lying around.  In particular the Sensors don't get cleaned up and 
> if the Kafka broker ever gets re-elected as Controller it will fail due to 
> some metrics already existing.  An Error and stack trace of such an event is 
> below.
> A forced resignation situation can be induced with a mis-config in 
> broker.properties fairly easily, by setting only SASL_PLAINTEXT listeners and 
> setting inter.broker.protocol.version=0.8.2.X
> {code}
> listeners=SASL_PLAINTEXT://:9092
> inter.broker.protocol.version=0.8.2.X
> security.inter.broker.protocol=SASL_PLAINTEXT
> {code}
> {code}
> [2015-11-09 16:33:47,510] ERROR Error while electing or becoming leader on 
> broker 182050300 (kafka.server.ZookeeperLeaderElector)
> java.lang.IllegalArgumentException: A metric named 'MetricName 
> [name=connection-close-rate, group=controller-channel-metrics, 
> description=Connections closed per second in the window., 
> tags={broker-id=182050300}]' already exists, can't register another one.
> at 
> org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:285)
> at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:177)
> at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:162)
> at 
> org.apache.kafka.common.network.Selector$SelectorMetrics.<init>(Selector.java:578)
> at org.apache.kafka.common.network.Selector.<init>(Selector.java:112)
> at 
> kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$addNewBroker(ControllerChannelManager.scala:91)
> at 
> kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:43)
> at 
> kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:43)
> at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
> at 
> kafka.controller.ControllerChannelManager.<init>(ControllerChannelManager.scala:43)
> at 
> kafka.controller.KafkaController.startChannelManager(KafkaController.scala:819)
> at 
> kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:747)
> at 
> kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:330)
> at 
> kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:163)
> at 
> kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:84)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:146)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
> at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:141)
> at org.I0Itec.zkclient.ZkClient$9.run(ZkClient.java:823)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3197) Producer can send message out of order even when in flight request is set to 1.

2016-02-04 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3197?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15133265#comment-15133265
 ] 

Flavio Junqueira commented on KAFKA-3197:
-

[~becket_qin] thanks for the clarification. 

bq. If leader moves, that does not necessarily mean the request to old leader 
failed. We can always send the unacknowledged message to new leader, but that 
probably introduce duplicate almost every time leader moves.

I agree that duplicates are inconvenient, but in this scenario we aren't 
promising no duplicates, so I'd rather treat the duplicates separately.

bq. Currently after batches leave the record accumulator, we only track them in 
requests.

The record accumulator point is a good one. I'm not super familiar with that 
part of the code, so I don't have any concrete suggestion right now, but I'll 
have a closer look.

bq. So while the idea of resend unacknowledged message to both old and new 
leader is natural and makes sense, it seems much more complicated and error 
prone based on our current implementation and does not buy us much.

True, from your description, it sounds like the change isn't trivial. But let 
me ask you this: don't we ever have to retransmit messages after a leader 
change? If we do, then the code path for retransmitting on a different 
connection must already be there. I can have a look to see if I'm able to help 
out.
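
For reference, a producer configured along these lines (illustrative values and 
hypothetical hosts) can still hit the reordering in the description, because the 
retried batch goes to the new leader over a different connection:

{code}
import java.util.Properties

// Sketch: even with a single in-flight request per connection, a retry after a leader
// move can land behind newer messages that were already sent to the new leader.
object ReorderingRepro {
  def producerProps(): Properties = {
    val props = new Properties()
    props.put("bootstrap.servers", "broker-a:9092,broker-b:9092") // hypothetical hosts
    props.put("max.in.flight.requests.per.connection", "1")
    props.put("retries", "3")
    props.put("acks", "all")
    props
  }
}
{code}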



> Producer can send message out of order even when in flight request is set to 
> 1.
> ---
>
> Key: KAFKA-3197
> URL: https://issues.apache.org/jira/browse/KAFKA-3197
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 0.9.0.0
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Fix For: 0.9.0.1
>
>
> The issue we saw is the following:
> 1. Producer sends message 0 to topic-partition-0 on broker A. The in-flight 
> request to broker A is 1.
> 2. The request is somehow lost
> 3. Producer refreshed its topic metadata and found leader of 
> topic-partition-0 migrated from broker A to broker B.
> 4. Because there is no in-flight request to broker B. All the subsequent 
> messages to topic-partition-0 in the record accumulator are sent to broker B.
> 5. Later on when the request in step (1) times out, message 0 will be retried 
> and sent to broker B. At this point, all the later messages have already been 
> sent, so we have re-ordering.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3197) Producer can send message out of order even when in flight request is set to 1.

2016-02-04 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3197?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15132571#comment-15132571
 ] 

Flavio Junqueira commented on KAFKA-3197:
-

Treating it as a bug sounds right. In the example given in the description, 
when the producer connects to broker B, shouldn't it resend unacknowledged 
messages (0 in the example) over the new connection (to broker B in the 
example)? It can produce duplicates as has been pointed out, but eliminating 
duplicates is a separate matter.

> Producer can send message out of order even when in flight request is set to 
> 1.
> ---
>
> Key: KAFKA-3197
> URL: https://issues.apache.org/jira/browse/KAFKA-3197
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 0.9.0.0
>Reporter: Jiangjie Qin
>Assignee: Jiangjie Qin
> Fix For: 0.9.0.1
>
>
> The issue we saw is the following:
> 1. Producer sends message 0 to topic-partition-0 on broker A. The in-flight 
> request to broker A is 1.
> 2. The request is somehow lost
> 3. Producer refreshed its topic metadata and found leader of 
> topic-partition-0 migrated from broker A to broker B.
> 4. Because there is no in-flight request to broker B. All the subsequent 
> messages to topic-partition-0 in the record accumulator are sent to broker B.
> 5. Later on when the request in step (1) times out, message 0 will be retried 
> and sent to broker B. At this point, all the later messages have already been 
> sent, so we have re-ordering.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3210) Using asynchronous calls through the raw ZK API in ZkUtils

2016-02-04 Thread Flavio Junqueira (JIRA)
Flavio Junqueira created KAFKA-3210:
---

 Summary: Using asynchronous calls through the raw ZK API in ZkUtils
 Key: KAFKA-3210
 URL: https://issues.apache.org/jira/browse/KAFKA-3210
 Project: Kafka
  Issue Type: Improvement
  Components: controller, zkclient
Affects Versions: 0.9.0.0
Reporter: Flavio Junqueira
Assignee: Neha Narkhede


We have observed a number of issues with the controller interaction with 
ZooKeeper mainly because ZkClient creates new sessions transparently under the 
hood. Creating sessions transparently enables, for example, old controller to 
successfully update znodes in ZooKeeper even when they aren't the controller 
any longer (e.g., KAFKA-3083). To fix this, we need to bypass the ZkClient lib 
like we did with ZKCheckedEphemeral.

In addition to fixing such races with the controller, it would improve 
performance significantly if we used the async API (see KAFKA-3038). The async 
API is more efficient because it pipelines the requests to ZooKeeper, and the 
number of requests upon controller recovery can be large.

This jira proposes to make these two changes to the calls in ZkUtils and to do 
it, one path is to first replace the calls in ZkUtils with raw async ZK calls 
and block so that we don't have to change the controller code in this phase. 
Once this step is accomplished and it is stable, we make changes to the 
controller to handle the asynchronous calls to ZooKeeper.

Note that in the first step, we will need to introduce some logic for session 
management, which is currently handled entirely by ZkClient.  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3210) Using asynchronous calls through the raw ZK API in ZkUtils

2016-02-04 Thread Flavio Junqueira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flavio Junqueira updated KAFKA-3210:

Assignee: (was: Neha Narkhede)

> Using asynchronous calls through the raw ZK API in ZkUtils
> --
>
> Key: KAFKA-3210
> URL: https://issues.apache.org/jira/browse/KAFKA-3210
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller, zkclient
>Affects Versions: 0.9.0.0
>Reporter: Flavio Junqueira
>
> We have observed a number of issues with the controller interaction with 
> ZooKeeper mainly because ZkClient creates new sessions transparently under 
> the hood. Creating sessions transparently enables, for example, old 
> controller to successfully update znodes in ZooKeeper even when they aren't 
> the controller any longer (e.g., KAFKA-3083). To fix this, we need to bypass 
> the ZkClient lib like we did with ZKCheckedEphemeral.
> In addition to fixing such races with the controller, it would improve 
> performance significantly if we used the async API (see KAFKA-3038). The 
> async API is more efficient because it pipelines the requests to ZooKeeper, 
> and the number of requests upon controller recovery can be large.
> This jira proposes to make these two changes to the calls in ZkUtils and to 
> do it, one path is to first replace the calls in ZkUtils with raw async ZK 
> calls and block so that we don't have to change the controller code in this 
> phase. Once this step is accomplished and it is stable, we make changes to 
> the controller to handle the asynchronous calls to ZooKeeper.
> Note that in the first step, we will need to introduce some logic for session 
> management, which is currently handled entirely by ZkClient.  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3173) Error while moving some partitions to OnlinePartition state

2016-01-29 Thread Flavio Junqueira (JIRA)
Flavio Junqueira created KAFKA-3173:
---

 Summary: Error while moving some partitions to OnlinePartition 
state 
 Key: KAFKA-3173
 URL: https://issues.apache.org/jira/browse/KAFKA-3173
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.9.0.0
Reporter: Flavio Junqueira
Priority: Critical
 Fix For: 0.9.0.1


We observed another instance of the problem reported in KAFKA-2300, but this 
time the error appeared in the partition state machine. In KAFKA-2300, we 
haven't cleaned up the state in {{PartitionStateMachine}} and 
{{ReplicaStateMachine}} as we do in {{KafkaController}}.
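
For context, here is a simplified sketch of the invariant that produces this kind of 
error when the state isn't cleaned up (not the actual {{ControllerBrokerRequestBatch}} 
source; the types are made up for illustration):

{code}
import scala.collection.mutable

// newBatch() refuses to start if LeaderAndIsr changes queued by a previous pass were
// never sent or cleared, which is what a missed cleanup on resignation causes.
class RequestBatchSketch {
  // brokerId -> (topic-partition -> serialized state), simplified on purpose
  val leaderAndIsrRequestMap = mutable.Map.empty[Int, mutable.Map[String, String]]

  def newBatch(): Unit = {
    if (leaderAndIsrRequestMap.nonEmpty)
      throw new IllegalStateException(
        "Controller to broker state change requests batch is not empty while creating a new one. " +
          s"Some LeaderAndIsr state changes $leaderAndIsrRequestMap might be lost")
    // safe to start queueing new LeaderAndIsr requests here
  }

  def clear(): Unit = leaderAndIsrRequestMap.clear() // what the cleanup path must not skip
}
{code}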



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3173) Error while moving some partitions to OnlinePartition state

2016-01-29 Thread Flavio Junqueira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flavio Junqueira updated KAFKA-3173:

Description: 
We observed another instance of the problem reported in KAFKA-2300, but this 
time the error appeared in the partition state machine. In KAFKA-2300, we 
haven't cleaned up the state in {{PartitionStateMachine}} and 
{{ReplicaStateMachine}} as we do in {{KafkaController}}.

Here is the stack trace:

{noformat}
2016-01-29 15:26:51,393] ERROR [Partition state machine on Controller 0]: Error while moving some partitions to OnlinePartition state (kafka.controller.PartitionStateMachine)
java.lang.IllegalStateException: Controller to broker state change requests batch is not empty while creating a new one. Some LeaderAndIsr state changes Map(0 -> Map(foo-0 -> (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:1),AllReplicas:0))) might be lost
        at kafka.controller.ControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:254)
        at kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:144)
        at kafka.controller.KafkaController.onNewPartitionCreation(KafkaController.scala:517)
        at kafka.controller.KafkaController.onNewTopicCreation(KafkaController.scala:504)
        at kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(PartitionStateMachine.scala:437)
        at kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419)
        at kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
        at kafka.controller.PartitionStateMachine$TopicChangeListener.handleChildChange(PartitionStateMachine.scala:418)
        at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)
        at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
{noformat}

  was:We observed another instance of the problem reported in KAFKA-2300, but 
this time the error appeared in the partition state machine. In KAFKA-2300, we 
haven't cleaned up the state in {{PartitionStateMachine}} and 
{{ReplicaStateMachine}} as we do in {{KafkaController}}.


> Error while moving some partitions to OnlinePartition state 
> 
>
> Key: KAFKA-3173
> URL: https://issues.apache.org/jira/browse/KAFKA-3173
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0
>Reporter: Flavio Junqueira
>Priority: Critical
> Fix For: 0.9.0.1
>
>
> We observed another instance of the problem reported in KAFKA-2300, but this 
> time the error appeared in the partition state machine. In KAFKA-2300, we 
> haven't cleaned up the state in {{PartitionStateMachine}} and 
> {{ReplicaStateMachine}} as we do in {{KafkaController}}.
> Here is the stack trace:
> {noformat}
> 2016-01-29 15:26:51,393] ERROR [Partition state machine on Controller 0]: Error while moving some partitions to OnlinePartition state (kafka.controller.PartitionStateMachine)
> java.lang.IllegalStateException: Controller to broker state change requests batch is not empty while creating a new one. Some LeaderAndIsr state changes Map(0 -> Map(foo-0 -> (LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:1),AllReplicas:0))) might be lost
> at kafka.controller.ControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:254)
> at kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:144)
> at kafka.controller.KafkaController.onNewPartitionCreation(KafkaController.scala:517)
> at kafka.controller.KafkaController.onNewTopicCreation(KafkaController.scala:504)
> at kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(PartitionStateMachine.scala:437)
> at kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419)
> at kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419)
> at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
> at kafka.controller.PartitionStateMachine$TopicChangeListener.handleChildChange(PartitionStateMachine.scala:418)
> at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)
> at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3173) Error while moving some partitions to OnlinePartition state

2016-01-29 Thread Flavio Junqueira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3173?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flavio Junqueira updated KAFKA-3173:

Description: 
We observed another instance of the problem reported in KAFKA-2300, but this 
time the error appeared in the partition state machine. In KAFKA-2300, we 
haven't cleaned up the state in {{PartitionStateMachine}} and 
{{ReplicaStateMachine}} as we do in {{KafkaController}}.

Here is the stack trace:

{noformat}
2016-01-29 15:26:51,393] ERROR [Partition state machine on Controller 0]: Error 
while moving some partitions to OnlinePartition state 
(kafka.controller.PartitionStateMachine)java.lang.IllegalStateException: 
Controller to broker state change requests batch is not empty while creating a 
new one. 
Some LeaderAndIsr state changes Map(0 -> Map(foo-0 -> (LeaderAndIsrInfo:
(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:1),AllReplicas:0)))
 might be lostat 
kafka.controller.ControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:254)
at 
kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:144)
at 
kafka.controller.KafkaController.onNewPartitionCreation(KafkaController.scala:517)
at 
kafka.controller.KafkaController.onNewTopicCreation(KafkaController.scala:504)  
  at 
kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(PartitionStateMachine.scala:437)
at 
kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419)
at 
kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419)
at 
kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)at 
kafka.controller.PartitionStateMachine$TopicChangeListener.handleChildChange(PartitionStateMachine.scala:418)
at 
org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)at 
org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
{noformat}

  was:
We observed another instance of the problem reported in KAFKA-2300, but this 
time the error appeared in the partition state machine. In KAFKA-2300, we 
haven't cleaned up the state in {{PartitionStateMachine}} and 
{{ReplicaStateMachine}} as we do in {{KafkaController}}.

Here is the stack trace:

{noformat}
2016-01-29 15:26:51,393] ERROR [Partition state machine on Controller 0]: Error 
while moving some partitions to OnlinePartition state 
(kafka.controller.PartitionStateMachine)java.lang.IllegalStateException: 
Controller to broker state change requests batch is not empty while creating a 
new one. Some LeaderAndIsr state changes Map(0 -> Map(foo-0 -> 
(LeaderAndIsrInfo:(Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:1),AllReplicas:0)))
 might be lostat 
kafka.controller.ControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:254)
at 
kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:144)
at 
kafka.controller.KafkaController.onNewPartitionCreation(KafkaController.scala:517)
at 
kafka.controller.KafkaController.onNewTopicCreation(KafkaController.scala:504)  
  at 
kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(PartitionStateMachine.scala:437)
at 
kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419)
at 
kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)at 
kafka.controller.PartitionStateMachine$TopicChangeListener.handleChildChange(PartitionStateMachine.scala:418)
at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)at 
org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
{noformat}


> Error while moving some partitions to OnlinePartition state 
> 
>
> Key: KAFKA-3173
> URL: https://issues.apache.org/jira/browse/KAFKA-3173
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.9.0.0
>Reporter: Flavio Junqueira
>Priority: Critical
> Fix For: 0.9.0.1
>
>
> We observed another instance of the problem reported in KAFKA-2300, but this 
> time the error appeared in the partition state machine. In KAFKA-2300, we 
> haven't cleaned up the state in {{PartitionStateMachine}} and 
> {{ReplicaStateMachine}} as we do in {{KafkaController}}.
> Here is the stack trace:
> {noformat}
> 2016-01-29 15:26:51,393] ERROR [Partition state machine on Controller 0]: 
> Error while moving some partitions to OnlinePartition state 
> 

[jira] [Commented] (KAFKA-3083) a soft failure in controller may leave a topic partition in an inconsistent state

2016-01-21 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15110680#comment-15110680
 ] 

Flavio Junqueira commented on KAFKA-3083:
-

bq. 1) We need to use a multi-op that combines the update to the ISR and a 
znode check. The znode check verifies that the version of the controller 
leadership znode is still the same and if it passes, then the ISR data is 
updated.

I was really just thinking out loud; the multi-op is just a hack to get around 
the fact that the controller broker doesn't know whether the underlying session 
has been recreated. The comment about using multi-op was simply pointing out 
that you can check and update atomically with this recipe. If we do this the 
right way, then we don't need to use a multi-op call.

bq. 2) The race condition that Jun Rao mentioned still exist above in 1).

It still exists but the multiop would fail to perform the update on ZK if 
you're checking a version.

bq. 4) To do step 3), as Jun Rao suggested we have to detect the connection 
loss event.

There are two parts. Detecting connection loss is one of them. If the 
controller isn't sure about its session when it receives connection loss, then 
it should stop. The second part is not to create a new session if the previous 
one expired. If the session of A has expired, which must happen by step 2) 
otherwise B can't be elected, then A isn't able to get requests completed on 
the expired session. Once B is elected, the session of A must have expired and 
no update coming from A will be executed. Of course, we want to bring broker A 
back up and to do it, we need to start a new session. However, before starting 
a new session, we need to make sure to stop any controller work in A.
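
To make that second part concrete: with the raw client an expired session is never 
recreated behind the broker's back, so a new session only exists when the broker 
explicitly builds a new handle, and we can enforce the ordering in code (a sketch only; 
{{stopControllerWork}} is a placeholder, not an existing method):

{code}
import org.apache.zookeeper.{Watcher, ZooKeeper}

object SessionRecreation {
  // A new session is created only after the broker has finished stepping down.
  def recreateAfterExpiry(connectString: String, sessionTimeoutMs: Int,
                          watcher: Watcher, stopControllerWork: () => Unit): ZooKeeper = {
    stopControllerWork() // resign, clear controller state, stop schedulers
    new ZooKeeper(connectString, sessionTimeoutMs, watcher) // only now does a new session exist
  }
}
{code}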

bq. i) Broker A has connection loss and connects immediately in which case it 
gets a SyncConnected event. Now the session MIGHT NOT have expired since the 
connection happened immediately. Broker A is expected to continue since it is 
still the controller and the session has not expired. ii) Broker A has 
connection loss and connects back in which case it gets a SyncConnected event. 
Now the session MIGHT have expired. Broker A is expected to stop all the zk 
operations.

The broker will only get SyncConnected if it connects and it is able to 
validate the session. If the session is invalid, then it gets an Expired 
notification. Note that if we are using SASL to authenticate, then we could 
also be getting an authenticated event.

> a soft failure in controller may leave a topic partition in an inconsistent 
> state
> -
>
> Key: KAFKA-3083
> URL: https://issues.apache.org/jira/browse/KAFKA-3083
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Jun Rao
>Assignee: Mayuresh Gharat
>
> The following sequence can happen.
> 1. Broker A is the controller and is in the middle of processing a broker 
> change event. As part of this process, let's say it's about to shrink the isr 
> of a partition.
> 2. Then broker A's session expires and broker B takes over as the new 
> controller. Broker B sends the initial leaderAndIsr request to all brokers.
> 3. Broker A continues by shrinking the isr of the partition in ZK and sends 
> the new leaderAndIsr request to the broker (say C) that leads the partition. 
> Broker C will reject this leaderAndIsr since the request comes from a 
> controller with an older epoch. Now we could be in a situation that Broker C 
> thinks the isr has all replicas, but the isr stored in ZK is different.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3083) a soft failure in controller may leave a topic partition in an inconsistent state

2016-01-21 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15110748#comment-15110748
 ] 

Flavio Junqueira commented on KAFKA-3083:
-

Sure, we need to transform all operations to look like what we currently have 
in ZKCheckedEphemeral. That particular class is a bit special because it 
performs checks and such, but essentially we need to change the current calls 
in ZkUtils to use asynchronous calls using the ZK handle directly and have a 
callback class that pairs up with the call.

Related to this present issue, we will also need to implement session 
management, but this time it can't try to be transparent like ZkClient does. It 
is good to have a central point to get the current zk handle from, but we need 
to give the broker the ability to signal when to create a new session. As part 
of this signaling, we will need to implement some kind of listener to propagate 
events. Another option is to let the broker implement directly a Watcher to 
process event notifications.

One simple way to start is to gradually replace the calls in ZkUtils with 
asynchronous calls, still using the handle ZkUtils provides. The calls would 
block to maintain the current behavior outside ZkUtils. Once that's done, we 
can make the calls non-blocking and do the necessary changes across the 
broker/controller. Finally, we can replace the session management with our own.
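
As a rough sketch of that first step (illustrative only, not the actual ZkUtils code), 
an asynchronous call paired with its callback and blocked on, so callers keep the 
current synchronous behavior:

{code}
import java.util.concurrent.CountDownLatch
import org.apache.zookeeper.AsyncCallback.StatCallback
import org.apache.zookeeper.data.Stat
import org.apache.zookeeper.{KeeperException, ZooKeeper}

object BlockingZkWrites {
  // An async setData paired with its callback, blocked on so that callers outside
  // ZkUtils keep the current synchronous semantics.
  def setDataBlocking(zk: ZooKeeper, path: String, data: Array[Byte], expectedVersion: Int): Stat = {
    val latch = new CountDownLatch(1)
    var rcCode = 0
    var result: Stat = null
    zk.setData(path, data, expectedVersion, new StatCallback {
      def processResult(rc: Int, p: String, ctx: Object, stat: Stat) {
        rcCode = rc
        result = stat
        latch.countDown()
      }
    }, null)
    latch.await()
    if (rcCode != 0) // 0 == KeeperException.Code.OK
      throw KeeperException.create(KeeperException.Code.get(rcCode), path)
    result
  }
}
{code}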

If you guys want to do this, then we should probably create an umbrella jira.   

> a soft failure in controller may leave a topic partition in an inconsistent 
> state
> -
>
> Key: KAFKA-3083
> URL: https://issues.apache.org/jira/browse/KAFKA-3083
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Jun Rao
>Assignee: Mayuresh Gharat
>
> The following sequence can happen.
> 1. Broker A is the controller and is in the middle of processing a broker 
> change event. As part of this process, let's say it's about to shrink the isr 
> of a partition.
> 2. Then broker A's session expires and broker B takes over as the new 
> controller. Broker B sends the initial leaderAndIsr request to all brokers.
> 3. Broker A continues by shrinking the isr of the partition in ZK and sends 
> the new leaderAndIsr request to the broker (say C) that leads the partition. 
> Broker C will reject this leaderAndIsr since the request comes from a 
> controller with an older epoch. Now we could be in a situation that Broker C 
> thinks the isr has all replicas, but the isr stored in ZK is different.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3128) Add metrics for ZooKeeper events

2016-01-21 Thread Flavio Junqueira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flavio Junqueira updated KAFKA-3128:

Summary: Add metrics for ZooKeeper events  (was: Add metric for ZooKeeper 
events)

> Add metrics for ZooKeeper events
> 
>
> Key: KAFKA-3128
> URL: https://issues.apache.org/jira/browse/KAFKA-3128
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, zkclient
>Reporter: Flavio Junqueira
> Fix For: 0.9.1.0
>
>
> It would be useful to report via Kafka metrics the number of ZK event 
> notifications, such as connection loss events, session expiration events, 
> etc., as a way of spotting potential issues with the communication with the 
> ZK ensemble.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3128) Add metric for ZooKeeper events

2016-01-21 Thread Flavio Junqueira (JIRA)
Flavio Junqueira created KAFKA-3128:
---

 Summary: Add metric for ZooKeeper events
 Key: KAFKA-3128
 URL: https://issues.apache.org/jira/browse/KAFKA-3128
 Project: Kafka
  Issue Type: Improvement
  Components: core, zkclient
Reporter: Flavio Junqueira
 Fix For: 0.9.1.0


It would be useful to report via Kafka metrics the number of ZK event 
notifications, such as connection loss events, session expiration events, etc., 
as a way of spotting potential issues with the communication with the ZK 
ensemble.
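
A minimal sketch of the kind of counting meant here, with plain counters standing in 
for the Kafka metrics registry that the actual change would use:

{code}
import java.util.concurrent.atomic.AtomicLong
import org.apache.zookeeper.Watcher.Event.KeeperState
import org.apache.zookeeper.{WatchedEvent, Watcher}

// Count session-level ZK events so they can be exported as metrics.
class ZkEventCounter extends Watcher {
  val disconnects = new AtomicLong()
  val expirations = new AtomicLong()
  val connects = new AtomicLong()

  override def process(event: WatchedEvent): Unit = event.getState match {
    case KeeperState.Disconnected  => disconnects.incrementAndGet()
    case KeeperState.Expired       => expirations.incrementAndGet()
    case KeeperState.SyncConnected => connects.incrementAndGet()
    case _                         => // other states could get their own counters
  }
}
{code}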



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-3083) a soft failure in controller may leave a topic partition in an inconsistent state

2016-01-18 Thread Flavio Junqueira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flavio Junqueira updated KAFKA-3083:

Summary: a soft failure in controller may leave a topic partition in an 
inconsistent state  (was: a soft failure in controller may leader a topic 
partition in an inconsistent state)

> a soft failure in controller may leave a topic partition in an inconsistent 
> state
> -
>
> Key: KAFKA-3083
> URL: https://issues.apache.org/jira/browse/KAFKA-3083
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Jun Rao
>Assignee: Mayuresh Gharat
>
> The following sequence can happen.
> 1. Broker A is the controller and is in the middle of processing a broker 
> change event. As part of this process, let's say it's about to shrink the isr 
> of a partition.
> 2. Then broker A's session expires and broker B takes over as the new 
> controller. Broker B sends the initial leaderAndIsr request to all brokers.
> 3. Broker A continues by shrinking the isr of the partition in ZK and sends 
> the new leaderAndIsr request to the broker (say C) that leads the partition. 
> Broker C will reject this leaderAndIsr since the request comes from a 
> controller with an older epoch. Now we could be in a situation that Broker C 
> thinks the isr has all replicas, but the isr stored in ZK is different.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3083) a soft failure in controller may leave a topic partition in an inconsistent state

2016-01-18 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15105587#comment-15105587
 ] 

Flavio Junqueira commented on KAFKA-3083:
-

[~junrao]
bq. does controller A just resume from where it's left off? Or does it ignore 
all outstanding events and re-read all subscribed ZK paths (since there could 
be missing events between the connection loss event and the SyncConnected 
event)?

I don't see a reason for ignoring outstanding events and re-reading zk state. 
If the session hasn't expired, then the broker is still the controller and I'd 
say it is safe to assume that no other controller work happened in parallel.

bq. ZkClient actually hides the ZK ConnectionLoss event and only informs the 
application when the ZK session expires. To pursue this, we will have to access 
ZK directly.

I think further down you noted that ZkClient actually exposes the connection 
loss event, but does put a thread in the middle.

> a soft failure in controller may leave a topic partition in an inconsistent 
> state
> -
>
> Key: KAFKA-3083
> URL: https://issues.apache.org/jira/browse/KAFKA-3083
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Jun Rao
>Assignee: Mayuresh Gharat
>
> The following sequence can happen.
> 1. Broker A is the controller and is in the middle of processing a broker 
> change event. As part of this process, let's say it's about to shrink the isr 
> of a partition.
> 2. Then broker A's session expires and broker B takes over as the new 
> controller. Broker B sends the initial leaderAndIsr request to all brokers.
> 3. Broker A continues by shrinking the isr of the partition in ZK and sends 
> the new leaderAndIsr request to the broker (say C) that leads the partition. 
> Broker C will reject this leaderAndIsr since the request comes from a 
> controller with an older epoch. Now we could be in a situation that Broker C 
> thinks the isr has all replicas, but the isr stored in ZK is different.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3083) a soft failure in controller may leave a topic partition in an inconsistent state

2016-01-18 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15105597#comment-15105597
 ] 

Flavio Junqueira commented on KAFKA-3083:
-

[~junrao]

bq. Even if it exists, it has the same problem--the session can expire after 
the check. So, you pretty much have to rely on the raw ZK client api that 
handles ConnectionLossException while reading/writing to ZK.

Perhaps you meant to say this, but you can also learn about a connection loss 
via the watcher you pass when creating a zookeeper object.



> a soft failure in controller may leave a topic partition in an inconsistent 
> state
> -
>
> Key: KAFKA-3083
> URL: https://issues.apache.org/jira/browse/KAFKA-3083
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Jun Rao
>Assignee: Mayuresh Gharat
>
> The following sequence can happen.
> 1. Broker A is the controller and is in the middle of processing a broker 
> change event. As part of this process, let's say it's about to shrink the isr 
> of a partition.
> 2. Then broker A's session expires and broker B takes over as the new 
> controller. Broker B sends the initial leaderAndIsr request to all brokers.
> 3. Broker A continues by shrinking the isr of the partition in ZK and sends 
> the new leaderAndIsr request to the broker (say C) that leads the partition. 
> Broker C will reject this leaderAndIsr since the request comes from a 
> controller with an older epoch. Now we could be in a situation that Broker C 
> thinks the isr has all replicas, but the isr stored in ZK is different.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3083) a soft failure in controller may leave a topic partition in an inconsistent state

2016-01-18 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15105612#comment-15105612
 ] 

Flavio Junqueira commented on KAFKA-3083:
-

[~mgharat]

bq. I was just thinking if we can modify the controller code to always check if 
it is the controller before it makes such changes to zookeeper.

In principle, there is the race that [~junrao] mentioned, but I was thinking 
that one possibility would be to use a multi-op that combines the update to the 
ISR and a znode check. The znode check verifies that the version of the 
controller leadership znode is still the same and if it passes, then the ISR 
data is updated. Using the scenario in the description to illustrate, when 
broker A tries to update the ISR state in ZK in step 3, the operation fails 
because the version of the controller leadership znode has changed.

The solution of handling the connection loss event is typical, but we could 
consider adding a multi-op to be extra safe against these spurious writes. 
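
A rough sketch of what that multi-op could look like with the raw ZK client 
(illustrative paths and versions, not existing Kafka code):

{code}
import scala.collection.JavaConverters._
import org.apache.zookeeper.{Op, ZooKeeper}

object ConditionalIsrUpdate {
  // The ISR update is applied only if the controller leadership znode still has the
  // version this controller observed; otherwise ZooKeeper rejects the whole multi-op.
  def update(zk: ZooKeeper,
             isrPath: String, newIsrData: Array[Byte], isrVersion: Int,
             controllerPath: String, controllerZnodeVersion: Int): Unit = {
    val ops = Seq(
      Op.check(controllerPath, controllerZnodeVersion), // fails if leadership changed
      Op.setData(isrPath, newIsrData, isrVersion)
    )
    zk.multi(ops.asJava) // throws KeeperException if any op fails; nothing is applied
  }
}
{code}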

> a soft failure in controller may leave a topic partition in an inconsistent 
> state
> -
>
> Key: KAFKA-3083
> URL: https://issues.apache.org/jira/browse/KAFKA-3083
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Jun Rao
>Assignee: Mayuresh Gharat
>
> The following sequence can happen.
> 1. Broker A is the controller and is in the middle of processing a broker 
> change event. As part of this process, let's say it's about to shrink the isr 
> of a partition.
> 2. Then broker A's session expires and broker B takes over as the new 
> controller. Broker B sends the initial leaderAndIsr request to all brokers.
> 3. Broker A continues by shrinking the isr of the partition in ZK and sends 
> the new leaderAndIsr request to the broker (say C) that leads the partition. 
> Broker C will reject this leaderAndIsr since the request comes from a 
> controller with an older epoch. Now we could be in a situation that Broker C 
> thinks the isr has all replicas, but the isr stored in ZK is different.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3083) a soft failure in controller may leave a topic partition in an inconsistent state

2016-01-18 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15105762#comment-15105762
 ] 

Flavio Junqueira commented on KAFKA-3083:
-

Another thing to mention is that if ZkClient didn't create a new session 
transparently, then the update of broker A in step 3 would fail because the 
session has expired and the ZK ensemble wouldn't take a request from an expired 
session.

> a soft failure in controller may leave a topic partition in an inconsistent 
> state
> -
>
> Key: KAFKA-3083
> URL: https://issues.apache.org/jira/browse/KAFKA-3083
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Jun Rao
>Assignee: Mayuresh Gharat
>
> The following sequence can happen.
> 1. Broker A is the controller and is in the middle of processing a broker 
> change event. As part of this process, let's say it's about to shrink the isr 
> of a partition.
> 2. Then broker A's session expires and broker B takes over as the new 
> controller. Broker B sends the initial leaderAndIsr request to all brokers.
> 3. Broker A continues by shrinking the isr of the partition in ZK and sends 
> the new leaderAndIsr request to the broker (say C) that leads the partition. 
> Broker C will reject this leaderAndIsr since the request comes from a 
> controller with an older epoch. Now we could be in a situation that Broker C 
> thinks the isr has all replicas, but the isr stored in ZK is different.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3083) a soft failure in controller may leader a topic partition in an inconsistent state

2016-01-15 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15101993#comment-15101993
 ] 

Flavio Junqueira commented on KAFKA-3083:
-

Hey [~mgharat], Best practice with ZK is to put the master (controller in this 
case) on hold upon a connection loss event and wait until the next event, which 
can flag either a reconnection or that the session has expired. It should call 
{{controllerResignation}} upon a session expiration, and resume if it 
reconnects.

But, we have to be careful because we can't really control the speed of 
messages, and even if A stops before B takes over in your example, we can't 
guarantee that some message from A won't hit some broker somewhere late. The 
description of this jira says that C correctly discards an old message, and it 
should be like that, so this part looks fine this far. It is about the change 
in ZK happening at the wrong time.
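
A minimal sketch of that pattern with a raw {{Watcher}} (the pause/resume/resign 
functions are placeholders, not existing controller methods):

{code}
import org.apache.zookeeper.Watcher.Event.KeeperState
import org.apache.zookeeper.{WatchedEvent, Watcher}

// Hold on Disconnected, resume on SyncConnected, resign on Expired.
class ControllerSessionWatcher(pause: () => Unit, resume: () => Unit, resign: () => Unit)
  extends Watcher {

  override def process(event: WatchedEvent): Unit = event.getState match {
    case KeeperState.Disconnected =>
      pause()  // connection lost: stop controller work until the fate of the session is known
    case KeeperState.SyncConnected =>
      resume() // same session re-established: safe to continue where we left off
    case KeeperState.Expired =>
      resign() // session gone: resign first, only then create a brand new session
    case _ =>  // e.g. SaslAuthenticated: nothing to do here
  }
}
{code}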

> a soft failure in controller may leader a topic partition in an inconsistent 
> state
> --
>
> Key: KAFKA-3083
> URL: https://issues.apache.org/jira/browse/KAFKA-3083
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Jun Rao
>Assignee: Mayuresh Gharat
>
> The following sequence can happen.
> 1. Broker A is the controller and is in the middle of processing a broker 
> change event. As part of this process, let's say it's about to shrink the isr 
> of a partition.
> 2. Then broker A's session expires and broker B takes over as the new 
> controller. Broker B sends the initial leaderAndIsr request to all brokers.
> 3. Broker A continues by shrinking the isr of the partition in ZK and sends 
> the new leaderAndIsr request to the broker (say C) that leads the partition. 
> Broker C will reject this leaderAndIsr since the request comes from a 
> controller with an older epoch. Now we could be in a situation that Broker C 
> thinks the isr has all replicas, but the isr stored in ZK is different.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3107) Error when trying to shut down auto balancing scheduler of controller

2016-01-14 Thread Flavio Junqueira (JIRA)
Flavio Junqueira created KAFKA-3107:
---

 Summary: Error when trying to shut down auto balancing scheduler 
of controller
 Key: KAFKA-3107
 URL: https://issues.apache.org/jira/browse/KAFKA-3107
 Project: Kafka
  Issue Type: Bug
Affects Versions: 0.8.2.1
Reporter: Flavio Junqueira


We observed the following exception when a controller was shutting down:

{noformat}
[run] Error handling event ZkEvent[New session event sent to 
kafka.controller.KafkaController$SessionExpirationListener@3278c211]
java.lang.IllegalStateException: Kafka scheduler has not been started
at kafka.utils.KafkaScheduler.ensureStarted(KafkaScheduler.scala:114)
at kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:86)
at 
kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:350)
at 
kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply$mcZ$sp(KafkaController.scala:1108)
at 
kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1107)
at 
kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1107)
at kafka.utils.Utils$.inLock(Utils.scala:535)
at 
kafka.controller.KafkaController$SessionExpirationListener.handleNewSession(KafkaController.scala:1107)
at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
{noformat}

The scheduler should have been started.
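
One defensive shape for this, as a sketch only and not the actual fix (the 
{{Scheduler}} trait below stands in for {{KafkaScheduler}}):

{code}
// Remember whether the scheduler was started so that a resignation that happens before
// startup does not trip "Kafka scheduler has not been started".
trait Scheduler {
  def startup(): Unit
  def shutdown(): Unit
}

class AutoRebalanceSchedulerGuard(scheduler: Scheduler) {
  @volatile private var started = false

  def start(): Unit = {
    scheduler.startup()
    started = true
  }

  def stopIfStarted(): Unit = {
    if (started) {
      scheduler.shutdown()
      started = false
    }
  }
}
{code}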



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3083) a soft failure in controller may leader a topic partition in an inconsistent state

2016-01-13 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15095920#comment-15095920
 ] 

Flavio Junqueira commented on KAFKA-3083:
-

@mayuresh the fact that A kept going with a session expired makes me think that 
A ignored the connection loss event and kept doing controller work. What we 
recommend for mastership with ZooKeeper is that the master stops doing master 
work upon receiving a connection loss event, and either resumes if it 
reconnects or drops mastership altogether if the session expires. Talking to 
[~junrao] about this, it sounds like the controller isn't processing the event 
that ZkClient is passing up.

Let me give you some more context on session semantics. At 2/3 of the session 
expiration, if the client hasn't heard from the current server it is connected 
to, then it will start looking for another server and will notify the 
application via connection loss events. At that point, the recommendation is 
that the client (broker in this case) stops doing any master work until it 
learns more about the session.

I also need to add that I haven't verified this in the code, so it is possible 
that it is something else causing the problem, but it sounds wrong that a 
controller with a session expired keeps going.

> a soft failure in controller may leader a topic partition in an inconsistent 
> state
> --
>
> Key: KAFKA-3083
> URL: https://issues.apache.org/jira/browse/KAFKA-3083
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Jun Rao
>Assignee: Mayuresh Gharat
>
> The following sequence can happen.
> 1. Broker A is the controller and is in the middle of processing a broker 
> change event. As part of this process, let's say it's about to shrink the isr 
> of a partition.
> 2. Then broker A's session expires and broker B takes over as the new 
> controller. Broker B sends the initial leaderAndIsr request to all brokers.
> 3. Broker A continues by shrinking the isr of the partition in ZK and sends 
> the new leaderAndIsr request to the broker (say C) that leads the partition. 
> Broker C will reject this leaderAndIsr since the request comes from a 
> controller with an older epoch. Now we could be in a situation that Broker C 
> thinks the isr has all replicas, but the isr stored in ZK is different.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Comment Edited] (KAFKA-3083) a soft failure in controller may leader a topic partition in an inconsistent state

2016-01-13 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15095920#comment-15095920
 ] 

Flavio Junqueira edited comment on KAFKA-3083 at 1/13/16 10:04 AM:
---

[~mgharat] the fact that A kept going with a session expired makes me think 
that A ignored the connection loss event and kept doing controller work. What 
we recommend for mastership with ZooKeeper is that the master stops doing 
master work upon receiving a connection loss event, and either resumes if it 
reconnects or drops mastership altogether if the session expires. Talking to 
[~junrao] about this, it sounds like the controller isn't processing the event 
that ZkClient is passing up.

Let me give you some more context on session semantics. At 2/3 of the session 
expiration, if the client hasn't heard from the current server it is connected 
to, then it will start looking for another server and will notify the 
application via connection loss events. At that point, the recommendation is 
that the client (broker in this case) stops doing any master work until it 
learns more about the session.

I also need to add that I haven't verified this in the code, so it is possible 
that it is something else causing the problem, but it sounds wrong that a 
controller with a session expired keeps going.


was (Author: fpj):
@mayuresh the fact that A kept going with a session expired makes me think that 
A ignored the connection loss event and kept doing controller work. What we 
recommend for mastership with ZooKeeper is that the master stops doing master 
work upon receiving a connection loss event, and either resumes if it 
reconnects or drops mastership altogether if the session expires. Talking to 
[~junrao] about this, it sounds like the controller isn't processing the event 
that ZkClient is passing up.

Let me give you some more context on session semantics. At 2/3 of the session 
expiration, if the client hasn't heard from the current server it is connected 
to, then it will start looking for another server and will notify the 
application via connection loss events. At that point, the recommendation is 
that the client (broker in this case) stops doing any master work until it 
learns more about the session.

I also need to add that I haven't verified this in the code, so it is possible 
that it is something else causing the problem, but it sounds wrong that a 
controller with a session expired keeps going.

> a soft failure in controller may leader a topic partition in an inconsistent 
> state
> --
>
> Key: KAFKA-3083
> URL: https://issues.apache.org/jira/browse/KAFKA-3083
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Jun Rao
>Assignee: Mayuresh Gharat
>
> The following sequence can happen.
> 1. Broker A is the controller and is in the middle of processing a broker 
> change event. As part of this process, let's say it's about to shrink the isr 
> of a partition.
> 2. Then broker A's session expires and broker B takes over as the new 
> controller. Broker B sends the initial leaderAndIsr request to all brokers.
> 3. Broker A continues by shrinking the isr of the partition in ZK and sends 
> the new leaderAndIsr request to the broker (say C) that leads the partition. 
> Broker C will reject this leaderAndIsr since the request comes from a 
> controller with an older epoch. Now we could be in a situation that Broker C 
> thinks the isr has all replicas, but the isr stored in ZK is different.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3083) a soft failure in controller may leader a topic partition in an inconsistent state

2016-01-13 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15097302#comment-15097302
 ] 

Flavio Junqueira commented on KAFKA-3083:
-

It sounds like this comment from [~junrao] is extending the description of the 
jira. It assumes that the replica that was removed from the ISR in step 1 
eventually came back, but its coming back isn't reflected in the state of ZK. 
However, the replica would be in the cache of controller B, so would it be 
elected in this case? Would it be an actual problem if B is demoted and 
another controller comes up?

> a soft failure in controller may leader a topic partition in an inconsistent 
> state
> --
>
> Key: KAFKA-3083
> URL: https://issues.apache.org/jira/browse/KAFKA-3083
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Jun Rao
>Assignee: Mayuresh Gharat
>
> The following sequence can happen.
> 1. Broker A is the controller and is in the middle of processing a broker 
> change event. As part of this process, let's say it's about to shrink the isr 
> of a partition.
> 2. Then broker A's session expires and broker B takes over as the new 
> controller. Broker B sends the initial leaderAndIsr request to all brokers.
> 3. Broker A continues by shrinking the isr of the partition in ZK and sends 
> the new leaderAndIsr request to the broker (say C) that leads the partition. 
> Broker C will reject this leaderAndIsr since the request comes from a 
> controller with an older epoch. Now we could be in a situation that Broker C 
> thinks the isr has all replicas, but the isr stored in ZK is different.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3083) a soft failure in controller may leader a topic partition in an inconsistent state

2016-01-13 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15097330#comment-15097330
 ] 

Flavio Junqueira commented on KAFKA-3083:
-

One clarification: if broker A shrinks the partition ISR in ZK before step 2 
but notifies C after step 2, as in the description, does C eventually learn the 
ISR stored in ZK? If it doesn't, then the observation about connection loss 
might not be sufficient to fix this, or might not matter at all in this case.   

> a soft failure in controller may leader a topic partition in an inconsistent 
> state
> --
>
> Key: KAFKA-3083
> URL: https://issues.apache.org/jira/browse/KAFKA-3083
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Jun Rao
>Assignee: Mayuresh Gharat
>
> The following sequence can happen.
> 1. Broker A is the controller and is in the middle of processing a broker 
> change event. As part of this process, let's say it's about to shrink the isr 
> of a partition.
> 2. Then broker A's session expires and broker B takes over as the new 
> controller. Broker B sends the initial leaderAndIsr request to all brokers.
> 3. Broker A continues by shrinking the isr of the partition in ZK and sends 
> the new leaderAndIsr request to the broker (say C) that leads the partition. 
> Broker C will reject this leaderAndIsr since the request comes from a 
> controller with an older epoch. Now we could be in a situation that Broker C 
> thinks the isr has all replicas, but the isr stored in ZK is different.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2260) Allow specifying expected offset on produce

2016-01-11 Thread Flavio Junqueira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flavio Junqueira updated KAFKA-2260:


The patch is stale; I'm uploading an updated version. 

> Allow specifying expected offset on produce
> ---
>
> Key: KAFKA-2260
> URL: https://issues.apache.org/jira/browse/KAFKA-2260
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ben Kirwin
>Assignee: Ewen Cheslack-Postava
>Priority: Minor
> Attachments: expected-offsets.patch
>
>
> I'd like to propose a change that adds a simple CAS-like mechanism to the 
> Kafka producer. This update has a small footprint, but enables a bunch of 
> interesting uses in stream processing or as a commit log for process state.
> h4. Proposed Change
> In short:
> - Allow the user to attach a specific offset to each message produced.
> - The server assigns offsets to messages in the usual way. However, if the 
> expected offset doesn't match the actual offset, the server should fail the 
> produce request instead of completing the write.
> This is a form of optimistic concurrency control, like the ubiquitous 
> check-and-set -- but instead of checking the current value of some state, it 
> checks the current offset of the log.
> h4. Motivation
> Much like check-and-set, this feature is only useful when there's very low 
> contention. Happily, when Kafka is used as a commit log or as a 
> stream-processing transport, it's common to have just one producer (or a 
> small number) for a given partition -- and in many of these cases, predicting 
> offsets turns out to be quite useful.
> - We get the same benefits as the 'idempotent producer' proposal: a producer 
> can retry a write indefinitely and be sure that at most one of those attempts 
> will succeed; and if two producers accidentally write to the end of the 
> partition at once, we can be certain that at least one of them will fail.
> - It's possible to 'bulk load' Kafka this way -- you can write a list of n 
> messages consecutively to a partition, even if the list is much larger than 
> the buffer size or the producer has to be restarted.
> - If a process is using Kafka as a commit log -- reading from a partition to 
> bootstrap, then writing any updates to that same partition -- it can be sure 
> that it's seen all of the messages in that partition at the moment it does 
> its first (successful) write.
> There's a bunch of other similar use-cases here, but they all have roughly 
> the same flavour.
> h4. Implementation
> The major advantage of this proposal over other suggested transaction / 
> idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a 
> currently-unused field, adds no new APIs, and requires very little new code 
> or additional work from the server.
> - Produced messages already carry an offset field, which is currently ignored 
> by the server. This field could be used for the 'expected offset', with a 
> sigil value for the current behaviour. (-1 is a natural choice, since it's 
> already used to mean 'next available offset'.)
> - We'd need a new error and error code for a 'CAS failure'.
> - The server assigns offsets to produced messages in 
> {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this 
> change, this method would assign offsets in the same way -- but if they 
> don't match the offset in the message, we'd return an error instead of 
> completing the write.
> - To avoid breaking existing clients, this behaviour would need to live 
> behind some config flag. (Possibly global, but probably more useful 
> per-topic?)
> I understand all this is unsolicited and possibly strange: happy to answer 
> questions, and if this seems interesting, I'd be glad to flesh this out into 
> a full KIP or patch. (And apologies if this is the wrong venue for this sort 
> of thing!)
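
For illustration, the proposed server-side check could look roughly like this 
sketch (hypothetical names and types, not the attached patch): offsets are 
assigned exactly as today, but a non-negative expected offset that disagrees 
with the assigned one fails the append.

{code}
// Minimal sketch of the check-and-set semantics described above.
case class ProducedMessage(expectedOffset: Long, payload: Array[Byte])

class OffsetCasException(msg: String) extends RuntimeException(msg)

def assignOffsets(messages: Seq[ProducedMessage], logEndOffset: Long): Long = {
  var offset = logEndOffset
  for (m <- messages) {
    // -1 keeps today's behaviour: "next available offset", no check performed.
    if (m.expectedOffset != -1L && m.expectedOffset != offset)
      throw new OffsetCasException(
        s"expected offset ${m.expectedOffset}, but the next offset is $offset")
    offset += 1
  }
  offset // the new log end offset after the append
}
{code}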



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2260) Allow specifying expected offset on produce

2016-01-11 Thread Flavio Junqueira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2260?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flavio Junqueira updated KAFKA-2260:

Attachment: KAFKA-2260.patch

I understand we are doing patches through GitHub pull requests, but here I'm 
just uploading an updated version of what was already here.

> Allow specifying expected offset on produce
> ---
>
> Key: KAFKA-2260
> URL: https://issues.apache.org/jira/browse/KAFKA-2260
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ben Kirwin
>Assignee: Ewen Cheslack-Postava
>Priority: Minor
> Attachments: KAFKA-2260.patch, expected-offsets.patch
>
>
> I'd like to propose a change that adds a simple CAS-like mechanism to the 
> Kafka producer. This update has a small footprint, but enables a bunch of 
> interesting uses in stream processing or as a commit log for process state.
> h4. Proposed Change
> In short:
> - Allow the user to attach a specific offset to each message produced.
> - The server assigns offsets to messages in the usual way. However, if the 
> expected offset doesn't match the actual offset, the server should fail the 
> produce request instead of completing the write.
> This is a form of optimistic concurrency control, like the ubiquitous 
> check-and-set -- but instead of checking the current value of some state, it 
> checks the current offset of the log.
> h4. Motivation
> Much like check-and-set, this feature is only useful when there's very low 
> contention. Happily, when Kafka is used as a commit log or as a 
> stream-processing transport, it's common to have just one producer (or a 
> small number) for a given partition -- and in many of these cases, predicting 
> offsets turns out to be quite useful.
> - We get the same benefits as the 'idempotent producer' proposal: a producer 
> can retry a write indefinitely and be sure that at most one of those attempts 
> will succeed; and if two producers accidentally write to the end of the 
> partition at once, we can be certain that at least one of them will fail.
> - It's possible to 'bulk load' Kafka this way -- you can write a list of n 
> messages consecutively to a partition, even if the list is much larger than 
> the buffer size or the producer has to be restarted.
> - If a process is using Kafka as a commit log -- reading from a partition to 
> bootstrap, then writing any updates to that same partition -- it can be sure 
> that it's seen all of the messages in that partition at the moment it does 
> its first (successful) write.
> There's a bunch of other similar use-cases here, but they all have roughly 
> the same flavour.
> h4. Implementation
> The major advantage of this proposal over other suggested transaction / 
> idempotency mechanisms is its minimality: it gives the 'obvious' meaning to a 
> currently-unused field, adds no new APIs, and requires very little new code 
> or additional work from the server.
> - Produced messages already carry an offset field, which is currently ignored 
> by the server. This field could be used for the 'expected offset', with a 
> sigil value for the current behaviour. (-1 is a natural choice, since it's 
> already used to mean 'next available offset'.)
> - We'd need a new error and error code for a 'CAS failure'.
> - The server assigns offsets to produced messages in 
> {{ByteBufferMessageSet.validateMessagesAndAssignOffsets}}. After this 
> change, this method would assign offsets in the same way -- but if they 
> don't match the offset in the message, we'd return an error instead of 
> completing the write.
> - To avoid breaking existing clients, this behaviour would need to live 
> behind some config flag. (Possibly global, but probably more useful 
> per-topic?)
> I understand all this is unsolicited and possibly strange: happy to answer 
> questions, and if this seems interesting, I'd be glad to flesh this out into 
> a full KIP or patch. (And apologies if this is the wrong venue for this sort 
> of thing!)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-3038) Speeding up partition reassignment after broker failure

2016-01-06 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-3038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15085300#comment-15085300
 ] 

Flavio Junqueira commented on KAFKA-3038:
-

You don't really need to batch with multi; you just need to make the calls 
asynchronous. In fact, unless you need multiple updates to be transactional, 
the preferred approach is to issue the updates asynchronously to keep the 
pipeline full.
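
A minimal sketch of that pattern with the plain ZooKeeper async API (assumed 
paths and data, no retry handling): issue all setData calls without waiting, 
then wait for the callbacks.

{code}
import java.util.concurrent.CountDownLatch
import org.apache.zookeeper.ZooKeeper
import org.apache.zookeeper.AsyncCallback.StatCallback
import org.apache.zookeeper.data.Stat

def updateAll(zk: ZooKeeper, updates: Map[String, Array[Byte]]): Unit = {
  val remaining = new CountDownLatch(updates.size)
  val cb = new StatCallback {
    override def processResult(rc: Int, path: String, ctx: Object, stat: Stat): Unit = {
      // A real caller would check rc and retry recoverable errors here.
      remaining.countDown()
    }
  }
  // Pipeline all writes instead of waiting for each one in a closed loop.
  for ((path, data) <- updates)
    zk.setData(path, data, -1, cb, null) // -1 matches any znode version
  remaining.await()
}
{code}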

> Speeding up partition reassignment after broker failure
> ---
>
> Key: KAFKA-3038
> URL: https://issues.apache.org/jira/browse/KAFKA-3038
> Project: Kafka
>  Issue Type: Improvement
>  Components: controller, core
>Affects Versions: 0.9.0.0
>Reporter: Eno Thereska
>Assignee: Eno Thereska
> Fix For: 0.9.0.0
>
>
> After a broker failure the controller does several writes to Zookeeper for 
> each partition on the failed broker. Writes are done one at a time, in closed 
> loop, which is slow especially under high latency networks. Zookeeper has 
> support for batching operations (the "multi" API). It is expected that 
> substituting serial writes with batched ones should reduce failure handling 
> time by an order of magnitude.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-3069) Fix recursion in ZkSecurityMigrator

2016-01-06 Thread Flavio Junqueira (JIRA)
Flavio Junqueira created KAFKA-3069:
---

 Summary: Fix recursion in ZkSecurityMigrator
 Key: KAFKA-3069
 URL: https://issues.apache.org/jira/browse/KAFKA-3069
 Project: Kafka
  Issue Type: Bug
  Components: security
Affects Versions: 0.9.0.0
Reporter: Flavio Junqueira
 Fix For: 0.9.0.1


The zk migrator tool recursively sets ACLs starting with the root, which we 
initially assumed was either the root of a dedicated ensemble or a chroot. 
However, there are at least two reasons for not doing it this way. First, 
shared ensembles might not really follow the practice of separating 
applications into branches, essentially creating a chroot for each. Second, 
there are paths we don't want to secure, like the ConsumersPath.

To fix this, we simply need to set the root ACL separately and start the 
recursion on each of the persistent paths to secure.  
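
A rough sketch of that shape (hypothetical path list and helper, not the actual 
ZkSecurityMigrator code): the root ACL is set on the root path alone, and the 
recursion only runs under the persistent paths that should be secured.

{code}
import scala.collection.JavaConverters._
import org.apache.zookeeper.ZooKeeper
import org.apache.zookeeper.data.ACL

def secure(zk: ZooKeeper, securePaths: Seq[String], acl: java.util.List[ACL]): Unit = {
  def setAclRecursively(path: String): Unit = {
    zk.setACL(path, acl, -1) // -1 matches any ACL version
    for (child <- zk.getChildren(path, false).asScala) {
      val childPath = if (path == "/") s"/$child" else s"$path/$child"
      setAclRecursively(childPath)
    }
  }
  zk.setACL("/", acl, -1)                // root handled separately, no recursion
  securePaths.foreach(setAclRecursively) // e.g. "/brokers", "/config", but not "/consumers"
}
{code}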



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-3069) Fix recursion in ZkSecurityMigrator

2016-01-06 Thread Flavio Junqueira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-3069?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flavio Junqueira reassigned KAFKA-3069:
---

Assignee: Flavio Junqueira

> Fix recursion in ZkSecurityMigrator
> ---
>
> Key: KAFKA-3069
> URL: https://issues.apache.org/jira/browse/KAFKA-3069
> Project: Kafka
>  Issue Type: Bug
>  Components: security
>Affects Versions: 0.9.0.0
>Reporter: Flavio Junqueira
>Assignee: Flavio Junqueira
> Fix For: 0.9.0.1
>
>
> The zk migrator tool recursively sets ACLs starting with the root, which we 
> initially assumed was either the root of a dedicated ensemble or a chroot. 
> However, there are at least two reasons for not doing it this way. First, 
> shared ensembles might not really follow the practice of separating 
> applications into branches, essentially creating a chroot for each. Second, 
> there are paths we don't want to secure, like the ConsumersPath.
> To fix this, we simply need to set the root ACL separately and start the 
> recursion on each of the persistent paths to secure.  



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2997) Synchronous write to disk

2015-12-16 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2997?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15060260#comment-15060260
 ] 

Flavio Junqueira commented on KAFKA-2997:
-

[~afirus] I'm not entirely sure what you're trying to achieve here. It sounds 
like you don't want to count unflushed messages and instead want a timer thread 
to trigger the flush. I'm wondering whether you really need that thread, or 
whether you can simply run a thread in a tight loop that flushes, accumulates 
while flushing, flushes again, and so on. What I'm proposing is essentially 
what the SyncRequestProcessor of ZooKeeper does:

https://github.com/apache/zookeeper/blob/trunk/src/java/main/org/apache/zookeeper/server/SyncRequestProcessor.java
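
A minimal sketch of that pattern (assumed types, not the actual Kafka or 
ZooKeeper code): a single flusher thread drains whatever accumulated while the 
previous fsync was running and writes it out as one batch, so no timer thread 
is needed and the batch size adapts to how long each flush takes.

{code}
import java.util.ArrayList
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.JavaConverters._

class GroupCommitter(flushBatch: Seq[Array[Byte]] => Unit) extends Thread {
  private val queue = new LinkedBlockingQueue[Array[Byte]]()

  def append(record: Array[Byte]): Unit = queue.put(record)

  override def run(): Unit = {
    while (!Thread.currentThread().isInterrupted) {
      val batch = new ArrayList[Array[Byte]]()
      batch.add(queue.take())         // block until at least one record is pending
      queue.drainTo(batch)            // then grab everything that piled up meanwhile
      flushBatch(batch.asScala.toSeq) // one write + fsync for the whole batch
    }
  }
}
{code}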
  

> Synchronous write to disk
> -
>
> Key: KAFKA-2997
> URL: https://issues.apache.org/jira/browse/KAFKA-2997
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 0.9.0.0
>Reporter: Arkadiusz Firus
>Priority: Minor
>  Labels: features, patch
>
> Hi All,
> I am currently working on a mechanism that allows efficient synchronous 
> writing to the file system. My idea is to gather a few write requests for one 
> partition and then call fsync.
> As I read the code, I found that the best place to do this is to modify the
> kafka.log.Log.append
> method. Currently, at the end of the method (line 368), there is a check of 
> whether the number of unflushed messages is greater than the flush interval 
> (a configuration parameter).
> I am thinking of extending this condition. I want to add an additional boolean 
> configuration parameter (sync write or something like it). If this parameter 
> is set to true, at the end of this method the thread should block on a lock. 
> Separately, there would be a timer thread (one per partition) invoked every 
> 10ms (a configuration parameter). On each invocation, the thread would call 
> the flush method and then release all blocked threads.
> I am writing here because I would like to know your opinion about such an 
> approach. Do you think this one is good, or does someone have a better (more 
> permanent) one? I would also like to know whether such an approach is 
> consistent with the general Kafka architecture.
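
For reference, the approach described in the issue could look roughly like this 
sketch (hypothetical names, not a patch): appenders block after writing, and a 
periodic task flushes the log and then releases everyone who appended before 
that flush.

{code}
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

class TimedSyncFlusher(flush: () => Unit, intervalMs: Long) {
  @volatile private var pending = new CountDownLatch(1)
  private val timer = Executors.newSingleThreadScheduledExecutor()

  timer.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = {
      val toRelease = pending
      pending = new CountDownLatch(1) // later appenders wait for the next flush
      flush()                         // fsync the log segment
      toRelease.countDown()           // wake up everyone who appended before the flush
    }
  }, intervalMs, intervalMs, TimeUnit.MILLISECONDS)

  // Called at the end of append() when synchronous writes are enabled.
  def awaitFlush(): Unit = pending.await()
}
{code}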



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2979) Enable authorizer and ACLs in ducktape tests

2015-12-10 Thread Flavio Junqueira (JIRA)
Flavio Junqueira created KAFKA-2979:
---

 Summary: Enable authorizer and ACLs in ducktape tests
 Key: KAFKA-2979
 URL: https://issues.apache.org/jira/browse/KAFKA-2979
 Project: Kafka
  Issue Type: Test
  Components: system tests
Affects Versions: 0.9.0.0
Reporter: Flavio Junqueira
Assignee: Flavio Junqueira
 Fix For: 0.9.1.0


Add some support to test ACLs with ducktape tests and enable some test cases to 
use it. 



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2951) Add test cases to EndToEndAuthorizationTest

2015-12-07 Thread Flavio Junqueira (JIRA)
Flavio Junqueira created KAFKA-2951:
---

 Summary: Add test cases to EndToEndAuthorizationTest
 Key: KAFKA-2951
 URL: https://issues.apache.org/jira/browse/KAFKA-2951
 Project: Kafka
  Issue Type: Test
Affects Versions: 0.9.0.0
Reporter: Flavio Junqueira
Assignee: Flavio Junqueira
 Fix For: 0.9.1.0


There are a few test cases that are worth adding. I've run them manually, but 
it sounds like a good idea to have them checked in:

# Test incorrect topic name (authorization failure)
# Test topic wildcard



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2952) Add ducktape test for secure->unsecure ZK migration

2015-12-07 Thread Flavio Junqueira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flavio Junqueira updated KAFKA-2952:

Affects Version/s: 0.9.0.0

> Add ducktape test for secure->unsecure ZK migration 
> 
>
> Key: KAFKA-2952
> URL: https://issues.apache.org/jira/browse/KAFKA-2952
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 0.9.0.0
>Reporter: Flavio Junqueira
>Assignee: Flavio Junqueira
> Fix For: 0.9.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2952) Add ducktape test for secure->unsecure ZK migration

2015-12-07 Thread Flavio Junqueira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flavio Junqueira updated KAFKA-2952:

Fix Version/s: 0.9.1.0

> Add ducktape test for secure->unsecure ZK migration 
> 
>
> Key: KAFKA-2952
> URL: https://issues.apache.org/jira/browse/KAFKA-2952
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 0.9.0.0
>Reporter: Flavio Junqueira
>Assignee: Flavio Junqueira
> Fix For: 0.9.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Created] (KAFKA-2952) Add ducktape test for secure->unsecure ZK migration

2015-12-07 Thread Flavio Junqueira (JIRA)
Flavio Junqueira created KAFKA-2952:
---

 Summary: Add ducktape test for secure->unsecure ZK migration 
 Key: KAFKA-2952
 URL: https://issues.apache.org/jira/browse/KAFKA-2952
 Project: Kafka
  Issue Type: Test
Reporter: Flavio Junqueira






--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Assigned] (KAFKA-2952) Add ducktape test for secure->unsecure ZK migration

2015-12-07 Thread Flavio Junqueira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flavio Junqueira reassigned KAFKA-2952:
---

Assignee: Flavio Junqueira

> Add ducktape test for secure->unsecure ZK migration 
> 
>
> Key: KAFKA-2952
> URL: https://issues.apache.org/jira/browse/KAFKA-2952
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 0.9.0.0
>Reporter: Flavio Junqueira
>Assignee: Flavio Junqueira
> Fix For: 0.9.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Updated] (KAFKA-2952) Add ducktape test for secure->unsecure ZK migration

2015-12-07 Thread Flavio Junqueira (JIRA)

 [ 
https://issues.apache.org/jira/browse/KAFKA-2952?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flavio Junqueira updated KAFKA-2952:

Description: We have test cases for the unsecure -> secure path, but not 
the other way around. We should add one.

> Add ducktape test for secure->unsecure ZK migration 
> 
>
> Key: KAFKA-2952
> URL: https://issues.apache.org/jira/browse/KAFKA-2952
> Project: Kafka
>  Issue Type: Test
>Affects Versions: 0.9.0.0
>Reporter: Flavio Junqueira
>Assignee: Flavio Junqueira
> Fix For: 0.9.1.0
>
>
> We have test cases for the unsecure -> secure path, but not the other way 
> around. We should add one.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


[jira] [Commented] (KAFKA-2910) Failure in kafka.api.SslEndToEndAuthorizationTest.testNoGroupAcl

2015-12-07 Thread Flavio Junqueira (JIRA)

[ 
https://issues.apache.org/jira/browse/KAFKA-2910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15044814#comment-15044814
 ] 

Flavio Junqueira commented on KAFKA-2910:
-

Based on this WARN:

{noformat}
[2015-11-30 01:35:03,481] WARN SASL configuration failed: 
javax.security.auth.login.LoginException: No JAAS configuration
section named 'Client' was found in specified JAAS configuration file: 
'/tmp/jaas6536686531650477656.conf'. Will continue 
connection to Zookeeper server without SASL authentication, if Zookeeper server 
allows it. 
(org.apache.zookeeper.ClientCnxn:957)
{noformat}

It looks like another instance of the configuration reset problem. 

> Failure in kafka.api.SslEndToEndAuthorizationTest.testNoGroupAcl
> 
>
> Key: KAFKA-2910
> URL: https://issues.apache.org/jira/browse/KAFKA-2910
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Guozhang Wang
> Fix For: 0.9.1.0
>
>
> {code}
> java.lang.SecurityException: zkEnableSecureAcls is true, but the verification 
> of the JAAS login file failed.
>   at kafka.server.KafkaServer.initZk(KafkaServer.scala:265)
>   at kafka.server.KafkaServer.startup(KafkaServer.scala:168)
>   at kafka.utils.TestUtils$.createServer(TestUtils.scala:143)
>   at 
> kafka.integration.KafkaServerTestHarness$$anonfun$setUp$1.apply(KafkaServerTestHarness.scala:66)
>   at 
> kafka.integration.KafkaServerTestHarness$$anonfun$setUp$1.apply(KafkaServerTestHarness.scala:66)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:742)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> kafka.integration.KafkaServerTestHarness$class.setUp(KafkaServerTestHarness.scala:66)
>   at 
> kafka.api.SslEndToEndAuthorizationTest.kafka$api$IntegrationTestHarness$$super$setUp(SslEndToEndAuthorizationTest.scala:24)
>   at 
> kafka.api.IntegrationTestHarness$class.setUp(IntegrationTestHarness.scala:58)
>   at 
> kafka.api.SslEndToEndAuthorizationTest.kafka$api$EndToEndAuthorizationTest$$super$setUp(SslEndToEndAuthorizationTest.scala:24)
>   at 
> kafka.api.EndToEndAuthorizationTest$class.setUp(EndToEndAuthorizationTest.scala:141)
>   at 
> kafka.api.SslEndToEndAuthorizationTest.setUp(SslEndToEndAuthorizationTest.scala:24)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:105)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:56)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:64)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:50)
>   at 
