[jira] [Updated] (KAFKA-2207) The testCannotSendToInternalTopic test method in ProducerFailureHandlingTest fails consistently with the following exception:
[ https://issues.apache.org/jira/browse/KAFKA-2207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Deepthi updated KAFKA-2207:
---------------------------
Attachment: KAFKA-2207.patch

The testCannotSendToInternalTopic test method in ProducerFailureHandlingTest fails consistently with the following exception:

Key: KAFKA-2207
URL: https://issues.apache.org/jira/browse/KAFKA-2207
Project: Kafka
Issue Type: Bug
Reporter: Deepthi
Fix For: 0.8.2.1
Attachments: KAFKA-2207.patch

kafka.api.ProducerFailureHandlingTest > testCannotSendToInternalTopic FAILED
    java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 3000 ms.
        at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:437)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:352)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:248)
        at kafka.api.ProducerFailureHandlingTest.testCannotSendToInternalTopic(ProducerFailureHandlingTest.scala:309)
    Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 3000 ms.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2207) The testCannotSendToInternalTopic test method in ProducerFailureHandlingTest fails consistently with the following exception:
[ https://issues.apache.org/jira/browse/KAFKA-2207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Deepthi updated KAFKA-2207:
---------------------------
Status: Patch Available (was: Open)

KAFKA-2207 Patch has fixed the issue

The testCannotSendToInternalTopic test method in ProducerFailureHandlingTest fails consistently with the following exception:

Key: KAFKA-2207
URL: https://issues.apache.org/jira/browse/KAFKA-2207
Project: Kafka
Issue Type: Bug
Reporter: Deepthi
Fix For: 0.8.2.1
Attachments: KAFKA-2207.patch

kafka.api.ProducerFailureHandlingTest > testCannotSendToInternalTopic FAILED
    java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 3000 ms.
        at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:437)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:352)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:248)
        at kafka.api.ProducerFailureHandlingTest.testCannotSendToInternalTopic(ProducerFailureHandlingTest.scala:309)
    Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 3000 ms.

The following attached patch has resolved the issue

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 33065: Patch for KAFKA-1928
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33065/ ---

(Updated May 20, 2015, 10:42 a.m.)

Review request for kafka.

Bugs: 1928 and KAFKA-1928
    https://issues.apache.org/jira/browse/1928
    https://issues.apache.org/jira/browse/KAFKA-1928

Repository: kafka

Description (updated)
---
first pass on replacing Send
implement maxSize and improved docs
Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2
Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2
Conflicts: core/src/main/scala/kafka/network/RequestChannel.scala
moved selector out of abstract thread
mid-way through putting selector in SocketServer
Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2
Also, SocketServer is now using Selector. Still a bit messy - but all tests pass.
Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2
renamed requestKey to connectionId to reflect new use and changed type from Any to String
Following Jun's comments - moved MultiSend to client. Cleaned up destinations as well
removed reify and remaining from send/receive API, per Jun.
moved maybeCloseOldest() to Selector per Jay
added idString to node API, changed written to int in Send API
cleaning up MultiSend, added size() to Send interface
fixed some issues with multisend
Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2
fixed metric thingies
fixed response order bug
error handling for illegal selector state and fix metrics bug
optimized selection key lookup with identity hash
fix accidental change
Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2
addressing Jun's comments
removed connection-aging for clients
fix issues with exception handling and other cleanup
Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2
Revert "removed connection-aging for clients"
This reverts commit 016669123a370b561b5ac78f8f1cf7bdd958e7d1.
improving exception handling and other minor fixes
Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2

Diffs (updated)
---
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java da76cc257b4cfe3c4bce7120a1f14c7f31ef8587
clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 936487b16e7ac566f8bdcd39a7240ceb619fd30e
clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 1311f85847b022efec8cb05c450bb18231db6979
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 435fbb5116e80302eba11ed1d3069cb577dbdcbd
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java bdff518b732105823058e6182f445248b45dc388
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java d301be4709f7b112e1f3a39f3c04cfa65f00fa60
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b2764df11afa7a99fce46d1ff48960d889032d14
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ef9dd5238fbc771496029866ece1d85db6d7b7a5
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 8e336a3aa96c73f52beaeb56b931baf4b026cf21
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 187d0004c8c46b6664ddaffecc6166d4b47351e5
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 1e943d621732889a1c005b243920dc32cea7af66
clients/src/main/java/org/apache/kafka/common/Node.java f4e4186c7602787e58e304a2f1c293a633114656
clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 129ae827bccbd982ad93d56e46c6f5c46f147fe0
clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java c8213e156ec9c9af49ee09f5238492318516aaa3
clients/src/main/java/org/apache/kafka/common/network/MultiSend.java PRE-CREATION
clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java fc0d168324aaebb97065b0aafbd547a1994d76a7
clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 68327cd3a734fd429966d3e2016a2488dbbb19e5
clients/src/main/java/org/apache/kafka/common/network/Receive.java 4e33078c1eec834bd74aabcb5fc69f18c9d6d52a
clients/src/main/java/org/apache/kafka/common/network/Selectable.java b5f8d83e89f9026dc0853e5f92c00b2d7f043e22
clients/src/main/java/org/apache/kafka/common/network/Selector.java 57de0585e5e9a53eb9dcd99cac1ab3eb2086a302
clients/src/main/java/org/apache/kafka/common/network/Send.java 5d321a09e470166a1c33639cf0cab26a3bce98ec
clients/src/main/java/org/apache/kafka/common/requests/RequestSend.java 27cbf390c7f148ffa8c5abc154c72cbf0829715c
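For orientation, the refactor revolves around a unified Send abstraction with a destination id, a size() method, and incremental writes, per the commit messages above. Here is a hedged Scala sketch of what such an interface and a ByteBuffer-backed implementation could look like; details beyond the commit messages (exact signatures, return types) are assumptions, not the committed API:

```scala
import java.nio.ByteBuffer
import java.nio.channels.GatheringByteChannel

// Hedged sketch of the unified Send abstraction described in the review:
// a Send knows its destination connection, its total size, and can write
// itself incrementally to a channel until completed.
trait Send {
  def destination: String                            // connectionId, a String per the rename noted above
  def size: Long                                     // total bytes this send will write
  def completed: Boolean                             // true once everything has been written
  def writeTo(channel: GatheringByteChannel): Long   // bytes written in this call
}

// A ByteBufferSend-style implementation: wraps one or more buffers.
class ByteBufferSend(val destination: String, buffers: ByteBuffer*) extends Send {
  private val total = buffers.map(_.remaining.toLong).sum
  override def size: Long = total
  override def completed: Boolean = buffers.forall(!_.hasRemaining)
  override def writeTo(channel: GatheringByteChannel): Long =
    channel.write(buffers.toArray)   // gathering write across all buffers
}
```

A MultiSend, as moved to the clients package above, would then just be another Send that walks a list of child Sends in order, which is what lets the server and client share one selector write path.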
[jira] [Commented] (KAFKA-1928) Move kafka.network over to using the network classes in org.apache.kafka.common.network
[ https://issues.apache.org/jira/browse/KAFKA-1928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14552115#comment-14552115 ]

Gwen Shapira commented on KAFKA-1928:
-------------------------------------
Updated reviewboard https://reviews.apache.org/r/33065/diff/ against branch trunk

Move kafka.network over to using the network classes in org.apache.kafka.common.network

Key: KAFKA-1928
URL: https://issues.apache.org/jira/browse/KAFKA-1928
Project: Kafka
Issue Type: Sub-task
Components: security
Reporter: Jun Rao
Assignee: Gwen Shapira
Attachments: KAFKA-1928.patch, KAFKA-1928_2015-04-28_00:09:40.patch, KAFKA-1928_2015-04-30_17:48:33.patch, KAFKA-1928_2015-05-01_15:45:24.patch, KAFKA-1928_2015-05-12_12:00:37.patch, KAFKA-1928_2015-05-12_12:57:57.patch, KAFKA-1928_2015-05-15.patch, KAFKA-1928_2015-05-15_03.patch, KAFKA-1928_2015-05-15_10:30:31.patch, KAFKA-1928_2015-05-18_17:57:39.patch, KAFKA-1928_2015-05-18_18:55:38.patch, KAFKA-1928_2015-05-19_11:26:18.patch, KAFKA-1928_2015-05-20_13:41:37.patch

As part of the common package we introduced a bunch of network-related code and abstractions. We should look into replacing a lot of what is in kafka.network with this code. Duplicate classes include things like Receive, Send, etc. It is likely possible to also refactor the SocketServer to make use of Selector, which should significantly simplify its code.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1928) Move kafka.network over to using the network classes in org.apache.kafka.common.network
[ https://issues.apache.org/jira/browse/KAFKA-1928?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gwen Shapira updated KAFKA-1928:
--------------------------------
Attachment: KAFKA-1928_2015-05-20_13:41:37.patch

Move kafka.network over to using the network classes in org.apache.kafka.common.network

Key: KAFKA-1928
URL: https://issues.apache.org/jira/browse/KAFKA-1928
Project: Kafka
Issue Type: Sub-task
Components: security
Reporter: Jun Rao
Assignee: Gwen Shapira
Attachments: KAFKA-1928.patch, KAFKA-1928_2015-04-28_00:09:40.patch, KAFKA-1928_2015-04-30_17:48:33.patch, KAFKA-1928_2015-05-01_15:45:24.patch, KAFKA-1928_2015-05-12_12:00:37.patch, KAFKA-1928_2015-05-12_12:57:57.patch, KAFKA-1928_2015-05-15.patch, KAFKA-1928_2015-05-15_03.patch, KAFKA-1928_2015-05-15_10:30:31.patch, KAFKA-1928_2015-05-18_17:57:39.patch, KAFKA-1928_2015-05-18_18:55:38.patch, KAFKA-1928_2015-05-19_11:26:18.patch, KAFKA-1928_2015-05-20_13:41:37.patch

As part of the common package we introduced a bunch of network-related code and abstractions. We should look into replacing a lot of what is in kafka.network with this code. Duplicate classes include things like Receive, Send, etc. It is likely possible to also refactor the SocketServer to make use of Selector, which should significantly simplify its code.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 33065: Patch for KAFKA-1928
On May 19, 2015, 11:34 p.m., Jun Rao wrote:
> core/src/main/scala/kafka/network/SocketServer.scala, line 431
> https://reviews.apache.org/r/33065/diff/14/?file=963545#file963545line431
>
> We probably just want to catch Exception. For unexpected error, we will just let it propagate and kill the processor thread.

I hope you meant that catching Throwable is too much and that we need to handle more specific exceptions :)

- Gwen

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33065/#review84398 ---

On May 19, 2015, 8:26 a.m., Gwen Shapira wrote:
> --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33065/ ---
>
> (Updated May 19, 2015, 8:26 a.m.)
>
> Review request for kafka.
>
> Bugs: 1928 and KAFKA-1928
>     https://issues.apache.org/jira/browse/1928
>     https://issues.apache.org/jira/browse/KAFKA-1928
>
> Repository: kafka
>
> Description
> ---
> first pass on replacing Send
> implement maxSize and improved docs
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2
> Conflicts: core/src/main/scala/kafka/network/RequestChannel.scala
> moved selector out of abstract thread
> mid-way through putting selector in SocketServer
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2
> Also, SocketServer is now using Selector. Still a bit messy - but all tests pass.
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2
> renamed requestKey to connectionId to reflect new use and changed type from Any to String
> Following Jun's comments - moved MultiSend to client. Cleaned up destinations as well
> removed reify and remaining from send/receive API, per Jun.
> moved maybeCloseOldest() to Selector per Jay
> added idString to node API, changed written to int in Send API
> cleaning up MultiSend, added size() to Send interface
> fixed some issues with multisend
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2
> fixed metric thingies
> fixed response order bug
> error handling for illegal selector state and fix metrics bug
> optimized selection key lookup with identity hash
> fix accidental change
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2
> addressing Jun's comments
> removed connection-aging for clients
> fix issues with exception handling and other cleanup
> Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2
>
> Diffs
> ---
> clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java da76cc257b4cfe3c4bce7120a1f14c7f31ef8587
> clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 936487b16e7ac566f8bdcd39a7240ceb619fd30e
> clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 1311f85847b022efec8cb05c450bb18231db6979
> clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 435fbb5116e80302eba11ed1d3069cb577dbdcbd
> clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java d301be4709f7b112e1f3a39f3c04cfa65f00fa60
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b2764df11afa7a99fce46d1ff48960d889032d14
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ef9dd5238fbc771496029866ece1d85db6d7b7a5
> clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 8e336a3aa96c73f52beaeb56b931baf4b026cf21
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 1e943d621732889a1c005b243920dc32cea7af66
> clients/src/main/java/org/apache/kafka/common/Node.java f4e4186c7602787e58e304a2f1c293a633114656
> clients/src/main/java/org/apache/kafka/common/network/ByteBufferReceive.java 129ae827bccbd982ad93d56e46c6f5c46f147fe0
> clients/src/main/java/org/apache/kafka/common/network/ByteBufferSend.java c8213e156ec9c9af49ee09f5238492318516aaa3
> clients/src/main/java/org/apache/kafka/common/network/MultiSend.java PRE-CREATION
> clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java fc0d168324aaebb97065b0aafbd547a1994d76a7
> clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java 68327cd3a734fd429966d3e2016a2488dbbb19e5
> clients/src/main/java/org/apache/kafka/common/network/Receive.java 4e33078c1eec834bd74aabcb5fc69f18c9d6d52a
> clients/src/main/java/org/apache/kafka/common/network/Selectable.java
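The exchange above is about how wide the catch in the processor loop should be. A minimal Scala sketch of the pattern Jun describes, with illustrative names only (this is not the actual kafka.network.SocketServer code): catch Exception so one bad connection cannot kill a thread serving many, and let anything else propagate and terminate the thread:

```scala
// Illustrative sketch only: names and structure are assumptions,
// not the actual kafka.network.SocketServer implementation.
abstract class ProcessorSketch {
  @volatile protected var isRunning = true
  protected def pollOnce(): Unit   // e.g. selector.poll(300) in the real server

  def run(): Unit =
    while (isRunning) {
      try pollOnce()   // process all connections with ready I/O
      catch {
        // Catch Exception: a failure on one connection (malformed request,
        // connection reset) must not take down a thread serving many others.
        case e: Exception =>
          System.err.println(s"Processor got uncaught exception: $e")
        // Throwable/Error is deliberately NOT caught: let truly unexpected
        // failures propagate and kill the processor thread, as Jun suggests.
      }
    }
}
```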
[jira] [Issue Comment Deleted] (KAFKA-2207) The testCannotSendToInternalTopic test method in ProducerFailureHandlingTest fails consistently with the following exception:
[ https://issues.apache.org/jira/browse/KAFKA-2207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Deepthi updated KAFKA-2207:
---------------------------
Comment: was deleted (was: KAFKA-2207 Patch has fixed the issue)

The testCannotSendToInternalTopic test method in ProducerFailureHandlingTest fails consistently with the following exception:

Key: KAFKA-2207
URL: https://issues.apache.org/jira/browse/KAFKA-2207
Project: Kafka
Issue Type: Bug
Reporter: Deepthi
Fix For: 0.8.2.1
Attachments: KAFKA-2207.patch

kafka.api.ProducerFailureHandlingTest > testCannotSendToInternalTopic FAILED
    java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 3000 ms.
        at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:437)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:352)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:248)
        at kafka.api.ProducerFailureHandlingTest.testCannotSendToInternalTopic(ProducerFailureHandlingTest.scala:309)
    Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 3000 ms.

The following attached patch has resolved the issue

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-2207) The testCannotSendToInternalTopic test method in ProducerFailureHandlingTest fails consistently with the following exception:
Deepthi created KAFKA-2207:
---------------------------
Summary: The testCannotSendToInternalTopic test method in ProducerFailureHandlingTest fails consistently with the following exception:
Key: KAFKA-2207
URL: https://issues.apache.org/jira/browse/KAFKA-2207
Project: Kafka
Issue Type: Bug
Reporter: Deepthi
Fix For: 0.8.2.1

kafka.api.ProducerFailureHandlingTest > testCannotSendToInternalTopic FAILED
    java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 3000 ms.
        at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:437)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:352)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:248)
        at kafka.api.ProducerFailureHandlingTest.testCannotSendToInternalTopic(ProducerFailureHandlingTest.scala:309)
    Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 3000 ms.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
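For context, the test asserts that producing to an internal topic fails with the expected error; the TimeoutException above fires earlier, while fetching metadata, before that assertion is ever reached. A rough Scala sketch of the test's shape (illustrative only, not the actual ProducerFailureHandlingTest code):

```scala
import java.util.concurrent.ExecutionException
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

// Illustrative sketch of the test's shape; not the actual
// kafka.api.ProducerFailureHandlingTest code.
def testCannotSendToInternalTopic(producer: KafkaProducer[Array[Byte], Array[Byte]]): Unit = {
  val record = new ProducerRecord[Array[Byte], Array[Byte]](
    "__consumer_offsets", "key".getBytes, "value".getBytes)
  try {
    // send() returns a Future; the expected failure surfaces on get().
    producer.send(record).get()
    throw new AssertionError("send to an internal topic should have failed")
  } catch {
    case e: ExecutionException =>
      // Expected path: the broker rejects produces to internal topics.
      // The bug reported above is that a metadata TimeoutException arrives
      // instead, before the internal-topic check can even be exercised.
      println(s"send failed as expected: ${e.getCause}")
  }
}
```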
[jira] [Updated] (KAFKA-2207) The testCannotSendToInternalTopic test method in ProducerFailureHandlingTest fails consistently with the following exception:
[ https://issues.apache.org/jira/browse/KAFKA-2207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Deepthi updated KAFKA-2207:
---------------------------
Status: Patch Available (was: Open)

The testCannotSendToInternalTopic test method in ProducerFailureHandlingTest fails consistently with the following exception:

Key: KAFKA-2207
URL: https://issues.apache.org/jira/browse/KAFKA-2207
Project: Kafka
Issue Type: Bug
Reporter: Deepthi
Fix For: 0.8.2.1

kafka.api.ProducerFailureHandlingTest > testCannotSendToInternalTopic FAILED
    java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 3000 ms.
        at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:437)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:352)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:248)
        at kafka.api.ProducerFailureHandlingTest.testCannotSendToInternalTopic(ProducerFailureHandlingTest.scala:309)
    Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 3000 ms.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2207) The testCannotSendToInternalTopic test method in ProducerFailureHandlingTest fails consistently with the following exception:
[ https://issues.apache.org/jira/browse/KAFKA-2207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Deepthi updated KAFKA-2207:
---------------------------
Description:

kafka.api.ProducerFailureHandlingTest > testCannotSendToInternalTopic FAILED
    java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 3000 ms.
        at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:437)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:352)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:248)
        at kafka.api.ProducerFailureHandlingTest.testCannotSendToInternalTopic(ProducerFailureHandlingTest.scala:309)
    Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 3000 ms.

The following attached patch has resolved the issue

was:

kafka.api.ProducerFailureHandlingTest > testCannotSendToInternalTopic FAILED
    java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 3000 ms.
        at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:437)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:352)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:248)
        at kafka.api.ProducerFailureHandlingTest.testCannotSendToInternalTopic(ProducerFailureHandlingTest.scala:309)
    Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 3000 ms.

The testCannotSendToInternalTopic test method in ProducerFailureHandlingTest fails consistently with the following exception:

Key: KAFKA-2207
URL: https://issues.apache.org/jira/browse/KAFKA-2207
Project: Kafka
Issue Type: Bug
Reporter: Deepthi
Fix For: 0.8.2.1
Attachments: KAFKA-2207.patch

kafka.api.ProducerFailureHandlingTest > testCannotSendToInternalTopic FAILED
    java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 3000 ms.
        at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:437)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:352)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:248)
        at kafka.api.ProducerFailureHandlingTest.testCannotSendToInternalTopic(ProducerFailureHandlingTest.scala:309)
    Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 3000 ms.

The following attached patch has resolved the issue

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2207) The testCannotSendToInternalTopic test method in ProducerFailureHandlingTest fails consistently with the following exception:
[ https://issues.apache.org/jira/browse/KAFKA-2207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Deepthi updated KAFKA-2207:
---------------------------
Status: Open (was: Patch Available)

The testCannotSendToInternalTopic test method in ProducerFailureHandlingTest fails consistently with the following exception:

Key: KAFKA-2207
URL: https://issues.apache.org/jira/browse/KAFKA-2207
Project: Kafka
Issue Type: Bug
Reporter: Deepthi
Fix For: 0.8.2.1

kafka.api.ProducerFailureHandlingTest > testCannotSendToInternalTopic FAILED
    java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 3000 ms.
        at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:437)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:352)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:248)
        at kafka.api.ProducerFailureHandlingTest.testCannotSendToInternalTopic(ProducerFailureHandlingTest.scala:309)
    Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 3000 ms.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1334) Coordinator should detect consumer failures
[ https://issues.apache.org/jira/browse/KAFKA-1334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman updated KAFKA-1334: Summary: Coordinator should detect consumer failures (was: Add failure detection capability to the coordinator / consumer) Coordinator should detect consumer failures --- Key: KAFKA-1334 URL: https://issues.apache.org/jira/browse/KAFKA-1334 Project: Kafka Issue Type: Sub-task Components: consumer Affects Versions: 0.9.0 Reporter: Neha Narkhede Assignee: Onur Karaman Attachments: KAFKA-1334.patch, KAFKA-1334_2015-04-11_22:47:27.patch, KAFKA-1334_2015-04-13_11:55:06.patch, KAFKA-1334_2015-04-13_11:58:53.patch, KAFKA-1334_2015-04-18_10:16:23.patch, KAFKA-1334_2015-04-18_12:16:39.patch, KAFKA-1334_2015-04-24_22:46:15.patch, KAFKA-1334_2015-04-25_15:21:25.patch, KAFKA-1334_2015-04-25_17:57:37.patch, KAFKA-1334_2015-05-05_10:50:00.patch, KAFKA-1334_2015-05-08_10:55:02.patch, KAFKA-1334_2015-05-14_00:38:39.patch 1) Add coordinator discovery and failure detection to the consumer. 2) Add failure detection capability to the coordinator when group management is used. This will not include any rebalancing logic, just the logic to detect consumer failures using session.timeout.ms. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 34397: Patch for KAFKA-1374
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34397/#review84545 ---

Ship it!

Assuming some integration tests will be added in the follow-up patches.

- Guozhang Wang

On May 20, 2015, 12:20 a.m., Joel Koshy wrote:
> --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34397/ ---
>
> (Updated May 20, 2015, 12:20 a.m.)
>
> Review request for kafka.
>
> Bugs: KAFKA-1374
>     https://issues.apache.org/jira/browse/KAFKA-1374
>
> Repository: kafka
>
> Description
> ---
> Remove no-compression constraint for compacted topics
> Fix log cleaner integration test
> Incorporate edits from latest patch
> More minor edits
> Incorporate Guozhang's comments
>
> Diffs
> ---
> core/src/main/scala/kafka/log/LogCleaner.scala abea8b251895a5cc0788c6e25b112a2935a3f631
> core/src/main/scala/kafka/message/ByteBufferMessageSet.scala 9dfe914991aaf82162e5e300c587c794555d5fd0
> core/src/test/scala/kafka/tools/TestLogCleaning.scala 844589427cb9337acd89a5239a98b811ee58118e
> core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 3b5aa9dc3b7ac5893c1d281ae1326be0e9ed8aad
> core/src/test/scala/unit/kafka/log/LogTest.scala 76d3bfd378f32fd2b216b3ebdec86e2070491924
>
> Diff: https://reviews.apache.org/r/34397/diff/
>
> Testing
> ---
>
> Thanks,
> Joel Koshy
Re: Review Request 34450: Fix KAFKA-2017; rebased
On May 20, 2015, 5:15 p.m., Onur Karaman wrote:
> I only did a brief skim. This optimization tries to switch consumers over to a new coordinator without a rebalance. From my understanding, the consumers would detect a coordinator failure, discover the new coordinator to work with, and try heartbeating that new coordinator without a rebalance.
>
> So it seems to me that putting the logic in handleJoinGroup isn't right, as the rebalance is what we're trying to avoid. The code should be in handleHeartbeat. It should look up ZK for the group info, add it to CoordinatorMetadata, and start up a DelayedHeartbeat for every consumer of that group.
>
> **More importantly: given that this is just an optimization, and we haven't even seen the performance hit without this, I think KAFKA-2017 should be very low priority.** The following are higher priority:
> 1. Getting the consumer to properly handle error codes of the join group and heartbeat responses.
> 2. Getting the consumer to detect coordinator failures and switch over to another coordinator (my KAFKA-1334 patch just had the coordinator detect consumer failures). A nice benefit of completing this first is that if we decide that the rebalances on coordinator failover are an actual issue, this would greatly facilitate testing any coordinator failover logic. Right now, it's unclear how this rb's logic can be tested.

I added a ticket for 2: [KAFKA-2208](https://issues.apache.org/jira/browse/KAFKA-2208)

- Onur

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34450/#review84539 ---

On May 20, 2015, 4:13 p.m., Guozhang Wang wrote:
> --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34450/ ---
>
> (Updated May 20, 2015, 4:13 p.m.)
>
> Review request for kafka.
>
> Bugs: KAFKA-2017
>     https://issues.apache.org/jira/browse/KAFKA-2017
>
> Repository: kafka
>
> Description
> ---
> 1. Upon receiving join-group, if the group metadata cannot be found in the local cache try to read it from ZK;
> 2. Upon completing rebalance, update the ZK with new group registry or delete the registry if the group becomes empty
>
> Diffs
> ---
> core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala af06ad45cdc46ac3bc27898ebc1a5bd5b1c7b19e
> core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala 47bdfa7cc86fd4e841e2b1d6bfd40f1508e643bd
> core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala c39e6de34ee531c6dfa9107b830752bd7f8fbe59
> core/src/main/scala/kafka/utils/ZkUtils.scala 2618dd39b925b979ad6e4c0abd5c6eaafb3db5d5
>
> Diff: https://reviews.apache.org/r/34450/diff/
>
> Testing
> ---
>
> Thanks,
> Guozhang Wang
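A hedged Scala sketch of the flow Onur proposes (recover the group on the heartbeat path instead of forcing a rebalance on join-group). Every name below is hypothetical and only illustrates the suggested control flow, not any actual patch:

```scala
// All names here are hypothetical illustrations of the proposed flow.
sealed trait HeartbeatResult
case object Ok extends HeartbeatResult
case object UnknownGroup extends HeartbeatResult

class CoordinatorSketch(
    metadata: collection.mutable.Map[String, Set[String]],   // groupId -> member ids
    readGroupFromZk: String => Option[Set[String]],          // ZK lookup on cache miss
    startDelayedHeartbeat: (String, String) => Unit) {       // per-member expiration timer

  def handleHeartbeat(groupId: String, consumerId: String): HeartbeatResult =
    metadata.get(groupId) match {
      case Some(members) if members.contains(consumerId) => Ok
      case _ =>
        // New coordinator after failover: recover the group from ZK so
        // consumers can keep heartbeating without triggering a rebalance.
        readGroupFromZk(groupId) match {
          case Some(members) =>
            metadata(groupId) = members
            members.foreach(m => startDelayedHeartbeat(groupId, m))
            if (members.contains(consumerId)) Ok else UnknownGroup
          case None =>
            UnknownGroup   // group truly unknown; the consumer must rejoin
        }
    }
}
```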
[jira] [Comment Edited] (KAFKA-2017) Persist Coordinator State for Coordinator Failover
[ https://issues.apache.org/jira/browse/KAFKA-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14552582#comment-14552582 ]

Guozhang Wang edited comment on KAFKA-2017 at 5/20/15 4:18 PM:
---------------------------------------------------------------
This patch uses the second ZK data structure proposed above to reduce the number of ZK reads.

1. The consumer group registry path only contains the members list, not the assignment, which can always be calculated on the fly from the member registries.
2. The group registry will be updated every time the prepare-rebalance phase has completed, and hence the new member list has been finalized; if the group has no members, it will be removed as well.
3. When a consumer group is first created, it is not immediately registered in ZK but only kept in the coordinator's metadata cache; only after the new group has finished its first prepare-rebalance will it be registered in ZK.
4. When a consumer group cannot be found in the metadata cache, the coordinator will always try to read it from ZK.

Pinging [~onurkaraman], [~junrao] for reviews.

was (Author: guozhang):
This patch uses the second ZK data structure proposed above to reduce the number of ZK reads.

1. The consumer group registry path only contains the members list, not the assignment, which can always be calculated on the fly from the member registries.
2. The group registry will be updated every time the prepare-rebalance phase has completed, and hence the new member list has been finalized; if the group has no members, it will be removed as well.
3. When a consumer group is first created, it is not immediately registered in ZK but only kept in the coordinator's metadata cache; only after the new group has finished its first prepare-rebalance will it be registered in ZK.
4. When a consumer group cannot be found in the metadata cache, the coordinator will always try to read it from ZK.

Persist Coordinator State for Coordinator Failover

Key: KAFKA-2017
URL: https://issues.apache.org/jira/browse/KAFKA-2017
Project: Kafka
Issue Type: Sub-task
Components: consumer
Affects Versions: 0.9.0
Reporter: Onur Karaman
Assignee: Guozhang Wang
Attachments: KAFKA-2017.patch, KAFKA-2017_2015-05-20_09:13:39.patch

When a coordinator fails, the group membership protocol tries to fail over to a new coordinator without forcing all the consumers to rejoin their groups. This is possible if the coordinator persists its state so that the state can be transferred during coordinator failover. This state consists of most of the information in GroupRegistry and ConsumerRegistry.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 34450: Fix KAFKA-2017; rebased
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34450/ ---

(Updated May 20, 2015, 4:13 p.m.)

Review request for kafka.

Summary (updated)
---
Fix KAFKA-2017; rebased

Bugs: KAFKA-2017
    https://issues.apache.org/jira/browse/KAFKA-2017

Repository: kafka

Description
---
1. Upon receiving join-group, if the group metadata cannot be found in the local cache try to read it from ZK;
2. Upon completing rebalance, update the ZK with new group registry or delete the registry if the group becomes empty

Diffs (updated)
---
core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala af06ad45cdc46ac3bc27898ebc1a5bd5b1c7b19e
core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala 47bdfa7cc86fd4e841e2b1d6bfd40f1508e643bd
core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala c39e6de34ee531c6dfa9107b830752bd7f8fbe59
core/src/main/scala/kafka/utils/ZkUtils.scala 2618dd39b925b979ad6e4c0abd5c6eaafb3db5d5

Diff: https://reviews.apache.org/r/34450/diff/

Testing
---

Thanks,
Guozhang Wang
[jira] [Commented] (KAFKA-2017) Persist Coordinator State for Coordinator Failover
[ https://issues.apache.org/jira/browse/KAFKA-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14552574#comment-14552574 ]

Guozhang Wang commented on KAFKA-2017:
--------------------------------------
Updated reviewboard https://reviews.apache.org/r/34450/diff/ against branch origin/trunk

Persist Coordinator State for Coordinator Failover

Key: KAFKA-2017
URL: https://issues.apache.org/jira/browse/KAFKA-2017
Project: Kafka
Issue Type: Sub-task
Components: consumer
Affects Versions: 0.9.0
Reporter: Onur Karaman
Assignee: Guozhang Wang
Attachments: KAFKA-2017.patch, KAFKA-2017_2015-05-20_09:13:39.patch

When a coordinator fails, the group membership protocol tries to fail over to a new coordinator without forcing all the consumers to rejoin their groups. This is possible if the coordinator persists its state so that the state can be transferred during coordinator failover. This state consists of most of the information in GroupRegistry and ConsumerRegistry.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2017) Persist Coordinator State for Coordinator Failover
[ https://issues.apache.org/jira/browse/KAFKA-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang updated KAFKA-2017:
---------------------------------
Attachment: KAFKA-2017_2015-05-20_09:13:39.patch

Persist Coordinator State for Coordinator Failover

Key: KAFKA-2017
URL: https://issues.apache.org/jira/browse/KAFKA-2017
Project: Kafka
Issue Type: Sub-task
Components: consumer
Affects Versions: 0.9.0
Reporter: Onur Karaman
Assignee: Guozhang Wang
Attachments: KAFKA-2017.patch, KAFKA-2017_2015-05-20_09:13:39.patch

When a coordinator fails, the group membership protocol tries to fail over to a new coordinator without forcing all the consumers to rejoin their groups. This is possible if the coordinator persists its state so that the state can be transferred during coordinator failover. This state consists of most of the information in GroupRegistry and ConsumerRegistry.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1333) Add consumer co-ordinator module to the server
[ https://issues.apache.org/jira/browse/KAFKA-1333?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang updated KAFKA-1333:
---------------------------------
Resolution: Fixed
Status: Resolved (was: Patch Available)

Resolving this ticket as a result of KAFKA-1334.

Add consumer co-ordinator module to the server

Key: KAFKA-1333
URL: https://issues.apache.org/jira/browse/KAFKA-1333
Project: Kafka
Issue Type: Sub-task
Components: consumer
Affects Versions: 0.9.0
Reporter: Neha Narkhede
Assignee: Guozhang Wang
Attachments: KAFKA-1333_2015-01-31_17:40:51.patch, KAFKA-1333_2015-02-06_15:02:48.patch

The scope of this JIRA is just to add a consumer co-ordinator module that does the following:
1) coordinator start-up, metadata initialization
2) simple join group handling (just updating metadata, no failure detection / rebalancing): this should be sufficient for consumers doing self offset / partition management.
The offset manager will still run side-by-side with the coordinator in this JIRA, and we will merge it in KAFKA-1740.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-1334) Add failure detection capability to the coordinator / consumer
[ https://issues.apache.org/jira/browse/KAFKA-1334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-1334. -- Resolution: Fixed Resolve this ticket as a result of KAFKA-2160. Add failure detection capability to the coordinator / consumer -- Key: KAFKA-1334 URL: https://issues.apache.org/jira/browse/KAFKA-1334 Project: Kafka Issue Type: Sub-task Components: consumer Affects Versions: 0.9.0 Reporter: Neha Narkhede Assignee: Onur Karaman Attachments: KAFKA-1334.patch, KAFKA-1334_2015-04-11_22:47:27.patch, KAFKA-1334_2015-04-13_11:55:06.patch, KAFKA-1334_2015-04-13_11:58:53.patch, KAFKA-1334_2015-04-18_10:16:23.patch, KAFKA-1334_2015-04-18_12:16:39.patch, KAFKA-1334_2015-04-24_22:46:15.patch, KAFKA-1334_2015-04-25_15:21:25.patch, KAFKA-1334_2015-04-25_17:57:37.patch, KAFKA-1334_2015-05-05_10:50:00.patch, KAFKA-1334_2015-05-08_10:55:02.patch, KAFKA-1334_2015-05-14_00:38:39.patch 1) Add coordinator discovery and failure detection to the consumer. 2) Add failure detection capability to the coordinator when group management is used. This will not include any rebalancing logic, just the logic to detect consumer failures using session.timeout.ms. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2017) Persist Coordinator State for Coordinator Failover
[ https://issues.apache.org/jira/browse/KAFKA-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14552582#comment-14552582 ]

Guozhang Wang commented on KAFKA-2017:
--------------------------------------
This patch uses the second ZK data structure proposed above to reduce the number of ZK reads.

1. The consumer group registry path only contains the members list, not the assignment, which can always be calculated on the fly from the member registries.
2. The group registry will be updated every time the prepare-rebalance phase has completed, and hence the new member list has been finalized; if the group has no members, it will be removed as well.
3. When a consumer group is first created, it is not immediately registered in ZK but only kept in the coordinator's metadata cache; only after the new group has finished its first prepare-rebalance will it be registered in ZK.
4. When a consumer group cannot be found in the metadata cache, the coordinator will always try to read it from ZK.

Persist Coordinator State for Coordinator Failover

Key: KAFKA-2017
URL: https://issues.apache.org/jira/browse/KAFKA-2017
Project: Kafka
Issue Type: Sub-task
Components: consumer
Affects Versions: 0.9.0
Reporter: Onur Karaman
Assignee: Guozhang Wang
Attachments: KAFKA-2017.patch, KAFKA-2017_2015-05-20_09:13:39.patch

When a coordinator fails, the group membership protocol tries to fail over to a new coordinator without forcing all the consumers to rejoin their groups. This is possible if the coordinator persists its state so that the state can be transferred during coordinator failover. This state consists of most of the information in GroupRegistry and ConsumerRegistry.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
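To make points 1-4 above concrete, here is a hedged Scala sketch of such a ZK-backed registry; the path layout, data encoding, and helper signatures are assumptions for illustration, not the patch's actual code:

```scala
// Hypothetical sketch of the ZK-backed group registry described above.
class GroupRegistrySketch(
    zkRead: String => Option[String],     // path -> stored data, if any
    zkWrite: (String, String) => Unit,    // create-or-update path with data
    zkDelete: String => Unit) {

  private val cache = collection.mutable.Map.empty[String, List[String]]
  private def path(groupId: String) = s"/consumers/$groupId/coordinator"

  // (2) After prepare-rebalance completes: persist the finalized member list,
  // or delete the registry entirely if the group has become empty.
  // (3) implies this is also the *first* time a new group reaches ZK.
  def onRebalanceCompleted(groupId: String, members: List[String]): Unit =
    if (members.isEmpty) { cache.remove(groupId); zkDelete(path(groupId)) }
    else { cache(groupId) = members; zkWrite(path(groupId), members.mkString(",")) }

  // (4) On a cache miss (e.g. right after coordinator failover), fall back to ZK.
  // (1) Only the member list is stored; the partition assignment is recomputed
  // on the fly from it, so it never needs to be persisted.
  def getMembers(groupId: String): Option[List[String]] =
    cache.get(groupId).orElse {
      val fromZk = zkRead(path(groupId)).map(_.split(",").toList)
      fromZk.foreach(members => cache(groupId) = members)
      fromZk
    }
}
```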
[jira] [Created] (KAFKA-2209) Change client quotas dynamically using DynamicConfigManager
Aditya Auradkar created KAFKA-2209: -- Summary: Change client quotas dynamically using DynamicConfigManager Key: KAFKA-2209 URL: https://issues.apache.org/jira/browse/KAFKA-2209 Project: Kafka Issue Type: Sub-task Reporter: Aditya Auradkar Assignee: Aditya Auradkar https://cwiki.apache.org/confluence/display/KAFKA/KIP-21+-+Dynamic+Configuration -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1335) Add rebalancing logic to the coordinator / consumer
[ https://issues.apache.org/jira/browse/KAFKA-1335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14552707#comment-14552707 ]

Onur Karaman commented on KAFKA-1335:
-------------------------------------
Part 1 was handled by: https://reviews.apache.org/r/33088/

Add rebalancing logic to the coordinator / consumer

Key: KAFKA-1335
URL: https://issues.apache.org/jira/browse/KAFKA-1335
Project: Kafka
Issue Type: Sub-task
Components: consumer
Affects Versions: 0.9.0
Reporter: Neha Narkhede
Assignee: Guozhang Wang

This implements the group management protocol. This will be a tricky and potentially large change since it will involve implementing the group management protocol, which includes:
1) Adding the rebalance logic on the coordinator that can be triggered from membership changes (either through failure detector or join group requests) and topic / partition ZK listener fires.
2) Adding the rebalance logic on the consumer upon topic change / error code from coordinator.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1374) LogCleaner (compaction) does not support compressed topics
[ https://issues.apache.org/jira/browse/KAFKA-1374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14552712#comment-14552712 ]

Guozhang Wang commented on KAFKA-1374:
--------------------------------------
Committed to trunk. I think we need some integration tests before feeling confident about this issue being resolved, so leaving it open just for now.

LogCleaner (compaction) does not support compressed topics

Key: KAFKA-1374
URL: https://issues.apache.org/jira/browse/KAFKA-1374
Project: Kafka
Issue Type: Bug
Reporter: Joel Koshy
Assignee: Manikumar Reddy
Labels: newbie++
Fix For: 0.8.3
Attachments: KAFKA-1374.patch, KAFKA-1374.patch, KAFKA-1374_2014-08-09_16:18:55.patch, KAFKA-1374_2014-08-12_22:23:06.patch, KAFKA-1374_2014-09-23_21:47:12.patch, KAFKA-1374_2014-10-03_18:49:16.patch, KAFKA-1374_2014-10-03_19:17:17.patch, KAFKA-1374_2015-01-18_00:19:21.patch, KAFKA-1374_2015-05-18_22:55:48.patch, KAFKA-1374_2015-05-19_17:20:44.patch

This is a known issue, but opening a ticket to track. If you try to compact a topic that has compressed messages you will run into various exceptions - typically because during iteration we advance the position based on the decompressed size of the message. I have a bunch of stack traces, but it should be straightforward to reproduce.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
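The root cause named in the description (advancing the iteration position by the decompressed message size, while positions index compressed bytes on disk) can be shown with a toy example. This Scala sketch uses made-up sizes and only models the bug class; it is not the actual ByteBufferMessageSet iteration code:

```scala
// Illustrative model of the bug class described above, with made-up numbers.
final case class StoredMessage(compressedSize: Int, decompressedSize: Int)

val log = Vector(StoredMessage(100, 340), StoredMessage(120, 410))

// Buggy iteration: the position indexes *compressed* bytes on disk,
// but the cursor is advanced by the *decompressed* payload size.
var buggyPos = 0
log.foreach(m => buggyPos += m.decompressedSize)   // lands at 750: past the data

// Correct iteration: advance by the on-disk (compressed) size.
var pos = 0
log.foreach(m => pos += m.compressedSize)          // lands at 220: the next record

println(s"buggy position: $buggyPos, correct position: $pos")
```

Once the cursor drifts past a record boundary like this, subsequent reads start mid-record, which is consistent with the "various exceptions" the report mentions.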
[jira] [Updated] (KAFKA-1334) Coordinator should detect consumer failures
[ https://issues.apache.org/jira/browse/KAFKA-1334?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Onur Karaman updated KAFKA-1334: Description: Add failure detection capability to the coordinator when group management is used. (was: 1) Add coordinator discovery and failure detection to the consumer. 2) Add failure detection capability to the coordinator when group management is used. This will not include any rebalancing logic, just the logic to detect consumer failures using session.timeout.ms. ) Coordinator should detect consumer failures --- Key: KAFKA-1334 URL: https://issues.apache.org/jira/browse/KAFKA-1334 Project: Kafka Issue Type: Sub-task Components: consumer Affects Versions: 0.9.0 Reporter: Neha Narkhede Assignee: Onur Karaman Attachments: KAFKA-1334.patch, KAFKA-1334_2015-04-11_22:47:27.patch, KAFKA-1334_2015-04-13_11:55:06.patch, KAFKA-1334_2015-04-13_11:58:53.patch, KAFKA-1334_2015-04-18_10:16:23.patch, KAFKA-1334_2015-04-18_12:16:39.patch, KAFKA-1334_2015-04-24_22:46:15.patch, KAFKA-1334_2015-04-25_15:21:25.patch, KAFKA-1334_2015-04-25_17:57:37.patch, KAFKA-1334_2015-05-05_10:50:00.patch, KAFKA-1334_2015-05-08_10:55:02.patch, KAFKA-1334_2015-05-14_00:38:39.patch Add failure detection capability to the coordinator when group management is used. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
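For readers following the coordinator work, a minimal Scala sketch of the failure-detection idea the original description referred to: the coordinator timestamps each heartbeat and considers a consumer dead once the gap exceeds session.timeout.ms. All names here are illustrative assumptions; the real coordinator uses DelayedHeartbeat operations rather than a polling sweep:

```scala
// Illustrative sketch of session.timeout.ms-based failure detection;
// names and structure are assumptions, not the actual coordinator code.
class FailureDetectorSketch(sessionTimeoutMs: Long) {
  private val lastHeartbeat = collection.mutable.Map.empty[String, Long]

  // Called whenever a heartbeat request arrives from a consumer.
  def onHeartbeat(consumerId: String, nowMs: Long): Unit =
    lastHeartbeat(consumerId) = nowMs

  // Consumers whose last heartbeat is older than the session timeout
  // are considered failed and would trigger a rebalance in a later JIRA.
  def expiredConsumers(nowMs: Long): List[String] =
    lastHeartbeat.collect {
      case (id, last) if nowMs - last > sessionTimeoutMs => id
    }.toList
}
```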
[jira] [Issue Comment Deleted] (KAFKA-2201) Open file handle leak
[ https://issues.apache.org/jira/browse/KAFKA-2201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Albert Visagie updated KAFKA-2201:
----------------------------------
Comment: was deleted

(was: Apologies, I forgot to add: the number of open file handles climbs linearly over time until it reaches just over 6 after about 18 hours. The configured per-process limit on my server is 65536. There are nowhere near that many files in /tmp/kafka-logs. Given that number of open files, I don't think changing the limits is the answer. Will try the latest JDK 1.7...)

Open file handle leak

Key: KAFKA-2201
URL: https://issues.apache.org/jira/browse/KAFKA-2201
Project: Kafka
Issue Type: Bug
Components: core
Affects Versions: 0.8.2.1
Environment: Debian Linux 7, 64-bit; Oracle JDK 1.7.0u40, 64-bit
Reporter: Albert Visagie

The kafka broker crashes with the following stack trace from the server.log roughly every 18 hours:

[2015-05-19 07:39:22,924] FATAL [KafkaApi-0] Halting due to unrecoverable I/O error while handling produce request: (kafka.server.KafkaApis)
kafka.common.KafkaStorageException: I/O exception in append to log 'nnn-1'
    at kafka.log.Log.append(Log.scala:266)
    at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:379)
    at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:365)
    at kafka.utils.Utils$.inLock(Utils.scala:535)
    at kafka.utils.Utils$.inReadLock(Utils.scala:541)
    at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:365)
    at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:291)
    at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:282)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:282)
    at kafka.server.KafkaApis.handleProducerOrOffsetCommitRequest(KafkaApis.scala:204)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:59)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
    at java.lang.Thread.run(Thread.java:724)
Caused by: java.io.IOException: Map failed
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:888)
    at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:286)
    at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:276)
    at kafka.utils.Utils$.inLock(Utils.scala:535)
    at kafka.log.OffsetIndex.resize(OffsetIndex.scala:276)
    at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(OffsetIndex.scala:265)
    at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:265)
    at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:265)
    at kafka.utils.Utils$.inLock(Utils.scala:535)
    at kafka.log.OffsetIndex.trimToValidSize(OffsetIndex.scala:264)
    at kafka.log.Log.roll(Log.scala:563)
    at kafka.log.Log.maybeRoll(Log.scala:539)
    at kafka.log.Log.append(Log.scala:306)
    ... 21 more
Caused by: java.lang.OutOfMemoryError: Map failed
    at sun.nio.ch.FileChannelImpl.map0(Native Method)
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:885)
    ... 33 more

The Kafka broker's open file handles as seen by lsof | grep pid | wc -l grows steadily as it runs. Under our load it lasts about 18 hours before crashing with the stack trace above.

We were experimenting with settings under Log Retention Policy in server.properties:
log.retention.hours=168
log.retention.bytes=107374182
log.segment.bytes=1073741
log.retention.check.interval.ms=3000

The result is that the broker rolls over segments quite rapidly. We don't have to run it that way of course. We are running only one broker at the moment.

lsof shows many open files without size that are absent from ls in the log directory, with the suffix .deleted.

This is kafka 0.8.2.1 with scala 2.10.4 as downloaded from the website last week.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2201) Open file handle leak
[ https://issues.apache.org/jira/browse/KAFKA-2201?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14552565#comment-14552565 ]

Albert Visagie commented on KAFKA-2201:
---------------------------------------
Apologies, I forgot to add: the number of open file handles climbs linearly over time until it reaches just over 6 after about 18 hours. The configured per-process limit on my server is 65536. There are nowhere near that many files in /tmp/kafka-logs. Given that number of open files, I don't think changing the limits is the answer. Will try the latest JDK 1.7...

Open file handle leak

Key: KAFKA-2201
URL: https://issues.apache.org/jira/browse/KAFKA-2201
Project: Kafka
Issue Type: Bug
Components: core
Affects Versions: 0.8.2.1
Environment: Debian Linux 7, 64-bit; Oracle JDK 1.7.0u40, 64-bit
Reporter: Albert Visagie

The kafka broker crashes with the following stack trace from the server.log roughly every 18 hours:

[2015-05-19 07:39:22,924] FATAL [KafkaApi-0] Halting due to unrecoverable I/O error while handling produce request: (kafka.server.KafkaApis)
kafka.common.KafkaStorageException: I/O exception in append to log 'nnn-1'
    at kafka.log.Log.append(Log.scala:266)
    at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:379)
    at kafka.cluster.Partition$$anonfun$appendMessagesToLeader$1.apply(Partition.scala:365)
    at kafka.utils.Utils$.inLock(Utils.scala:535)
    at kafka.utils.Utils$.inReadLock(Utils.scala:541)
    at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:365)
    at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:291)
    at kafka.server.KafkaApis$$anonfun$appendToLocalLog$2.apply(KafkaApis.scala:282)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at kafka.server.KafkaApis.appendToLocalLog(KafkaApis.scala:282)
    at kafka.server.KafkaApis.handleProducerOrOffsetCommitRequest(KafkaApis.scala:204)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:59)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:59)
    at java.lang.Thread.run(Thread.java:724)
Caused by: java.io.IOException: Map failed
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:888)
    at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:286)
    at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:276)
    at kafka.utils.Utils$.inLock(Utils.scala:535)
    at kafka.log.OffsetIndex.resize(OffsetIndex.scala:276)
    at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(OffsetIndex.scala:265)
    at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:265)
    at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:265)
    at kafka.utils.Utils$.inLock(Utils.scala:535)
    at kafka.log.OffsetIndex.trimToValidSize(OffsetIndex.scala:264)
    at kafka.log.Log.roll(Log.scala:563)
    at kafka.log.Log.maybeRoll(Log.scala:539)
    at kafka.log.Log.append(Log.scala:306)
    ... 21 more
Caused by: java.lang.OutOfMemoryError: Map failed
    at sun.nio.ch.FileChannelImpl.map0(Native Method)
    at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:885)
    ... 33 more

The Kafka broker's open file handles as seen by lsof | grep pid | wc -l grows steadily as it runs. Under our load it lasts about 18 hours before crashing with the stack trace above.

We were experimenting with settings under Log Retention Policy in server.properties:
log.retention.hours=168
log.retention.bytes=107374182
log.segment.bytes=1073741
log.retention.check.interval.ms=3000

The result is that the broker rolls over segments quite rapidly. We don't have to run it that way of course. We are running only one broker at the moment.

lsof shows many open files without size that are absent from ls in the log directory, with the suffix .deleted.

This is kafka 0.8.2.1 with scala 2.10.4 as downloaded from the website last week.

-- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient
The fact that I understand the producer internals and am still struggling to understand the implications of the different settings, how I would set them, and how they potentially interact such that I could set invalid combinations seems like a red flag to me... Being able to say I want produce requests to time out in 5s shouldn't require adjusting 3 or 4 configs if the defaults would normally time out in something like 30s.

Setting aside compatibility issues and focusing on the best set of configs, I agree with Jay that there are two things I actually want out of the API. The key thing is a per-request timeout, which should be enforced client side. I would just expect this to follow the request through any internals so it can be enforced no matter where in the pipeline the request is. Within each component in the pipeline we might have to compute how much time we have left for the request in order to create a timeout within that setting.

The second setting is to bound the amount of time spent blocking on send(). This is really an implementation detail, but one that people are complaining about enough that it seems worthwhile to provide control over it (and fixing it would just make that setting superfluous, not break anything).

Exposing a lot more settings also exposes a lot about the implementation and makes it harder to improve the implementation in the future, but I don't think we have listed good use cases for setting each of them individually. Why would the user specifically care about how much time the request spends in the accumulator vs. some other component (assuming they have the overall timeout)? Same for requests in flight, as long as I have that client-side timeout? And if they care about which component is the bottleneck, could that be better exposed by the exceptions that are returned rather than a ton of different settings?

Agreed with the above. I'm also extremely wary of configs that are inherently unintuitive, or can interact to yield unintuitive behavior. OTOH I think it is okay if a config is categorized as advanced or if it requires deeper knowledge of the internals of the producer (or the configured system in general), i.e., as long as we think long and hard and agree on necessity (driven by clear use cases) before adding such configs. We should also consider how we can simplify or even eliminate existing configs.

Re: requests in flight may be a good example: Becket had given a valid use case, i.e., supporting strict ordering. Maybe we can replace it with an enable.strict.ordering config, which is clearer in intent and would internally ensure only one in-flight request per partition, defaulting to a fixed number of in-flight requests (say, five or ten) if set to false. If we implement idempotence then we won't even need that.

On Tue, May 19, 2015 at 7:13 PM, Jiangjie Qin j...@linkedin.com.invalid wrote:

Hi Jay,

I updated what I think in the KIP wiki. Just a short summary here. Because we need a timeout for:
1. Send()
2. Batches in accumulator
3. Requests in flight

That means we need to have at least three configurations if we do not reuse configurations. I think we probably want to also separate the configurations for exception handling and SLA purposes as well. My understanding of the configurations we are discussing here is that they are for exception handling but not for SLA purposes. It looks to me that exception handling is more component oriented while SLA is more a matter of systematic tuning. What you suggested sounds more like setting configurations to meet a user-defined SLA.
I am not sure if this is what we want to do here. Thanks. Jiangjie (Becket) Qin

On 5/19/15, 5:42 PM, Jay Kreps jay.kr...@gmail.com wrote: Yeah I think linger.ms remains separate; setting that is a performance optimization rather than a failure-handling thing. We should ideally sanity check this, though, in my proposal, since if they set linger.ms > request.timeout then that won't work. It's true that in my proposal the actual replication timeout we set on the request would be non-deterministic. However the flip side of that argument is that in the existing proposal the actual time until an acknowledgement is non-deterministic, right? So I think the argument I am trying to construct is that the two things the user cares about are the time to block and the time to ack, and any other timeout we use internally is basically an implementation detail of ensuring this. Your point about the difference between batches and requests is a good one. I hadn't thought of that. So to make my proposal work we would need to do something like base the request time off the oldest batch. Let me think about the implications of that, it's definitely a problem. -Jay

On Tue, May 19, 2015 at 12:42 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hey Jay, That is also a viable solution. I think
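A minimal sketch of the per-request deadline idea from Ewen's and Jay's messages above - each stage of the pipeline re-derives how much of the caller's budget remains, so a single client-side timeout can be enforced no matter where the request currently sits. All names here are illustrative, not Kafka APIs:

    import java.util.concurrent.TimeoutException;

    public class DeadlineSketch {
        // Each pipeline stage asks how much of the caller's budget is left.
        static long remainingMs(long deadlineMs) throws TimeoutException {
            long remaining = deadlineMs - System.currentTimeMillis();
            if (remaining <= 0)
                throw new TimeoutException("request timed out");
            return remaining;
        }

        public static void main(String[] args) throws Exception {
            long deadline = System.currentTimeMillis() + 5000; // "produce requests time out in 5s"
            Thread.sleep(Math.min(100, remainingMs(deadline))); // stage 1: metadata wait, capped by budget
            Thread.sleep(Math.min(100, remainingMs(deadline))); // stage 2: accumulator wait, re-checked
            System.out.println("left for the network layer: " + remainingMs(deadline) + " ms");
        }
    }

Under this scheme the per-component timeouts Jay calls implementation details would be derived from the one deadline, not configured individually.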
[jira] [Commented] (KAFKA-2017) Persist Coordinator State for Coordinator Failover
[ https://issues.apache.org/jira/browse/KAFKA-2017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14552703#comment-14552703 ] Onur Karaman commented on KAFKA-2017: - I left a review. Persist Coordinator State for Coordinator Failover -- Key: KAFKA-2017 URL: https://issues.apache.org/jira/browse/KAFKA-2017 Project: Kafka Issue Type: Sub-task Components: consumer Affects Versions: 0.9.0 Reporter: Onur Karaman Assignee: Guozhang Wang Attachments: KAFKA-2017.patch, KAFKA-2017_2015-05-20_09:13:39.patch When a coordinator fails, the group membership protocol tries to failover to a new coordinator without forcing all the consumers rejoin their groups. This is possible if the coordinator persists its state so that the state can be transferred during coordinator failover. This state consists of most of the information in GroupRegistry and ConsumerRegistry. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
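For concreteness, a rough sketch of the state that would need to survive failover, guessed from the GroupRegistry/ConsumerRegistry mention above - the field names are assumptions, not the actual schema:

    import java.util.List;
    import java.util.Map;

    // Hypothetical shape of the persisted group state (KAFKA-2017); all fields assumed.
    class PersistedGroupState {
        String groupId;
        int generationId;                        // so a new coordinator doesn't reset the group
        List<String> consumerIds;                // current members, from ConsumerRegistry
        Map<String, List<String>> assignments;   // consumerId -> assigned topic-partitions
    }

With something like this persisted (e.g., in ZooKeeper), a new coordinator could resume heartbeating the existing members instead of forcing a full rejoin.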
[jira] [Commented] (KAFKA-1334) Coordinator should detect consumer failures
[ https://issues.apache.org/jira/browse/KAFKA-1334?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14552701#comment-14552701 ] Onur Karaman commented on KAFKA-1334: - This ticket was originally two things: 1. coordinator detects consumer failures (now KAFKA-1334); 2. consumer detects coordinator failures (now KAFKA-2208) Coordinator should detect consumer failures --- Key: KAFKA-1334 URL: https://issues.apache.org/jira/browse/KAFKA-1334 Project: Kafka Issue Type: Sub-task Components: consumer Affects Versions: 0.9.0 Reporter: Neha Narkhede Assignee: Onur Karaman Attachments: KAFKA-1334.patch, KAFKA-1334_2015-04-11_22:47:27.patch, KAFKA-1334_2015-04-13_11:55:06.patch, KAFKA-1334_2015-04-13_11:58:53.patch, KAFKA-1334_2015-04-18_10:16:23.patch, KAFKA-1334_2015-04-18_12:16:39.patch, KAFKA-1334_2015-04-24_22:46:15.patch, KAFKA-1334_2015-04-25_15:21:25.patch, KAFKA-1334_2015-04-25_17:57:37.patch, KAFKA-1334_2015-05-05_10:50:00.patch, KAFKA-1334_2015-05-08_10:55:02.patch, KAFKA-1334_2015-05-14_00:38:39.patch Add failure detection capability to the coordinator when group management is used. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-2208) Consumer should detect coordinator failures
Onur Karaman created KAFKA-2208: --- Summary: Consumer should detect coordinator failures Key: KAFKA-2208 URL: https://issues.apache.org/jira/browse/KAFKA-2208 Project: Kafka Issue Type: Sub-task Reporter: Onur Karaman Assignee: Guozhang Wang Add coordinator discovery and failure detection to the consumer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 33204: Patch for KAFKA-1646 add test cases
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33204/#review84554 --- Ship it! This looks good to me, but it would be good to get a second reviewer since this is fairly critical code. - Jay Kreps On May 11, 2015, 10 a.m., Honghai Chen wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33204/ --- (Updated May 11, 2015, 10 a.m.) Review request for kafka. Bugs: KAFKA-1646 https://issues.apache.org/jira/browse/KAFKA-1646 Repository: kafka Description --- Kafka 1646 fix add test cases Diffs - core/src/main/scala/kafka/log/FileMessageSet.scala 2522604bd985c513527fa0c863a7df677ff7a503 core/src/main/scala/kafka/log/Log.scala 84e7b8fe9dd014884b60c4fbe13c835cf02a40e4 core/src/main/scala/kafka/log/LogConfig.scala a907da09e1ccede3b446459225e407cd1ae6d8b3 core/src/main/scala/kafka/log/LogSegment.scala ed039539ac18ea4d65144073915cf112f7374631 core/src/main/scala/kafka/server/KafkaConfig.scala 9efa15ca5567b295ab412ee9eea7c03eb4cdc18b core/src/main/scala/kafka/server/KafkaServer.scala b7d2a2842e17411a823b93bdedc84657cbd62be1 core/src/main/scala/kafka/utils/CoreUtils.scala d0a8fa701564b4c13b3cd6501e1b6218d77e8e06 core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala cec1caecc51507ae339ebf8f3b8a028b12a1a056 core/src/test/scala/unit/kafka/log/LogSegmentTest.scala 03fb3512c4a4450eac83d4cd4b0919baeaa22942 Diff: https://reviews.apache.org/r/33204/diff/ Testing --- Thanks, Honghai Chen
[jira] [Commented] (KAFKA-1646) Improve consumer read performance for Windows
[ https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14552777#comment-14552777 ] Jay Kreps commented on KAFKA-1646: -- This looks good to me but it would be good to get a second reviewer on this since it is fairly critical code. [~junrao] any chance you could take a look too? Improve consumer read performance for Windows - Key: KAFKA-1646 URL: https://issues.apache.org/jira/browse/KAFKA-1646 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.1.1 Environment: Windows Reporter: xueqiang wang Assignee: xueqiang wang Labels: newbie, patch Attachments: Improve consumer read performance for Windows.patch, KAFKA-1646-truncate-off-trailing-zeros-on-broker-restart-if-bro.patch, KAFKA-1646_20141216_163008.patch, KAFKA-1646_20150306_005526.patch, KAFKA-1646_20150511_AddTestcases.patch This patch is for the Windows platform only. On Windows, if more than one replica is writing to disk, the segment log files will not be consistent on disk and consumer read performance will drop greatly. This fix allocates more disk space when rolling a new segment, which improves consumer read performance on the NTFS file system. This patch doesn't affect file allocation on other filesystems, as it only adds statements like 'if(Os.iswindow)' or methods used on Windows. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-2210) KafkaAuthorizer: Add all public entities, config changes and changes to KafkaAPI and kafkaServer to allow pluggable authorizer implementation.
Parth Brahmbhatt created KAFKA-2210: --- Summary: KafkaAuthorizer: Add all public entities, config changes and changes to KafkaAPI and kafkaServer to allow pluggable authorizer implementation. Key: KAFKA-2210 URL: https://issues.apache.org/jira/browse/KAFKA-2210 Project: Kafka Issue Type: Sub-task Reporter: Parth Brahmbhatt Assignee: Parth Brahmbhatt This is the first subtask for Kafka-1688. As Part of this jira we intend to agree on all the public entities, configs and changes to existing kafka classes to allow pluggable authorizer implementation. Please see KIP-11 https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface for detailed design. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
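A rough Java rendering of what a pluggable authorizer contract of this shape might look like; the actual KIP-11 proposal is a Scala trait, and all names and signatures below are assumptions for illustration only:

    import java.util.Map;
    import java.util.Set;

    // Placeholder types standing in for the KIP-11 public entities.
    interface Session {}
    interface Operation {}
    interface Resource {}
    interface Acl {}

    // Hypothetical sketch of the pluggable contract; see KIP-11 for the real design.
    interface Authorizer {
        void configure(Map<String, ?> configs);                       // wired up from broker config
        boolean authorize(Session session, Operation op, Resource r); // called from KafkaApis
        void addAcls(Set<Acl> acls, Resource resource);
        void removeAcls(Set<Acl> acls, Resource resource);
        Set<Acl> getAcls(Resource resource);
    }

The broker would instantiate the configured implementation class reflectively, which is what makes the authorizer pluggable.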
Re: [DISCUSS] KIP-19 Add a request timeout to NetworkClient
Hey Ewen, I agree with you that we should avoid any unnecessary configurations exposed to users. And the necessity is defined by use case. I also agree that the configurations should be named from users' perspective and comply with intuition - for example, like what Joel said, something like enable.strict.order. That is exactly why I think it makes sense to just use the metadata timeout to expire the batches in the accumulator without adding another configuration.

I still have some concerns over having only one timeout configuration after messages are appended to the accumulator. IMO, this will cause problems in some scenarios. What do we expect users to set that value to? If a batch stayed in the accumulator for a while and maybe only 1 second is left before timeout, are we going to send it or not? From what I can see, the network request timeout is to make sure we have enough buffer to allow a broker to respond a little bit slowly but also to avoid sticking to a dead broker for too long. It should be independent of other settings. If we agree on this then that indicates we need to have a *stand-alone* network.request.timeout.ms

If we really want to give users some kind of guarantee on the blocking time of send() and minimize the configurations, I feel it makes more sense to have a timeout including time blocked on send() and time spent in the accumulator. But this again raises a similar question: what if send() blocked for some time and there is only 1 second left when we put the message into the accumulator? Also it is not clear to me how one configuration would cover the timeouts we have, which are on different entities, specifically: 1. In send() - messages 2. In accumulator - batches 3. In NetworkClient - requests

I think there are two ways to think about the configurations: 1. Set configurations by step, and let users specify how long they want to wait for each step - I think this is what you and Jay are opposing, and I agree. 2. Tell users what things are required for a message to be sent, and let them set how long they are willing to wait on each of them. So we have metadata, buffer pool, request timeout, replication timeout. I prefer the second way. In this case we can have:
A. Metadata.timeout.ms - used in send() and accumulator
B. Blocking.on.buffer.full.ms - used in send()
C. Network.request.timeout.ms - used in NetworkClient
D. Replication.timeout.ms - used in broker
A send() will block for at most A + B; a send-to-response time can be at most A + B + C + D. It is very clear to users what kind of things they are configuring. Arguably, we are exposing internal things to users, but that is better than providing some ambiguity and later having to explain the internal details separately. Thanks. Jiangjie (Becket) Qin

On 5/19/15, 9:53 PM, Ewen Cheslack-Postava e...@confluent.io wrote: The fact that I understand the producer internals and am still struggling to understand the implications of the different settings, how I would set them, and how they potentially interact such that I could set invalid combinations seems like a red flag to me... Being able to say I want produce requests to time out in 5s shouldn't require adjusting 3 or 4 configs if the defaults would normally time out in something like 30s. Setting aside compatibility issues and focusing on the best set of configs, I agree with Jay that there are two things I actually want out of the API. The key thing is a per-request timeout, which should be enforced client side. 
I would just expect this to follow the request through any internals so it can be enforced no matter where in the pipeline the request is. Within each component in the pipeline we might have to compute how much time we have left for the request in order to create a timeout within that setting. The second setting is to bound the amount of time spent blocking on send(). This is really an implementation detail, but one that people are complaining about enough that it seems worthwhile to provide control over it (and fixing it would just make that setting superfluous, not break anything). Exposing a lot more settings also exposes a lot about the implementation and makes it harder to improve the implementation in the future, but I don't think we have listed good use cases for setting each of them individually. Why would the user specifically care about how much time the request spends in the accumulator vs. some other component (assuming they have the overall timeout)? Same for requests in flight, as long as I have that client-side timeout? And if they care about which component is the bottleneck, could that be better exposed by the exceptions that are returned rather than a ton of different settings?

On Tue, May 19, 2015 at 7:13 PM, Jiangjie Qin j...@linkedin.com.invalid wrote: Hi Jay, I updated what I think in the KIP wiki. Just a short summary here. Because we need timeouts for: 1. Send() 2. Batches in accumulator 3. Requests in flight. That means we need to have at
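To make Becket's option 2 concrete, here is a sketch of his proposed decomposition; every config name below is a proposal from this thread, not a released producer setting:

    import java.util.Properties;

    public class TimeoutDecompositionSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("metadata.timeout.ms", "5000");         // A: covers send() and the accumulator
            props.put("blocking.on.buffer.full.ms", "3000");  // B: bound on blocking inside send()
            props.put("network.request.timeout.ms", "10000"); // C: per request, in NetworkClient
            props.put("replication.timeout.ms", "10000");     // D: broker-side wait for acks
            long a = 5000, b = 3000, c = 10000, d = 10000;
            System.out.println("send() blocks at most A+B = " + (a + b) + " ms");
            System.out.println("send-to-response at most A+B+C+D = " + (a + b + c + d) + " ms");
        }
    }

The selling point is that each knob maps to one thing a message must acquire (metadata, buffer space, a network round trip, replication), so the worst-case bounds fall out by simple addition.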
Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system
I actually feel many [VOTE] threads eventually become [DISCUSS] as people just put tons of comments there :)

On 5/20/15, 11:52 AM, Jay Kreps jay.kr...@gmail.com wrote: Makes sense. Honghai, want to do a [VOTE] thread just so everything is official? -Jay

On Wed, May 20, 2015 at 11:22 AM, Gwen Shapira gshap...@cloudera.com wrote: For simple discussions, I completely agree. For those threads where there are a few votes, and then more discussion, and then the KIP changes a few times... a separate thread will help keep things clear for both voters and anyone who will try to figure out what happened in the future.

On Wed, May 20, 2015 at 9:02 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey all, How do people feel about these [DISCUSS] threads that basically accidentally turn into votes? Like this one - basically everyone was +1 on this KIP already; should we just skip the second vote? I find it kind of annoying to do both when this happens. -Jay

On Mon, May 11, 2015 at 8:16 PM, Honghai Chen honghai.c...@microsoft.com wrote: All issues fixed, test cases added, performance results on Windows attached. The patch can help improve the consume performance by around 25%~50%. Thanks, Honghai Chen

-Original Message- From: Jun Rao [mailto:j...@confluent.io] Sent: Wednesday, May 6, 2015 5:39 AM To: dev@kafka.apache.org Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system Thanks. Could you update the wiki? Also, commented on the jira. Jun

On Tue, May 5, 2015 at 12:48 AM, Honghai Chen honghai.c...@microsoft.com wrote: Using config.segmentSize should be OK. Previously I added that to make sure the file does not exceed config.segmentSize; actually the function maybeRoll already makes sure of that. When trying to add a test case for recovery, I was blocked by the rename-related issue and opened a JIRA at https://issues.apache.org/jira/browse/KAFKA-2170 ; any recommendation for fixing that issue? Thanks, Honghai Chen

-Original Message- From: Jun Rao [mailto:j...@confluent.io] Sent: Tuesday, May 5, 2015 12:51 PM To: dev@kafka.apache.org Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system This seems similar to what's in https://issues.apache.org/jira/browse/KAFKA-1065. Also, could you explain why the preallocated size is set to config.segmentSize - 2 * config.maxMessageSize, instead of just config.segmentSize? Thanks, Jun

On Mon, May 4, 2015 at 8:12 PM, Honghai Chen honghai.c...@microsoft.com wrote: Hi guys, I'm trying to add test cases, but the case below crashed at the line segReopen.recover(64*1024) --> index.trimToValidSize(); any idea why? Appreciate your help. The case assumes Kafka suddenly crashed and needs to recover the last segment. 
kafka.log.LogSegmentTest testCreateWithInitFileSizeCrash FAILED
    java.io.IOException: The requested operation cannot be performed on a file with a user-mapped section open
        at java.io.RandomAccessFile.setLength(Native Method)
        at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:292)
        at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:283)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:288)
        at kafka.log.OffsetIndex.resize(OffsetIndex.scala:283)
        at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(OffsetIndex.scala:272)
        at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:272)
        at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:272)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:288)
        at kafka.log.OffsetIndex.trimToValidSize(OffsetIndex.scala:271)
        at kafka.log.LogSegment.recover(LogSegment.scala:199)
        at kafka.log.LogSegmentTest.testCreateWithInitFileSizeCrash(LogSegmentTest.scala:306)

  def recover(maxMessageSize: Int): Int = {
    index.truncate()
    index.resize(index.maxIndexSize)
    var validBytes = 0
    var lastIndexEntry = 0
    val iter = log.iterator(maxMessageSize)
    try {
      while(iter.hasNext) {
        val entry = iter.next
        entry.message.ensureValid()
        if(validBytes - lastIndexEntry > indexIntervalBytes) {
          // we need to decompress the message, if required, to get the offset of the first uncompressed message
          val startOffset = entry.message.compressionCodec match {
            case NoCompressionCodec =>
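The preallocation being discussed boils down to reserving the whole segment file up front. A minimal sketch of the idea, assuming nothing about the actual FileMessageSet code (file name and size are illustrative):

    import java.io.File;
    import java.io.RandomAccessFile;

    public class PreallocateSketch {
        public static void main(String[] args) throws Exception {
            File segment = new File("00000000000000000000.log"); // hypothetical segment file
            RandomAccessFile raf = new RandomAccessFile(segment, "rw");
            raf.setLength(1024L * 1024L * 1024L); // reserve the full segment size up front
            raf.close();
            // Appends then overwrite the zeroed region; on roll/clean shutdown the file must be
            // truncated back to its real data size so trailing zeros don't confuse recovery
            // (cf. the truncate-off-trailing-zeros patch attached to KAFKA-1646).
        }
    }

The IOException above is also easy to reproduce in isolation: on Windows a file with a live memory mapping cannot be truncated, which is exactly what OffsetIndex.resize runs into. A minimal repro sketch (the behavior is Windows-specific, because the JVM offers no explicit unmap):

    import java.io.RandomAccessFile;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;

    public class MappedTruncateSketch {
        public static void main(String[] args) throws Exception {
            RandomAccessFile raf = new RandomAccessFile("test.index", "rw");
            MappedByteBuffer mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, 4096);
            mmap.put((byte) 1);  // the mapping is now live
            raf.setLength(1024); // on Windows: java.io.IOException: The requested operation
                                 // cannot be performed on a file with a user-mapped section open
        }
    }

So the recovery path needs the index mapping released (or the resize skipped) before the file length can change - the same class of Windows file-handling issue as the rename problem mentioned above.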
[jira] [Commented] (KAFKA-2211) KafkaAuthorizer: Add simpleACLAuthorizer implementation.
[ https://issues.apache.org/jira/browse/KAFKA-2211?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553012#comment-14553012 ] Parth Brahmbhatt commented on KAFKA-2211: - Created reviewboard https://reviews.apache.org/r/34493/diff/ against branch az KafkaAuthorizer: Add simpleACLAuthorizer implementation. Key: KAFKA-2211 URL: https://issues.apache.org/jira/browse/KAFKA-2211 Project: Kafka Issue Type: Sub-task Components: security Reporter: Parth Brahmbhatt Assignee: Parth Brahmbhatt Attachments: KAFKA-2211.patch Subtask-2 for Kafka-1688. Please see KIP-11 to get details on out of box SimpleACLAuthorizer implementation https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2211) KafkaAuthorizer: Add simpleACLAuthorizer implementation.
[ https://issues.apache.org/jira/browse/KAFKA-2211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Parth Brahmbhatt updated KAFKA-2211: Status: Patch Available (was: Open) KafkaAuthorizer: Add simpleACLAuthorizer implementation. Key: KAFKA-2211 URL: https://issues.apache.org/jira/browse/KAFKA-2211 Project: Kafka Issue Type: Sub-task Components: security Reporter: Parth Brahmbhatt Assignee: Parth Brahmbhatt Attachments: KAFKA-2211.patch Subtask-2 for Kafka-1688. Please see KIP-11 to get details on out of box SimpleACLAuthorizer implementation https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2211) KafkaAuthorizer: Add simpleACLAuthorizer implementation.
[ https://issues.apache.org/jira/browse/KAFKA-2211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Parth Brahmbhatt updated KAFKA-2211: Attachment: KAFKA-2211.patch KafkaAuthorizer: Add simpleACLAuthorizer implementation. Key: KAFKA-2211 URL: https://issues.apache.org/jira/browse/KAFKA-2211 Project: Kafka Issue Type: Sub-task Components: security Reporter: Parth Brahmbhatt Assignee: Parth Brahmbhatt Attachments: KAFKA-2211.patch Subtask-2 for Kafka-1688. Please see KIP-11 to get details on out of box SimpleACLAuthorizer implementation https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2212) KafkaAuthorizer: Add CLI for Acl management.
[ https://issues.apache.org/jira/browse/KAFKA-2212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553014#comment-14553014 ] Parth Brahmbhatt commented on KAFKA-2212: - Created reviewboard https://reviews.apache.org/r/34494/diff/ against branch az-impl KafkaAuthorizer: Add CLI for Acl management. - Key: KAFKA-2212 URL: https://issues.apache.org/jira/browse/KAFKA-2212 Project: Kafka Issue Type: Sub-task Components: security Reporter: Parth Brahmbhatt Assignee: Parth Brahmbhatt Attachments: KAFKA-2212.patch This is subtask-3 for Kafka-1688. Please see KIP-11 for details on CLI for Authorizer. https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Review Request 34494: Patch for KAFKA-2212
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34494/ --- Review request for kafka. Bugs: KAFKA-2212 https://issues.apache.org/jira/browse/KAFKA-2212 Repository: kafka Description --- KAFKA-2212: Add CLI for acl management of authorizer. Diffs - core/src/main/scala/kafka/admin/AclCommand.scala PRE-CREATION core/src/test/scala/unit/kafka/admin/AclCommandTest.scala PRE-CREATION Diff: https://reviews.apache.org/r/34494/diff/ Testing --- Thanks, Parth Brahmbhatt
[jira] [Updated] (KAFKA-2212) KafkaAuthorizer: Add CLI for Acl management.
[ https://issues.apache.org/jira/browse/KAFKA-2212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Parth Brahmbhatt updated KAFKA-2212: Attachment: KAFKA-2212.patch KafkaAuthorizer: Add CLI for Acl management. - Key: KAFKA-2212 URL: https://issues.apache.org/jira/browse/KAFKA-2212 Project: Kafka Issue Type: Sub-task Components: security Reporter: Parth Brahmbhatt Assignee: Parth Brahmbhatt Attachments: KAFKA-2212.patch This is subtask-3 for Kafka-1688. Please see KIP-11 for details on CLI for Authorizer. https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2212) KafkaAuthorizer: Add CLI for Acl management.
[ https://issues.apache.org/jira/browse/KAFKA-2212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Parth Brahmbhatt updated KAFKA-2212: Status: Patch Available (was: Open) KafkaAuthorizer: Add CLI for Acl management. - Key: KAFKA-2212 URL: https://issues.apache.org/jira/browse/KAFKA-2212 Project: Kafka Issue Type: Sub-task Components: security Reporter: Parth Brahmbhatt Assignee: Parth Brahmbhatt Attachments: KAFKA-2212.patch This is subtask-3 for Kafka-1688. Please see KIP-11 for details on CLI for Authorizer. https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [Vote] KIP-11 Authorization design for kafka security
This vote is now closed with 4 binding +1s and 4 non-binding +1s. Thanks Parth On 5/20/15, 12:04 PM, Joel Koshy jjkosh...@gmail.com wrote: +1 On Fri, May 15, 2015 at 04:18:49PM +, Parth Brahmbhatt wrote: Hi, Opening the voting thread for KIP-11. Link to the KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+ Interface Link to Jira: https://issues.apache.org/jira/browse/KAFKA-1688 Thanks Parth
[jira] [Reopened] (KAFKA-2187) Introduce merge-kafka-pr.py script
[ https://issues.apache.org/jira/browse/KAFKA-2187?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma reopened KAFKA-2187: I was just testing the merge script and it seems to work great. :) Introduce merge-kafka-pr.py script -- Key: KAFKA-2187 URL: https://issues.apache.org/jira/browse/KAFKA-2187 Project: Kafka Issue Type: New Feature Reporter: Ismael Juma Assignee: Ismael Juma Priority: Minor Fix For: 0.8.3 This script will be used to merge GitHub pull requests and it will pull from the Apache Git repo to the current branch, squash and merge the PR, push the commit to trunk, close the PR (via commit message) and close the relevant JIRA issue (via JIRA API). Spark has a script that does most (if not all) of this and that will be used as the starting point: https://github.com/apache/spark/blob/master/dev/merge_spark_pr.py -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-2213) Log cleaner should write compacted messages using configured compression type
Joel Koshy created KAFKA-2213: - Summary: Log cleaner should write compacted messages using configured compression type Key: KAFKA-2213 URL: https://issues.apache.org/jira/browse/KAFKA-2213 Project: Kafka Issue Type: Bug Reporter: Joel Koshy In KAFKA-1374 the log cleaner was improved to handle compressed messages. There were a couple of follow-ups from that:
* We write compacted messages using the original compression type in the compressed message-set. We should instead append all retained messages with the configured broker compression type of the topic.
* While compressing messages we should ideally do some batching before compression.
* Investigate the use of the client compressor. (See the discussion in the RBs for KAFKA-1374)
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
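On the batching follow-up, the shape of the change might look something like the sketch below - purely illustrative Java with stand-in types, since the real cleaner is Scala (kafka.log.LogCleaner) and this is not its actual code:

    import java.util.ArrayList;
    import java.util.List;

    public class CleanerRecompressSketch {
        static final int BATCH_SIZE = 512; // assumed batching threshold

        interface Sink { void writeCompressed(List<byte[]> batch, String compressionType); }

        // Re-append retained messages in batches, compressed with the topic's configured type
        // rather than whatever codec the original message-set happened to use.
        static void recompact(List<byte[]> retained, String topicCompressionType, Sink sink) {
            List<byte[]> batch = new ArrayList<>();
            for (byte[] msg : retained) {
                batch.add(msg);
                if (batch.size() == BATCH_SIZE) {
                    sink.writeCompressed(batch, topicCompressionType);
                    batch = new ArrayList<>();
                }
            }
            if (!batch.isEmpty())
                sink.writeCompressed(batch, topicCompressionType);
        }
    }

Compressing once per batch rather than once per retained message amortizes the codec overhead, which is the point of the second bullet.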
Re: Review Request 33620: Patch for KAFKA-1690
On May 15, 2015, 8:26 p.m., Michael Herstine wrote: clients/src/main/java/org/apache/kafka/common/network/SSLFactory.java, lines 137-139 https://reviews.apache.org/r/33620/diff/5/?file=957064#file957064line137 This is interesting; I don't see a corresponding `createSSLSocketFactory` -- did I miss it? Besides, I thought Kafka used NIO (in which case you wouldn't be using `SSLServerSocketFactory`, which IIRC is blocking). Who calls this?

This method is used for tests, and I moved it under tests.

On May 15, 2015, 8:26 p.m., Michael Herstine wrote: clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java, lines 56-59 https://reviews.apache.org/r/33620/diff/5/?file=957065#file957065line56 Do you want to make your nomenclature more uniform? The network buffers are referred to in terms of in/out while the application buffers are referred to as read/write. On a related note, do you need read and write buffers for each? At any given point in time, you're doing one or the other, so why not just have a network buffer and an application buffer?

Even though we do one op at any given time, we still want two buffers, as we might not have read the entire buffer in one op invocation and we need to keep that data around. Having separate buffers for read and write makes this easier.

On May 15, 2015, 8:26 p.m., Michael Herstine wrote: clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java, line 76 https://reviews.apache.org/r/33620/diff/5/?file=957065#file957065line76 I'm confused about the difference between `startHandshake` and `handshake` (inherited from interface `TransportLayer`).

Sriharsha Chintalapani wrote: startHandshake is more of a prepare method for the handshake process; it exists only in SSLTransportLayer and I'll make it a private method. The handshake method is the general TransportLayer handshake. As indicated in the comments, for PlainTextChannel it's a no-op, and for SSL it's implemented as a non-blocking SSL handshake.

I made startHandshake private; it's a prepare method invoked when the transportLayer gets created. If you find the naming of the method confusing please re-open.

- Sriharsha --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33620/#review83807 --- On May 20, 2015, 9:54 p.m., Sriharsha Chintalapani wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33620/ --- (Updated May 20, 2015, 9:54 p.m.) Review request for kafka. Bugs: KAFKA-1690 https://issues.apache.org/jira/browse/KAFKA-1690 Repository: kafka Description --- KAFKA-1690. new java producer needs ssl support as a client. KAFKA-1690. new java producer needs ssl support as a client. KAFKA-1690. new java producer needs ssl support as a client. KAFKA-1690. new java producer needs ssl support as a client. SSLFactory tests. KAFKA-1690. new java producer needs ssl support as a client. Added PrincipalBuilder. KAFKA-1690. new java producer needs ssl support as a client. Addressing reviews. KAFKA-1690. new java producer needs ssl support as a client. Addressing reviews. KAFKA-1690. new java producer needs ssl support as a client. Addressing reviews. 
Diffs - build.gradle cd2aa838fd53e8124f308979b1d70efe0c5725a6 checkstyle/checkstyle.xml a215ff36e9252879f1e0be5a86fef9a875bb8f38 checkstyle/import-control.xml f2e6cec267e67ce8e261341e373718e14a8e8e03 clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 0d68bf1e1e90fe9d5d4397ddf817b9a9af8d9f7a clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java cf32e4e7c40738fe6d8adc36ae0cfad459ac5b0b clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java bdff518b732105823058e6182f445248b45dc388 clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java d301be4709f7b112e1f3a39f3c04cfa65f00fa60 clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 8e336a3aa96c73f52beaeb56b931baf4b026cf21 clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 187d0004c8c46b6664ddaffecc6166d4b47351e5 clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java c4fa058692f50abb4f47bd344119d805c60123f5 clients/src/main/java/org/apache/kafka/common/config/SecurityConfigs.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/network/Authenticator.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/network/Channel.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java PRE-CREATION
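The read/write buffer question above maps onto the standard JSSE pattern. An illustrative sketch (plain SSLEngine, not Kafka's SSLTransportLayer) of why each direction keeps its own pair of buffers:

    import java.nio.ByteBuffer;
    import javax.net.ssl.SSLContext;
    import javax.net.ssl.SSLEngine;
    import javax.net.ssl.SSLEngineResult;

    public class SslBufferSketch {
        public static void main(String[] args) throws Exception {
            SSLEngine engine = SSLContext.getDefault().createSSLEngine();
            engine.setUseClientMode(true);
            int netSize = engine.getSession().getPacketBufferSize();
            int appSize = engine.getSession().getApplicationBufferSize();
            ByteBuffer netIn    = ByteBuffer.allocate(netSize); // encrypted bytes read from the socket
            ByteBuffer netOut   = ByteBuffer.allocate(netSize); // encrypted bytes to write to the socket
            ByteBuffer appRead  = ByteBuffer.allocate(appSize); // plaintext produced by unwrap()
            ByteBuffer appWrite = ByteBuffer.allocate(appSize); // plaintext consumed by wrap()
            // unwrap() can leave a partial TLS record in netIn, and wrap() may not drain all of
            // appWrite in one call, so both buffers must survive across invocations -- the reason
            // given above for keeping separate read and write buffers.
            SSLEngineResult r = engine.wrap(appWrite, netOut); // first wrap() kicks off the handshake
            System.out.println(r.getHandshakeStatus());
        }
    }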
[jira] [Assigned] (KAFKA-2168) New consumer poll() can block other calls like position(), commit(), and close() indefinitely
[ https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson reassigned KAFKA-2168: -- Assignee: Jason Gustafson (was: Ewen Cheslack-Postava) New consumer poll() can block other calls like position(), commit(), and close() indefinitely - Key: KAFKA-2168 URL: https://issues.apache.org/jira/browse/KAFKA-2168 Project: Kafka Issue Type: Bug Components: clients, consumer Reporter: Ewen Cheslack-Postava Assignee: Jason Gustafson The new consumer is currently using very coarse-grained synchronization. For most methods this isn't a problem since they finish quickly once the lock is acquired, but poll() might run for a long time (and commonly will, since polling with long timeouts is a normal use case). This means any operations invoked from another thread may block until the poll() call completes. Some example use cases where this can be a problem:
* A shutdown hook is registered to trigger shutdown and invokes close(). It gets invoked from another thread and blocks indefinitely.
* User wants to manage offset commit themselves in a background thread. If the commit policy is not purely time based, it's not currently possible to make sure the call to commit() will be processed promptly.
Two possible solutions to this:
1. Make sure a lock is not held during the actual select call. Since we have multiple layers (KafkaConsumer - NetworkClient - Selector - nio Selector) this is probably hard to make work cleanly, since locking is currently only performed at the KafkaConsumer level and we'd want it unlocked around a single line of code in Selector.
2. Wake up the selector before synchronizing for certain operations. This would require some additional coordination to make sure the caller of wakeup() is able to acquire the lock promptly (instead of, e.g., the poll() thread being woken up and then promptly reacquiring the lock with a subsequent long poll() call).
-- This message was sent by Atlassian JIRA (v6.3.4#6332)
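Solution 2 is essentially the shape the released new consumer later settled on: poll() can be interrupted from another thread via wakeup(). A sketch of that usage pattern, assuming the wakeup()/WakeupException API of later client releases rather than anything in this patch:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.errors.WakeupException;

    public class WakeupSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "demo");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("test"));
            // The shutdown-hook use case from the ticket: interrupt the long poll instead of
            // blocking on the consumer's lock.
            Runtime.getRuntime().addShutdownHook(new Thread(consumer::wakeup));
            try {
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
                    records.forEach(r -> System.out.println(r.value()));
                }
            } catch (WakeupException e) {
                // expected: wakeup() was called while (or just before) poll() was blocking
            } finally {
                consumer.close();
            }
        }
    }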
Review Request 34492: Patch for KAFKA-2210
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34492/ --- Review request for kafka. Bugs: KAFKA-2210 https://issues.apache.org/jira/browse/KAFKA-2210 Repository: kafka Description --- KAFKA-2210: Kafka authorizer public entities and changes to KafkaAPI and KafkaServer to allow custom authorizer implementation. Diffs - core/src/main/scala/kafka/common/AuthorizationException.scala PRE-CREATION core/src/main/scala/kafka/common/ErrorMapping.scala c75c68589681b2c9d6eba2b440ce5e58cddf6370 core/src/main/scala/kafka/security/auth/Acl.scala PRE-CREATION core/src/main/scala/kafka/security/auth/Authorizer.scala PRE-CREATION core/src/main/scala/kafka/security/auth/KafkaPrincipal.scala PRE-CREATION core/src/main/scala/kafka/security/auth/Operation.java PRE-CREATION core/src/main/scala/kafka/security/auth/PermissionType.java PRE-CREATION core/src/main/scala/kafka/security/auth/Resource.scala PRE-CREATION core/src/main/scala/kafka/security/auth/ResourceType.java PRE-CREATION core/src/main/scala/kafka/server/KafkaApis.scala 387e387998fc3a6c9cb585dab02b5f77b0381fbf core/src/main/scala/kafka/server/KafkaConfig.scala 9efa15ca5567b295ab412ee9eea7c03eb4cdc18b core/src/main/scala/kafka/server/KafkaServer.scala ea6d165d8e5c3146d2c65e8ad1a513308334bf6f core/src/test/resources/acl.json PRE-CREATION core/src/test/scala/unit/kafka/security/auth/AclTest.scala PRE-CREATION core/src/test/scala/unit/kafka/security/auth/KafkaPrincipalTest.scala PRE-CREATION core/src/test/scala/unit/kafka/security/auth/ResourceTest.scala PRE-CREATION core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 8014a5a6c362785539f24eb03d77278434614fe6 Diff: https://reviews.apache.org/r/34492/diff/ Testing --- Thanks, Parth Brahmbhatt
[jira] [Updated] (KAFKA-2210) KafkaAuthorizer: Add all public entities, config changes and changes to KafkaAPI and kafkaServer to allow pluggable authorizer implementation.
[ https://issues.apache.org/jira/browse/KAFKA-2210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Parth Brahmbhatt updated KAFKA-2210: Attachment: KAFKA-2210.patch KafkaAuthorizer: Add all public entities, config changes and changes to KafkaAPI and kafkaServer to allow pluggable authorizer implementation. -- Key: KAFKA-2210 URL: https://issues.apache.org/jira/browse/KAFKA-2210 Project: Kafka Issue Type: Sub-task Components: security Reporter: Parth Brahmbhatt Assignee: Parth Brahmbhatt Attachments: KAFKA-2210.patch This is the first subtask for Kafka-1688. As Part of this jira we intend to agree on all the public entities, configs and changes to existing kafka classes to allow pluggable authorizer implementation. Please see KIP-11 https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface for detailed design. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Review Request 34493: Patch for KAFKA-2211
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34493/ --- Review request for kafka. Bugs: KAFKA-2211 https://issues.apache.org/jira/browse/KAFKA-2211 Repository: kafka Description --- KAFKA-2211: Out of box implementation for authorizer interface. Diffs - core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala PRE-CREATION core/src/test/resources/authorizer-config.properties PRE-CREATION core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala PRE-CREATION Diff: https://reviews.apache.org/r/34493/diff/ Testing --- Thanks, Parth Brahmbhatt
[jira] [Commented] (KAFKA-2210) KafkaAuthorizer: Add all public entities, config changes and changes to KafkaAPI and kafkaServer to allow pluggable authorizer implementation.
[ https://issues.apache.org/jira/browse/KAFKA-2210?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553010#comment-14553010 ] Parth Brahmbhatt commented on KAFKA-2210: - Created reviewboard https://reviews.apache.org/r/34492/diff/ against branch origin/trunk KafkaAuthorizer: Add all public entities, config changes and changes to KafkaAPI and kafkaServer to allow pluggable authorizer implementation. -- Key: KAFKA-2210 URL: https://issues.apache.org/jira/browse/KAFKA-2210 Project: Kafka Issue Type: Sub-task Components: security Reporter: Parth Brahmbhatt Assignee: Parth Brahmbhatt Attachments: KAFKA-2210.patch This is the first subtask for Kafka-1688. As Part of this jira we intend to agree on all the public entities, configs and changes to existing kafka classes to allow pluggable authorizer implementation. Please see KIP-11 https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface for detailed design. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2210) KafkaAuthorizer: Add all public entities, config changes and changes to KafkaAPI and kafkaServer to allow pluggable authorizer implementation.
[ https://issues.apache.org/jira/browse/KAFKA-2210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Parth Brahmbhatt updated KAFKA-2210: Status: Patch Available (was: Open) KafkaAuthorizer: Add all public entities, config changes and changes to KafkaAPI and kafkaServer to allow pluggable authorizer implementation. -- Key: KAFKA-2210 URL: https://issues.apache.org/jira/browse/KAFKA-2210 Project: Kafka Issue Type: Sub-task Components: security Reporter: Parth Brahmbhatt Assignee: Parth Brahmbhatt Attachments: KAFKA-2210.patch This is the first subtask for Kafka-1688. As Part of this jira we intend to agree on all the public entities, configs and changes to existing kafka classes to allow pluggable authorizer implementation. Please see KIP-11 https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface for detailed design. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 33620: Patch for KAFKA-1690
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33620/ --- (Updated May 20, 2015, 9:54 p.m.) Review request for kafka. Bugs: KAFKA-1690 https://issues.apache.org/jira/browse/KAFKA-1690 Repository: kafka Description (updated) --- KAFKA-1690. new java producer needs ssl support as a client. KAFKA-1690. new java producer needs ssl support as a client. KAFKA-1690. new java producer needs ssl support as a client. KAFKA-1690. new java producer needs ssl support as a client. SSLFactory tests. KAFKA-1690. new java producer needs ssl support as a client. Added PrincipalBuilder. KAFKA-1690. new java producer needs ssl support as a client. Addressing reviews. KAFKA-1690. new java producer needs ssl support as a client. Addressing reviews. KAFKA-1690. new java producer needs ssl support as a client. Addressing reviews. Diffs (updated) - build.gradle cd2aa838fd53e8124f308979b1d70efe0c5725a6 checkstyle/checkstyle.xml a215ff36e9252879f1e0be5a86fef9a875bb8f38 checkstyle/import-control.xml f2e6cec267e67ce8e261341e373718e14a8e8e03 clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 0d68bf1e1e90fe9d5d4397ddf817b9a9af8d9f7a clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java cf32e4e7c40738fe6d8adc36ae0cfad459ac5b0b clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java bdff518b732105823058e6182f445248b45dc388 clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java d301be4709f7b112e1f3a39f3c04cfa65f00fa60 clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 8e336a3aa96c73f52beaeb56b931baf4b026cf21 clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 187d0004c8c46b6664ddaffecc6166d4b47351e5 clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java c4fa058692f50abb4f47bd344119d805c60123f5 clients/src/main/java/org/apache/kafka/common/config/SecurityConfigs.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/network/Authenticator.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/network/Channel.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/network/DefaultAuthenticator.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/network/PlainTextChannelBuilder.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/network/PlainTextTransportLayer.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/network/SSLChannelBuilder.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/network/SSLFactory.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/network/SSLTransportLayer.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/network/Selectable.java b5f8d83e89f9026dc0853e5f92c00b2d7f043e22 clients/src/main/java/org/apache/kafka/common/network/Selector.java 57de0585e5e9a53eb9dcd99cac1ab3eb2086a302 clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/protocol/SecurityProtocol.java dab1a94dd29563688b6ecf4eeb0e180b06049d3f clients/src/main/java/org/apache/kafka/common/security/auth/DefaultPrincipalBuilder.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipal.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/security/auth/PrincipalBuilder.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
f73eedb030987f018d8446bb1dcd98d19fa97331 clients/src/test/java/org/apache/kafka/common/network/EchoServer.java PRE-CREATION clients/src/test/java/org/apache/kafka/common/network/SSLFactoryTest.java PRE-CREATION clients/src/test/java/org/apache/kafka/common/network/SSLSelectorTest.java PRE-CREATION clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java d5b306b026e788b4e5479f3419805aa49ae889f3 clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java 2ebe3c21f611dc133a2dbb8c7dfb0845f8c21498 clients/src/test/java/org/apache/kafka/test/TestSSLUtils.java PRE-CREATION Diff: https://reviews.apache.org/r/33620/diff/ Testing --- Thanks, Sriharsha Chintalapani
[jira] [Updated] (KAFKA-1690) new java producer needs ssl support as a client
[ https://issues.apache.org/jira/browse/KAFKA-1690?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sriharsha Chintalapani updated KAFKA-1690: -- Attachment: KAFKA-1690_2015-05-20_14:54:35.patch new java producer needs ssl support as a client --- Key: KAFKA-1690 URL: https://issues.apache.org/jira/browse/KAFKA-1690 Project: Kafka Issue Type: Sub-task Reporter: Joe Stein Assignee: Sriharsha Chintalapani Fix For: 0.8.3 Attachments: KAFKA-1690.patch, KAFKA-1690.patch, KAFKA-1690_2015-05-10_23:20:30.patch, KAFKA-1690_2015-05-10_23:31:42.patch, KAFKA-1690_2015-05-11_16:09:36.patch, KAFKA-1690_2015-05-12_16:20:08.patch, KAFKA-1690_2015-05-15_07:18:21.patch, KAFKA-1690_2015-05-20_14:54:35.patch -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1335) Add rebalancing logic to the coordinator / consumer
[ https://issues.apache.org/jira/browse/KAFKA-1335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553213#comment-14553213 ] Jason Gustafson commented on KAFKA-1335: Hey [~onurkaraman], I'm looking at this issue. Just wanted to clarify that part 2 is what is referred to above: bq. 2) Adding the rebalance logic on the consumer upon topic change / error code from coordinator. I can pick this up if you haven't begun working on it yet. Add rebalancing logic to the coordinator / consumer --- Key: KAFKA-1335 URL: https://issues.apache.org/jira/browse/KAFKA-1335 Project: Kafka Issue Type: Sub-task Components: consumer Affects Versions: 0.9.0 Reporter: Neha Narkhede Assignee: Guozhang Wang This implements the group management protocol. This will be a tricky and potentially large change since it will involve implementing the group management protocol, which includes: 1) Adding the rebalance logic on the coordinator that can be triggered from membership change (either through failure detector or join group requests) and topic / partition ZK listener fires. 2) Adding the rebalance logic on the consumer upon topic change / error code from coordinator. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1646) Improve consumer read performance for Windows
[ https://issues.apache.org/jira/browse/KAFKA-1646?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps updated KAFKA-1646: - Reviewer: Jun Rao Improve consumer read performance for Windows - Key: KAFKA-1646 URL: https://issues.apache.org/jira/browse/KAFKA-1646 Project: Kafka Issue Type: Improvement Components: log Affects Versions: 0.8.1.1 Environment: Windows Reporter: xueqiang wang Assignee: xueqiang wang Labels: newbie, patch Attachments: Improve consumer read performance for Windows.patch, KAFKA-1646-truncate-off-trailing-zeros-on-broker-restart-if-bro.patch, KAFKA-1646_20141216_163008.patch, KAFKA-1646_20150306_005526.patch, KAFKA-1646_20150511_AddTestcases.patch This patch is for the Windows platform only. On Windows, if more than one replica is writing to disk, the segment log files will not be consistent on disk and consumer read performance will drop greatly. This fix allocates more disk space when rolling a new segment, which improves consumer read performance on the NTFS file system. This patch doesn't affect file allocation on other filesystems, as it only adds statements like 'if(Os.iswindow)' or methods used on Windows. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system
For simple discussions, I completely agree. For those threads where there are a few votes, and then more discussion, and then the KIP changes a few times... a separate thread will help keep things clear for both voters and anyone who will try to figure out what happened in the future.

On Wed, May 20, 2015 at 9:02 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey all, How do people feel about these [DISCUSS] threads that basically accidentally turn into votes? Like this one - basically everyone was +1 on this KIP already; should we just skip the second vote? I find it kind of annoying to do both when this happens. -Jay

On Mon, May 11, 2015 at 8:16 PM, Honghai Chen honghai.c...@microsoft.com wrote: All issues fixed, test cases added, performance results on Windows attached. The patch can help improve the consume performance by around 25%~50%. Thanks, Honghai Chen

-Original Message- From: Jun Rao [mailto:j...@confluent.io] Sent: Wednesday, May 6, 2015 5:39 AM To: dev@kafka.apache.org Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system Thanks. Could you update the wiki? Also, commented on the jira. Jun

On Tue, May 5, 2015 at 12:48 AM, Honghai Chen honghai.c...@microsoft.com wrote: Using config.segmentSize should be OK. Previously I added that to make sure the file does not exceed config.segmentSize; actually the function maybeRoll already makes sure of that. When trying to add a test case for recovery, I was blocked by the rename-related issue and opened a JIRA at https://issues.apache.org/jira/browse/KAFKA-2170 ; any recommendation for fixing that issue? Thanks, Honghai Chen

-Original Message- From: Jun Rao [mailto:j...@confluent.io] Sent: Tuesday, May 5, 2015 12:51 PM To: dev@kafka.apache.org Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system This seems similar to what's in https://issues.apache.org/jira/browse/KAFKA-1065. Also, could you explain why the preallocated size is set to config.segmentSize - 2 * config.maxMessageSize, instead of just config.segmentSize? Thanks, Jun

On Mon, May 4, 2015 at 8:12 PM, Honghai Chen honghai.c...@microsoft.com wrote: Hi guys, I'm trying to add test cases, but the case below crashed at the line segReopen.recover(64*1024) --> index.trimToValidSize(); any idea why? Appreciate your help. The case assumes Kafka suddenly crashed and needs to recover the last segment. 
kafka.log.LogSegmentTest testCreateWithInitFileSizeCrash FAILED
    java.io.IOException: The requested operation cannot be performed on a file with a user-mapped section open
        at java.io.RandomAccessFile.setLength(Native Method)
        at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:292)
        at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:283)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:288)
        at kafka.log.OffsetIndex.resize(OffsetIndex.scala:283)
        at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(OffsetIndex.scala:272)
        at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:272)
        at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:272)
        at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:288)
        at kafka.log.OffsetIndex.trimToValidSize(OffsetIndex.scala:271)
        at kafka.log.LogSegment.recover(LogSegment.scala:199)
        at kafka.log.LogSegmentTest.testCreateWithInitFileSizeCrash(LogSegmentTest.scala:306)

  def recover(maxMessageSize: Int): Int = {
    index.truncate()
    index.resize(index.maxIndexSize)
    var validBytes = 0
    var lastIndexEntry = 0
    val iter = log.iterator(maxMessageSize)
    try {
      while(iter.hasNext) {
        val entry = iter.next
        entry.message.ensureValid()
        if(validBytes - lastIndexEntry > indexIntervalBytes) {
          // we need to decompress the message, if required, to get the offset of the first uncompressed message
          val startOffset = entry.message.compressionCodec match {
            case NoCompressionCodec =>
              entry.offset
            case _ =>
              ByteBufferMessageSet.deepIterator(entry.message).next().offset
          }
          index.append(startOffset, validBytes)
          lastIndexEntry = validBytes
        }
        validBytes += MessageSet.entrySize(entry.message)
      }
    } catch {
      case e: InvalidMessageException =>
        logger.warn("Found invalid messages in log segment %s at byte offset
[jira] [Commented] (KAFKA-2147) Unbalanced replication can cause extreme purgatory growth
[ https://issues.apache.org/jira/browse/KAFKA-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14552918#comment-14552918 ] Jun Rao commented on KAFKA-2147: [~csallsop...@gmail.com], we probably won't patch 0.8.1.1 since it's a bit old. The issue reported in this jira is actually already fixed in trunk. So, purgatory should behave better in the next release. Have you also tried lowering fetch.purgatory.purge.interval.requests? Unbalanced replication can cause extreme purgatory growth - Key: KAFKA-2147 URL: https://issues.apache.org/jira/browse/KAFKA-2147 Project: Kafka Issue Type: Bug Components: purgatory, replication Affects Versions: 0.8.2.1 Reporter: Evan Huus Assignee: Jun Rao Attachments: KAFKA-2147.patch, KAFKA-2147_2015-05-14_09:41:56.patch, KAFKA-2147_2015-05-15_16:14:44.patch, purgatory.log, purgatory.log.gz, watch-lists.log Apologies in advance, this is going to be a bit of a complex description, mainly because we've seen this issue several different ways and we're still tying them together in terms of root cause and analysis. It is worth noting now that we have all our producers set up to send RequiredAcks==-1, and that this includes all our MirrorMakers. I understand the purgatory is being rewritten (again) for 0.8.3. Hopefully that will incidentally fix this issue, or at least render it moot.

h4. Symptoms

Fetch request purgatory on a broker or brokers grows rapidly and steadily at a rate of roughly 1-5K requests per second. Heap memory used also grows to keep pace. When 4-5 million requests have accumulated in purgatory, the purgatory is drained, causing a substantial latency spike. The node will tend to drop leadership, replicate, and recover.

h5. Case 1 - MirrorMaker

We first noticed this case when enabling mirrormaker. We had one primary cluster already, with many producers and consumers. We created a second, identical cluster and enabled replication from the original to the new cluster on some topics using mirrormaker. This caused all six nodes in the new cluster to exhibit the symptom in lockstep - their purgatories would all grow together, and get drained within about 20 seconds of each other. The cluster-wide latency spikes at this time caused several problems for us. Turning MM on and off turned the problem on and off very precisely. When we stopped MM, the purgatories would all drop to normal levels immediately, and would start climbing again when we restarted it. Note that this is the *fetch* purgatories on the brokers that MM was *producing* to, which indicates fairly strongly that this is a replication issue, not a MM issue. This particular cluster and MM setup was abandoned for other reasons before we could make much progress debugging.

h5. Case 2 - Broker 6

The second time we saw this issue was on the newest broker (broker 6) in the original cluster. For a long time we were running with five nodes, and eventually added a sixth to handle the increased load. At first, we moved only a handful of higher-volume partitions to this broker. Later, we created a group of new topics (totalling around 100 partitions) for testing purposes that were spread automatically across all six nodes. These topics saw occasional traffic, but were generally unused. At this point broker 6 had leadership for about an equal number of high-volume and unused partitions, about 15-20 of each. 
Around this time (we don't have detailed enough data to prove real correlation, unfortunately), the issue started appearing on this broker as well, but not on any of the other brokers in the cluster.

h4. Debugging

The first thing we tried was to reduce the `fetch.purgatory.purge.interval.requests` from the default of 1000 to a much lower value of 200. This had no noticeable effect at all. We then enabled debug logging on broker06 and started looking through that. I can attach complete log samples if necessary, but the thing that stood out for us was a substantial number of the following lines:

{noformat}
[2015-04-23 20:05:15,196] DEBUG [KafkaApi-6] Putting fetch request with correlation id 49939 from client ReplicaFetcherThread-0-6 into purgatory (kafka.server.KafkaApis)
{noformat}

The volume of these lines seemed to match (approximately) the fetch purgatory growth on that broker. At this point we developed a hypothesis (detailed below) which guided our subsequent debugging tests:
- Setting a daemon up to produce regular random data to all of the topics led by kafka06 (specifically the ones which otherwise would receive no data) substantially alleviated the problem.
- Doing an additional rebalance of the cluster in order to move a number of other topics with regular data
[jira] [Created] (KAFKA-2211) KafkaAuthorizer: Add simpleACLAuthorizer implementation.
Parth Brahmbhatt created KAFKA-2211: --- Summary: KafkaAuthorizer: Add simpleACLAuthorizer implementation. Key: KAFKA-2211 URL: https://issues.apache.org/jira/browse/KAFKA-2211 Project: Kafka Issue Type: Sub-task Reporter: Parth Brahmbhatt Assignee: Parth Brahmbhatt Subtask-2 for Kafka-1688. Please see KIP-11 to get details on out of box SimpleACLAuthorizer implementation https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [Vote] KIP-11 Authorization design for kafka security
+1 On Fri, May 15, 2015 at 04:18:49PM +, Parth Brahmbhatt wrote: Hi, Opening the voting thread for KIP-11. Link to the KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface Link to Jira: https://issues.apache.org/jira/browse/KAFKA-1688 Thanks Parth
Re: Review Request 34450: Fix KAFKA-2017; rebased
On May 20, 2015, 5:15 p.m., Onur Karaman wrote: I only did a brief skim. This optimization tries to switch consumers over to a new coordinator without a rebalance. From my understanding, the consumers would detect a coordinator failure, discover the new coordinator to work with, and try heartbeating that new coordinator without a rebalance. So it seems to me that putting the logic in handleJoinGroup isn't right, as the rebalance is what we're trying to avoid. The code should be in handleHeartbeat. It should look up ZK for the group info, add it to CoordinatorMetadata, and start up a DelayedHeartbeat for every consumer of that group. **More importantly: given that this is just an optimization, and we haven't even seen the performance hit without this, I think KAFKA-2017 should be very low priority.** The following are higher priority: 1. Getting the consumer to properly handle error codes of the join group and heartbeat responses. 2. Getting the consumer to detect coordinator failures and switch over to another coordinator (my KAFKA-1334 patch just had the coordinator detect consumer failures). A nice benefit of completing this first is that if we decide that the rebalances on coordinator failover are an actual issue, this would greatly facilitate testing any coordinator failover logic. Right now, it's unclear how this rb's logic can be tested.

Onur Karaman wrote: I added a ticket for 2: [KAFKA-2208](https://issues.apache.org/jira/browse/KAFKA-2208)

Thanks for the prompt response Onur! 1. I think I agree about the priority, and I also agree that the group-lookup logic should not be in join but rather in heartbeat. 2. About consumer-side failure detection, actually the consumer would not use heartbeat expiration to detect coordinator failure, but would only mark the current coordinator as dead upon disconnection / error-code reception.

- Guozhang --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34450/#review84539 --- On May 20, 2015, 4:13 p.m., Guozhang Wang wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34450/ --- (Updated May 20, 2015, 4:13 p.m.) Review request for kafka. Bugs: KAFKA-2017 https://issues.apache.org/jira/browse/KAFKA-2017 Repository: kafka Description --- 1. Upon receiving join-group, if the group metadata cannot be found in the local cache try to read it from ZK; 2. Upon completing rebalance, update the ZK with new group registry or delete the registry if the group becomes empty Diffs - core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala af06ad45cdc46ac3bc27898ebc1a5bd5b1c7b19e core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala 47bdfa7cc86fd4e841e2b1d6bfd40f1508e643bd core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala c39e6de34ee531c6dfa9107b830752bd7f8fbe59 core/src/main/scala/kafka/utils/ZkUtils.scala 2618dd39b925b979ad6e4c0abd5c6eaafb3db5d5 Diff: https://reviews.apache.org/r/34450/diff/ Testing --- Thanks, Guozhang Wang
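A sketch of the consumer-side handling Guozhang describes in point 2 - mark the coordinator dead on disconnect or error code, then rediscover; all names and the error-code constant are assumptions, since the real work belongs to KAFKA-2208:

    public class CoordinatorFailoverSketch {
        static final short NOT_COORDINATOR_FOR_CONSUMER = 16; // assumed protocol error code
        private Integer coordinatorId; // null means "dead, must rediscover"

        void onHeartbeatResponse(short errorCode, boolean disconnected) {
            if (disconnected || errorCode == NOT_COORDINATOR_FOR_CONSUMER) {
                coordinatorId = null;                   // mark the current coordinator dead
                coordinatorId = discoverCoordinator();  // ConsumerMetadata-style lookup
                // heartbeats then resume against the new coordinator; no rebalance is
                // triggered unless the new coordinator's response demands one
            }
        }

        private Integer discoverCoordinator() { return 0; /* placeholder lookup */ }
    }

Note there is no heartbeat-expiration timer on the consumer side here; failure is inferred purely from disconnects and error codes, per Guozhang's comment.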
Build failed in Jenkins: KafkaPreCommit #105
See https://builds.apache.org/job/KafkaPreCommit/105/changes Changes: [wangguoz] KAFKA-1374; Log cleaner should be able to handle compressed messages; reviewed by Guozhang Wang -- [...truncated 733 lines...] kafka.zk.ZKEphemeralTest testEphemeralNodeCleanup PASSED kafka.server.ReplicaManagerTest testHighWaterMarkDirectoryMapping PASSED kafka.server.ReplicaManagerTest testHighwaterMarkRelativeDirectoryMapping PASSED kafka.server.ReplicaManagerTest testIllegalRequiredAcks PASSED kafka.server.IsrExpirationTest testIsrExpirationForStuckFollowers PASSED kafka.server.IsrExpirationTest testIsrExpirationIfNoFetchRequestMade PASSED kafka.server.IsrExpirationTest testIsrExpirationForSlowFollowers PASSED kafka.server.SimpleFetchTest testReadFromLog PASSED kafka.server.ServerGenerateBrokerIdTest testAutoGenerateBrokerId PASSED kafka.server.ServerGenerateBrokerIdTest testUserConfigAndGeneratedBrokerId PASSED kafka.server.ServerGenerateBrokerIdTest testMultipleLogDirsMetaProps PASSED kafka.server.ServerGenerateBrokerIdTest testConsistentBrokerIdFromUserConfigAndMetaProps PASSED kafka.server.ServerShutdownTest testCleanShutdown PASSED kafka.server.ServerShutdownTest testCleanShutdownWithDeleteTopicEnabled PASSED kafka.server.ServerShutdownTest testCleanShutdownAfterFailedStartup PASSED kafka.server.ServerShutdownTest testConsecutiveShutdown PASSED kafka.server.KafkaConfigTest testLogRetentionTimeHoursProvided PASSED kafka.server.KafkaConfigTest testLogRetentionTimeMinutesProvided PASSED kafka.server.KafkaConfigTest testLogRetentionTimeMsProvided PASSED kafka.server.KafkaConfigTest testLogRetentionTimeNoConfigProvided PASSED kafka.server.KafkaConfigTest testLogRetentionTimeBothMinutesAndHoursProvided PASSED kafka.server.KafkaConfigTest testLogRetentionTimeBothMinutesAndMsProvided PASSED kafka.server.KafkaConfigTest testLogRetentionUnlimited PASSED kafka.server.KafkaConfigTest testLogRetentionValid PASSED kafka.server.KafkaConfigTest testAdvertiseDefaults PASSED kafka.server.KafkaConfigTest testAdvertiseConfigured PASSED kafka.server.KafkaConfigTest testDuplicateListeners PASSED kafka.server.KafkaConfigTest testBadListenerProtocol PASSED kafka.server.KafkaConfigTest testListenerDefaults PASSED kafka.server.KafkaConfigTest testVersionConfiguration PASSED kafka.server.KafkaConfigTest testUncleanLeaderElectionDefault PASSED kafka.server.KafkaConfigTest testUncleanElectionDisabled PASSED kafka.server.KafkaConfigTest testUncleanElectionEnabled PASSED kafka.server.KafkaConfigTest testUncleanElectionInvalid PASSED kafka.server.KafkaConfigTest testLogRollTimeMsProvided PASSED kafka.server.KafkaConfigTest testLogRollTimeBothMsAndHoursProvided PASSED kafka.server.KafkaConfigTest testLogRollTimeNoConfigProvided PASSED kafka.server.KafkaConfigTest testDefaultCompressionType PASSED kafka.server.KafkaConfigTest testValidCompressionType PASSED kafka.server.KafkaConfigTest testInvalidCompressionType PASSED kafka.server.OffsetCommitTest testUpdateOffsets PASSED kafka.server.OffsetCommitTest testCommitAndFetchOffsets PASSED kafka.server.OffsetCommitTest testLargeMetadataPayload PASSED kafka.server.OffsetCommitTest testOffsetExpiration FAILED kafka.common.TopicExistsException: Topic topic already exists. 
at kafka.admin.AdminUtils$.createOrUpdateTopicPartitionAssignmentPathInZK(AdminUtils.scala:248) at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:233) at kafka.utils.TestUtils$.createTopic(TestUtils.scala:176) at kafka.server.OffsetCommitTest.testOffsetExpiration(OffsetCommitTest.scala:226) kafka.server.OffsetCommitTest testNonExistingTopicOffsetCommit PASSED kafka.server.LogOffsetTest testGetOffsetsForUnknownTopic PASSED kafka.server.LogOffsetTest testGetOffsetsBeforeLatestTime PASSED kafka.server.LogOffsetTest testEmptyLogsGetOffsets PASSED kafka.server.LogOffsetTest testGetOffsetsBeforeNow PASSED kafka.server.LogOffsetTest testGetOffsetsBeforeEarliestTime PASSED kafka.server.AdvertiseBrokerTest testBrokerAdvertiseToZK PASSED kafka.server.ServerStartupTest testBrokerCreatesZKChroot PASSED kafka.server.ServerStartupTest testConflictBrokerRegistration PASSED kafka.server.DelayedOperationTest testRequestSatisfaction PASSED kafka.server.DelayedOperationTest testRequestExpiry PASSED kafka.server.DelayedOperationTest testRequestPurge PASSED kafka.server.LeaderElectionTest testLeaderElectionAndEpoch PASSED kafka.server.LeaderElectionTest testLeaderElectionWithStaleControllerEpoch PASSED kafka.server.DynamicConfigChangeTest testConfigChange PASSED kafka.server.DynamicConfigChangeTest testConfigChangeOnNonExistingTopic PASSED kafka.server.HighwatermarkPersistenceTest testHighWatermarkPersistenceSinglePartition PASSED kafka.server.HighwatermarkPersistenceTest testHighWatermarkPersistenceMultiplePartitions PASSED
Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system
I think in general it is fine (even good) if a VOTE thread has a lot of discussion. The only issue I can think of is the one that Gwen made reference to: early votes, then updates to the KIP/whatever is being voted on due to more discussion, then later votes, as it then becomes unclear what exactly each vote corresponds to. So basically, if there is a non-trivial change to the material under vote due to discussion, the vote should be canceled and a fresh vote thread should be started. Thanks, Joel On Wed, May 20, 2015 at 07:14:02PM +, Jiangjie Qin wrote: I actually feel many [VOTE] threads eventually become [DISCUSS] as people just put tons of comments there :) On 5/20/15, 11:52 AM, Jay Kreps jay.kr...@gmail.com wrote: Makes sense. Honghai, want to do a [VOTE] thread just so everything is official? -Jay On Wed, May 20, 2015 at 11:22 AM, Gwen Shapira gshap...@cloudera.com wrote: For simple discussions, I completely agree. For those threads where there are few votes, and then more discussion, and then the KIP changes a few times... a separate thread will help keep things clear for both voters and anyone who will try to figure out what happened in the future. On Wed, May 20, 2015 at 9:02 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey all, How do people feel about these [DISCUSS] threads that basically accidentally turn into votes? Like, basically everyone was +1 on this KIP already; should we just skip the second vote? I find it kind of annoying to do both when this happens. -Jay On Mon, May 11, 2015 at 8:16 PM, Honghai Chen honghai.c...@microsoft.com wrote: All issues fixed, test cases added, performance results on Windows attached. The patch can help improve consume performance by around 25%~50%. Thanks, Honghai Chen -Original Message- From: Jun Rao [mailto:j...@confluent.io] Sent: Wednesday, May 6, 2015 5:39 AM To: dev@kafka.apache.org Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system Thanks. Could you update the wiki? Also, I commented on the jira. Jun On Tue, May 5, 2015 at 12:48 AM, Honghai Chen honghai.c...@microsoft.com wrote: Using config.segmentSize should be ok. Previously I added that to make sure the file does not exceed config.segmentSize; actually the function maybeRoll already makes sure of that. While trying to add a test case for recover, I was blocked by the rename-related issue and opened one jira at https://issues.apache.org/jira/browse/KAFKA-2170, any recommendation for fixing that issue? Thanks, Honghai Chen -Original Message- From: Jun Rao [mailto:j...@confluent.io] Sent: Tuesday, May 5, 2015 12:51 PM To: dev@kafka.apache.org Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system This seems similar to what's in https://issues.apache.org/jira/browse/KAFKA-1065. Also, could you explain why the preallocated size is set to config.segmentSize - 2 * config.maxMessageSize, instead of just config.segmentSize? Thanks, Jun On Mon, May 4, 2015 at 8:12 PM, Honghai Chen honghai.c...@microsoft.com wrote: Hi guys, I'm trying to add test cases, but the case below crashed at line segReopen.recover(64*1024) --> index.trimToValidSize(); any idea why? Appreciate your help. The case assumes Kafka crashes suddenly and needs to recover the last segment.
kafka.log.LogSegmentTest testCreateWithInitFileSizeCrash FAILED
java.io.IOException: The requested operation cannot be performed on a file with a user-mapped section open
at java.io.RandomAccessFile.setLength(Native Method)
at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:292)
at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:283)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:288)
at kafka.log.OffsetIndex.resize(OffsetIndex.scala:283)
at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(OffsetIndex.scala:272)
at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:272)
at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:272)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:288)
at kafka.log.OffsetIndex.trimToValidSize(OffsetIndex.scala:271)
at kafka.log.LogSegment.recover(LogSegment.scala:199)
at kafka.log.LogSegmentTest.testCreateWithInitFileSizeCrash(LogSegmentTest.scala:306)
Re: Review Request 34418: Patch for KAFKA-2191
On May 19, 2015, 5:41 p.m., Dong Lin wrote: clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java, line 64 https://reviews.apache.org/r/34418/diff/1/?file=963982#file963982line64 Hey Jay, My approach differs from yours only in this line: long ellapsedPriorWindowsMs = config.timeWindowMs() * (max(stat.samples.size() - 1, 1)); I guess my approach is probably a bit more fine-grained (and closer to the existing code) than yours during the first config.timeWindowMs()*(config.samples()-1) ms. After that the behavior is exactly the same. I think this difference doesn't matter in a production environment. It makes minutes-long system tests a bit more accurate, though. I am good with both approaches. Hey Jay, during my quota system test, I found that it is actually preferable to use the full window size to calculate the rate at the beginning, so that when a client just starts and requests to fetch a large chunk of data, this initial large fetch request wouldn't be delayed for a long time. Therefore your approach is better. - Dong --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34418/#review84349 --- On May 19, 2015, 5:12 p.m., Jay Kreps wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34418/ --- (Updated May 19, 2015, 5:12 p.m.) Review request for kafka. Bugs: KAFKA-2191 https://issues.apache.org/jira/browse/KAFKA-2191 Repository: kafka Description --- Fix 0 ellapsed time rate bug. Diffs - clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java 98429da34418f7f1deba1b5e44e2e6025212edb3 clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java 544e120594de78c43581a980b1e4087b4fb98ccb Diff: https://reviews.apache.org/r/34418/diff/ Testing --- Thanks, Jay Kreps
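To make the comparison concrete, a minimal sketch of the two elapsed-time choices discussed above; the names and structure are assumptions for illustration, not the actual Rate.java:

// Sketch, not Kafka's Rate.java: two ways of charging "prior window" time when
// computing a rate shortly after startup.
class RateWindowSketch {
    final long timeWindowMs;     // length of one sample window, i.e. config.timeWindowMs()
    final int configuredSamples; // i.e. config.samples()

    RateWindowSketch(long timeWindowMs, int configuredSamples) {
        this.timeWindowMs = timeWindowMs;
        this.configuredSamples = configuredSamples;
    }

    // Jay's variant as described above: always assume the full configured span of prior
    // windows has elapsed. A client's first large fetch is averaged over the whole window
    // span, so its measured rate stays low and quota logic won't delay it for long.
    double rateFullWindow(double total, long elapsedInCurrentWindowMs) {
        long elapsedPriorWindowsMs = timeWindowMs * (configuredSamples - 1);
        return total / ((elapsedPriorWindowsMs + elapsedInCurrentWindowMs) / 1000.0);
    }

    // Dong's variant, echoing the quoted line: only charge for windows that exist so far
    // (at least one), which is finer grained during the first few windows after startup.
    double rateObservedWindows(double total, int samplesSoFar, long elapsedInCurrentWindowMs) {
        long elapsedPriorWindowsMs = timeWindowMs * Math.max(samplesSoFar - 1, 1);
        return total / ((elapsedPriorWindowsMs + elapsedInCurrentWindowMs) / 1000.0);
    }
}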
Re: Review Request 33065: Patch for KAFKA-1928
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33065/#review84536 --- Thanks for the latest patch. Just two new minor comments and a couple of previous comments that haven't been addressed. clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java https://reviews.apache.org/r/33065/#comment135849 We probably should make this (and the one in ProducerConfig) a bit smaller than the broker side default so that the client and server don't try to close the same idle socket at the same time. Perhaps sth like 9 mins? clients/src/main/java/org/apache/kafka/common/network/Send.java https://reviews.apache.org/r/33065/#comment135852 The issue that I am seeing is the following. When sending data to a socket, we use NetworkSend, which prepends a size to the payload buffer. So, in this case, size() for NetworkSend is actually 4 + the payload size. core/src/main/scala/kafka/network/SocketServer.scala https://reviews.apache.org/r/33065/#comment135850 I was thinking of exposing IO-WAIT-RATIO as a gauge to NetworkProcessorAvgIdlePercent. This way, people who rely on Coda Hale metrics reporter can still get this important metric as before. In any case, IO-WAIT-RATIO is per processor and we want another metric at the aggregate level. core/src/main/scala/kafka/network/SocketServer.scala https://reviews.apache.org/r/33065/#comment135876 We probably can use the e @ pattern matching to handle both types of exceptions. - Jun Rao On May 20, 2015, 10:42 a.m., Gwen Shapira wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33065/ --- (Updated May 20, 2015, 10:42 a.m.) Review request for kafka. Bugs: 1928 and KAFKA-1928 https://issues.apache.org/jira/browse/1928 https://issues.apache.org/jira/browse/KAFKA-1928 Repository: kafka Description --- first pass on replacing Send implement maxSize and improved docs Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 Conflicts: core/src/main/scala/kafka/network/RequestChannel.scala moved selector out of abstract thread mid-way through putting selector in SocketServer Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 Also, SocketServer is now using Selector. Stil a bit messy - but all tests pass. Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 renamed requestKey to connectionId to reflect new use and changed type from Any to String Following Jun's comments - moved MultiSend to client. Cleaned up destinations as well removed reify and remaining from send/recieve API, per Jun. 
moved maybeCloseOldest() to Selector per Jay added idString to node API, changed written to int in Send API cleaning up MultiSend, added size() to Send interface fixed some issues with multisend Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 fixed metric thingies fixed response order bug error handling for illegal selector state and fix metrics bug optimized selection key lookup with identity hash fix accidental change Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 addressing Jun's comments removed connection-aging for clients fix issues with exception handling and other cleanup Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 Revert removed connection-aging for clients This reverts commit 016669123a370b561b5ac78f8f1cf7bdd958e7d1. improving exception handling and other minor fixes Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 Diffs - clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 936487b16e7ac566f8bdcd39a7240ceb619fd30e clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 1311f85847b022efec8cb05c450bb18231db6979 clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 435fbb5116e80302eba11ed1d3069cb577dbdcbd clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java bdff518b732105823058e6182f445248b45dc388 clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java d301be4709f7b112e1f3a39f3c04cfa65f00fa60
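To make Jun's Send.java comment above concrete, here is a minimal sketch of a size-prefixed send; the class name and structure are illustrative, not the actual NetworkSend:

import java.nio.ByteBuffer;

// Sketch of the behavior described in the review comment: the send prepends a
// 4-byte length to the payload, so the bytes written are 4 + payload size.
final class SizePrefixedSendSketch {
    private final ByteBuffer[] buffers;

    SizePrefixedSendSketch(ByteBuffer payload) {
        ByteBuffer sizeBuffer = ByteBuffer.allocate(4);
        sizeBuffer.putInt(payload.remaining()); // the prefix holds only the payload length
        sizeBuffer.rewind();
        this.buffers = new ByteBuffer[] { sizeBuffer, payload };
    }

    // What actually goes over the wire: prefix + payload, i.e. 4 + payload size.
    long size() {
        long total = 0;
        for (ByteBuffer b : buffers)
            total += b.remaining();
        return total;
    }

    public static void main(String[] args) {
        ByteBuffer payload = ByteBuffer.wrap(new byte[100]);
        System.out.println(new SizePrefixedSendSketch(payload).size()); // prints 104
    }
}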
[jira] [Commented] (KAFKA-1374) LogCleaner (compaction) does not support compressed topics
[ https://issues.apache.org/jira/browse/KAFKA-1374?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14552840#comment-14552840 ] Joel Koshy commented on KAFKA-1374: --- I just wanted the commit to record Manikumar as author. Unfortunately our git repo does not seem to let me force push an amended commit so nm. LogCleaner (compaction) does not support compressed topics -- Key: KAFKA-1374 URL: https://issues.apache.org/jira/browse/KAFKA-1374 Project: Kafka Issue Type: Bug Reporter: Joel Koshy Assignee: Manikumar Reddy Labels: newbie++ Fix For: 0.8.3 Attachments: KAFKA-1374.patch, KAFKA-1374.patch, KAFKA-1374_2014-08-09_16:18:55.patch, KAFKA-1374_2014-08-12_22:23:06.patch, KAFKA-1374_2014-09-23_21:47:12.patch, KAFKA-1374_2014-10-03_18:49:16.patch, KAFKA-1374_2014-10-03_19:17:17.patch, KAFKA-1374_2015-01-18_00:19:21.patch, KAFKA-1374_2015-05-18_22:55:48.patch, KAFKA-1374_2015-05-19_17:20:44.patch This is a known issue, but opening a ticket to track. If you try to compact a topic that has compressed messages you will run into various exceptions - typically because during iteration we advance the position based on the decompressed size of the message. I have a bunch of stack traces, but it should be straightforward to reproduce. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system
Makes sense. Honghai, want to do a [VOTE] thread just so everything is official? -Jay On Wed, May 20, 2015 at 11:22 AM, Gwen Shapira gshap...@cloudera.com wrote: For simple discussions, I completely agree. For those threads where there are few votes, and then more discussion, and then the KIP changes a few times... a separate thread will help keep things clear for both voters and anyone who will try to figure out what happened in the future. On Wed, May 20, 2015 at 9:02 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey all, How do people feel about these [DISCUSS] threads that basically accidentally turn into votes? Like, basically everyone was +1 on this KIP already; should we just skip the second vote? I find it kind of annoying to do both when this happens. -Jay On Mon, May 11, 2015 at 8:16 PM, Honghai Chen honghai.c...@microsoft.com wrote: All issues fixed, test cases added, performance results on Windows attached. The patch can help improve consume performance by around 25%~50%. Thanks, Honghai Chen -Original Message- From: Jun Rao [mailto:j...@confluent.io] Sent: Wednesday, May 6, 2015 5:39 AM To: dev@kafka.apache.org Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system Thanks. Could you update the wiki? Also, I commented on the jira. Jun On Tue, May 5, 2015 at 12:48 AM, Honghai Chen honghai.c...@microsoft.com wrote: Using config.segmentSize should be ok. Previously I added that to make sure the file does not exceed config.segmentSize; actually the function maybeRoll already makes sure of that. While trying to add a test case for recover, I was blocked by the rename-related issue and opened one jira at https://issues.apache.org/jira/browse/KAFKA-2170, any recommendation for fixing that issue? Thanks, Honghai Chen -Original Message- From: Jun Rao [mailto:j...@confluent.io] Sent: Tuesday, May 5, 2015 12:51 PM To: dev@kafka.apache.org Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system This seems similar to what's in https://issues.apache.org/jira/browse/KAFKA-1065. Also, could you explain why the preallocated size is set to config.segmentSize - 2 * config.maxMessageSize, instead of just config.segmentSize? Thanks, Jun On Mon, May 4, 2015 at 8:12 PM, Honghai Chen honghai.c...@microsoft.com wrote: Hi guys, I'm trying to add test cases, but the case below crashed at line segReopen.recover(64*1024) --> index.trimToValidSize(); any idea why? Appreciate your help. The case assumes Kafka crashes suddenly and needs to recover the last segment.
kafka.log.LogSegmentTest testCreateWithInitFileSizeCrash FAILED
java.io.IOException: The requested operation cannot be performed on a file with a user-mapped section open
at java.io.RandomAccessFile.setLength(Native Method)
at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:292)
at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:283)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:288)
at kafka.log.OffsetIndex.resize(OffsetIndex.scala:283)
at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(OffsetIndex.scala:272)
at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:272)
at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:272)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:288)
at kafka.log.OffsetIndex.trimToValidSize(OffsetIndex.scala:271)
at kafka.log.LogSegment.recover(LogSegment.scala:199)
at kafka.log.LogSegmentTest.testCreateWithInitFileSizeCrash(LogSegmentTest.scala:306)

def recover(maxMessageSize: Int): Int = {
  index.truncate()
  index.resize(index.maxIndexSize)
  var validBytes = 0
  var lastIndexEntry = 0
  val iter = log.iterator(maxMessageSize)
  try {
    while(iter.hasNext) {
      val entry = iter.next
      entry.message.ensureValid()
      if(validBytes - lastIndexEntry > indexIntervalBytes) {
        // we need to decompress the message, if required, to get the offset of the first uncompressed message
        val startOffset = entry.message.compressionCodec match {
          case NoCompressionCodec => entry.offset
          case _ => ByteBufferMessageSet.deepIterator(entry.message).next().offset
        }
[jira] [Created] (KAFKA-2212) KafkaAuthorizer: Add CLI for Acl management.
Parth Brahmbhatt created KAFKA-2212: --- Summary: KafkaAuthorizer: Add CLI for Acl management. Key: KAFKA-2212 URL: https://issues.apache.org/jira/browse/KAFKA-2212 Project: Kafka Issue Type: Sub-task Reporter: Parth Brahmbhatt This is subtask-3 for KAFKA-1688. Please see KIP-11 for details on the CLI for the Authorizer: https://cwiki.apache.org/confluence/display/KAFKA/KIP-11+-+Authorization+Interface -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system
Hey all, How do people feel about these [DISCUSS] threads that basically accidentally turn into votes? Like, basically everyone was +1 on this KIP already; should we just skip the second vote? I find it kind of annoying to do both when this happens. -Jay On Mon, May 11, 2015 at 8:16 PM, Honghai Chen honghai.c...@microsoft.com wrote: All issues fixed, test cases added, performance results on Windows attached. The patch can help improve consume performance by around 25%~50%. Thanks, Honghai Chen -Original Message- From: Jun Rao [mailto:j...@confluent.io] Sent: Wednesday, May 6, 2015 5:39 AM To: dev@kafka.apache.org Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system Thanks. Could you update the wiki? Also, I commented on the jira. Jun On Tue, May 5, 2015 at 12:48 AM, Honghai Chen honghai.c...@microsoft.com wrote: Using config.segmentSize should be ok. Previously I added that to make sure the file does not exceed config.segmentSize; actually the function maybeRoll already makes sure of that. While trying to add a test case for recover, I was blocked by the rename-related issue and opened one jira at https://issues.apache.org/jira/browse/KAFKA-2170, any recommendation for fixing that issue? Thanks, Honghai Chen -Original Message- From: Jun Rao [mailto:j...@confluent.io] Sent: Tuesday, May 5, 2015 12:51 PM To: dev@kafka.apache.org Subject: Re: [DISCUSS] KIP 20 Enable log preallocate to improve consume performance under windows and some old Linux file system This seems similar to what's in https://issues.apache.org/jira/browse/KAFKA-1065. Also, could you explain why the preallocated size is set to config.segmentSize - 2 * config.maxMessageSize, instead of just config.segmentSize? Thanks, Jun On Mon, May 4, 2015 at 8:12 PM, Honghai Chen honghai.c...@microsoft.com wrote: Hi guys, I'm trying to add test cases, but the case below crashed at line segReopen.recover(64*1024) --> index.trimToValidSize(); any idea why? Appreciate your help. The case assumes Kafka crashes suddenly and needs to recover the last segment.
kafka.log.LogSegmentTest testCreateWithInitFileSizeCrash FAILED
java.io.IOException: The requested operation cannot be performed on a file with a user-mapped section open
at java.io.RandomAccessFile.setLength(Native Method)
at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:292)
at kafka.log.OffsetIndex$$anonfun$resize$1.apply(OffsetIndex.scala:283)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:288)
at kafka.log.OffsetIndex.resize(OffsetIndex.scala:283)
at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply$mcV$sp(OffsetIndex.scala:272)
at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:272)
at kafka.log.OffsetIndex$$anonfun$trimToValidSize$1.apply(OffsetIndex.scala:272)
at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:288)
at kafka.log.OffsetIndex.trimToValidSize(OffsetIndex.scala:271)
at kafka.log.LogSegment.recover(LogSegment.scala:199)
at kafka.log.LogSegmentTest.testCreateWithInitFileSizeCrash(LogSegmentTest.scala:306)

def recover(maxMessageSize: Int): Int = {
  index.truncate()
  index.resize(index.maxIndexSize)
  var validBytes = 0
  var lastIndexEntry = 0
  val iter = log.iterator(maxMessageSize)
  try {
    while(iter.hasNext) {
      val entry = iter.next
      entry.message.ensureValid()
      if(validBytes - lastIndexEntry > indexIntervalBytes) {
        // we need to decompress the message, if required, to get the offset of the first uncompressed message
        val startOffset = entry.message.compressionCodec match {
          case NoCompressionCodec => entry.offset
          case _ => ByteBufferMessageSet.deepIterator(entry.message).next().offset
        }
        index.append(startOffset, validBytes)
        lastIndexEntry = validBytes
      }
      validBytes += MessageSet.entrySize(entry.message)
    }
  } catch {
    case e: InvalidMessageException =>
      logger.warn("Found invalid messages in log segment %s at byte offset %d: %s.".format(log.file.getAbsolutePath, validBytes, e.getMessage))
  }
  val truncated = log.sizeInBytes - validBytes
  log.truncateTo(validBytes)
  index.trimToValidSize()
  truncated
}

/* create a segment with pre allocate and Crash */
@Test
def testCreateWithInitFileSizeCrash() {
  val tempDir = TestUtils.tempDir()
  val seg = new LogSegment(tempDir, 40, 1, 1000, 0, SystemTime, false, 512*1024*1024, true)
[jira] [Resolved] (KAFKA-1374) LogCleaner (compaction) does not support compressed topics
[ https://issues.apache.org/jira/browse/KAFKA-1374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Joel Koshy resolved KAFKA-1374. --- Resolution: Fixed Please file a separate jira for that. Also, I'm going to amend this commit with the right author information. LogCleaner (compaction) does not support compressed topics -- Key: KAFKA-1374 URL: https://issues.apache.org/jira/browse/KAFKA-1374 Project: Kafka Issue Type: Bug Reporter: Joel Koshy Assignee: Manikumar Reddy Labels: newbie++ Fix For: 0.8.3 Attachments: KAFKA-1374.patch, KAFKA-1374.patch, KAFKA-1374_2014-08-09_16:18:55.patch, KAFKA-1374_2014-08-12_22:23:06.patch, KAFKA-1374_2014-09-23_21:47:12.patch, KAFKA-1374_2014-10-03_18:49:16.patch, KAFKA-1374_2014-10-03_19:17:17.patch, KAFKA-1374_2015-01-18_00:19:21.patch, KAFKA-1374_2015-05-18_22:55:48.patch, KAFKA-1374_2015-05-19_17:20:44.patch This is a known issue, but opening a ticket to track. If you try to compact a topic that has compressed messages you will run into various exceptions - typically because during iteration we advance the position based on the decompressed size of the message. I have a bunch of stack traces, but it should be straightforward to reproduce. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-269) ./system_test/producer_perf/bin/run-test.sh without --async flag does not run
[ https://issues.apache.org/jira/browse/KAFKA-269?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jay Kreps updated KAFKA-269: Reviewer: (was: Jay Kreps) ./system_test/producer_perf/bin/run-test.sh without --async flag does not run -- Key: KAFKA-269 URL: https://issues.apache.org/jira/browse/KAFKA-269 Project: Kafka Issue Type: Bug Components: clients, core Affects Versions: 0.7, 0.8.1.1 Environment: Linux 2.6.18-238.1.1.el5 , x86_64 x86_64 x86_64 GNU/Linux ext3 file system with raid10 Reporter: Praveen Ramachandra Labels: newbie, performance When I run the tests without the --async option, the tests don't produce even a single message. The following defaults were changed in server.properties: num.threads=Tried with 8, 10, 100 num.partitions=10 -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2187) Introduce merge-kafka-pr.py script
[ https://issues.apache.org/jira/browse/KAFKA-2187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553237#comment-14553237 ] Ismael Juma commented on KAFKA-2187: Updated reviewboard https://reviews.apache.org/r/34502/diff/ against branch origin/trunk Introduce merge-kafka-pr.py script -- Key: KAFKA-2187 URL: https://issues.apache.org/jira/browse/KAFKA-2187 Project: Kafka Issue Type: New Feature Reporter: Ismael Juma Assignee: Ismael Juma Priority: Minor Fix For: 0.8.3 Attachments: KAFKA-2187.patch, KAFKA-2187_2015-05-20_23:14:05.patch This script will be used to merge GitHub pull requests and it will pull from the Apache Git repo to the current branch, squash and merge the PR, push the commit to trunk, close the PR (via commit message) and close the relevant JIRA issue (via JIRA API). Spark has a script that does most (if not all) of this and that will be used as the starting point: https://github.com/apache/spark/blob/master/dev/merge_spark_pr.py -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Request for permission to edit pages in the wiki
Hi, In order to edit the Patch submission and review page with information on how to merge GitHub pull requests, it would be helpful to have edit permission for Kafka's Confluence pages. My Confluence account id is `ijuma`[1]. Thanks, Ismael [1] https://cwiki.apache.org/confluence/display/~ijuma
[jira] [Commented] (KAFKA-2168) New consumer poll() can block other calls like position(), commit(), and close() indefinitely
[ https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553332#comment-14553332 ] Ewen Cheslack-Postava commented on KAFKA-2168: -- For option 1 it's probably worth pointing out that we already have some finer-grained synchronization (metrics and metadata, since those are shared by many other components, and the producer doesn't have synchronization at the level of KafkaProducer, only on its internals). So we're already double locking in a lot of cases. My concern with option 2 is that it's a pretty unusual approach which makes the code harder to understand. Scanning through the code, there aren't that many places in KafkaConsumer where multiple components are used together in a way that would require synchronization. updateFetchPositions and refreshCommittedOffsets might, since they use subscriptions + fetcher and subscriptions + coordinator together, respectively. Especially with SubscriptionState we'd need to be careful, since some of the calls to that return internal collections or flags, and the subsequent operation might need all that processing to be synchronized to be sure not to miss anything. For example, partition reassignment checks a flag, does the reassignment, and then resets the flag; we'd need to make sure that a subscription change during that time wouldn't get lost. The other case is poll(). I thought this might be hard to reason about if some state was changing while it was executing, but I think it's not a problem as long as a few of the steps can be synchronized, in particular partition reassignment and offset commit. By the way, I mapped out the dependencies. It's sort of in 4 layers with subscriptions + metadata at the bottom, NetworkClient above that using only metadata, then all three are used by both coordinator and fetcher in the next layer, and then the top layer is KafkaConsumer. But KafkaConsumer touches all of them, so it kind of breaks any layering. Some of the things in KafkaConsumer that require synchronization still could possibly move into a component in a lower level (possibly something new) if we move some of the code around. New consumer poll() can block other calls like position(), commit(), and close() indefinitely - Key: KAFKA-2168 URL: https://issues.apache.org/jira/browse/KAFKA-2168 Project: Kafka Issue Type: Bug Components: clients, consumer Reporter: Ewen Cheslack-Postava Assignee: Jason Gustafson The new consumer is currently using very coarse-grained synchronization. For most methods this isn't a problem since they finish quickly once the lock is acquired, but poll() might run for a long time (and commonly will since polling with long timeouts is a normal use case). This means any operations invoked from another thread may block until the poll() call completes. Some example use cases where this can be a problem: * A shutdown hook is registered to trigger shutdown and invokes close(). It gets invoked from another thread and blocks indefinitely. * User wants to manage offset commit themselves in a background thread. If the commit policy is not purely time based, it's not currently possible to make sure the call to commit() will be processed promptly. Two possible solutions to this: 1. Make sure a lock is not held during the actual select call.
Since we have multiple layers (KafkaConsumer -> NetworkClient -> Selector -> nio Selector) this is probably hard to make work cleanly since locking is currently only performed at the KafkaConsumer level and we'd want it unlocked around a single line of code in Selector. 2. Wake up the selector before synchronizing for certain operations. This would require some additional coordination to make sure the caller of wakeup() is able to acquire the lock promptly (instead of, e.g., the poll() thread being woken up and then promptly reacquiring the lock with a subsequent long poll() call). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
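A minimal sketch of option 2, under stated assumptions (Wakeable stands in for whatever interrupts the blocking select; the real consumer code differs). A fair lock is one way to let the waking thread win the lock before the poller re-enters another long poll:

import java.util.concurrent.locks.ReentrantLock;

// Illustration only: a background thread wakes the selector, then competes for
// the consumer lock before the polling thread can start another long poll.
final class WakeupThenLockSketch {
    interface Wakeable { void wakeup(); } // stands in for waking the nio selector

    private final ReentrantLock lock = new ReentrantLock(true); // fair, so the waker isn't starved
    private final Wakeable selector;

    WakeupThenLockSketch(Wakeable selector) { this.selector = selector; }

    // Called from the polling thread; doPoll may block for a long time.
    void poll(Runnable doPoll) {
        lock.lock();
        try { doPoll.run(); } finally { lock.unlock(); }
    }

    // Called from another thread, e.g. a shutdown hook or a commit thread.
    void wakeupAndRun(Runnable op) {
        selector.wakeup(); // forces the blocking select to return early
        lock.lock();       // fair ordering: acquired before the next long poll
        try { op.run(); } finally { lock.unlock(); }
    }
}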
Re: [VOTE] KIP-21 Dynamic Configuration
+1 On Wed, May 20, 2015 at 05:33:31AM +, Aditya Auradkar wrote: Thanks Andrii. I'll make the changes. I've also updated KIP-21 to include the new config requests. Take a look and vote. https://cwiki.apache.org/confluence/display/KAFKA/KIP-21+-+Dynamic+Configuration Aditya From: Andrii Biletskyi [andrii.bilets...@stealth.ly] Sent: Tuesday, May 19, 2015 2:26 PM To: dev@kafka.apache.org Subject: Re: [VOTE] KIP-21 Dynamic Configuration Hi, Sorry I wasn't able to participate. I don't have objections about removing config changes from AlterTopic (as I understand, both AddedConfig and DeletedConfig) - you are welcome to update the KIP page. Thanks, Andrii Biletskyi On Tue, May 19, 2015 at 11:40 PM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote: Updating the discussion with the latest comments. 1. We discussed adding 2 new APIs (AlterConfig and DescribeConfig). I'll update KIP-21 with details on these. 2. Discussed during the KIP hangout. We are in agreement. (1) has a dependency on KIP-4 being completed. The rest of the work in the KIP can be implemented independently. Any concerns if we tackle it as two separate work items implementation-wise? We also discussed changing the AlterTopic command in KIP-4 to not include config changes. Instead, all config changes will pass through the newly proposed AlterConfig. If no one objects, I can make some changes to KIP-4 to reflect this. Aditya From: Jay Kreps [jay.kr...@gmail.com] Sent: Tuesday, May 19, 2015 10:51 AM To: dev@kafka.apache.org Subject: Re: [VOTE] KIP-21 Dynamic Configuration Hey Aditya, Two comments: 1. Yeah we need to reconcile this with the APIs in KIP-4. I think it does make sense to allow setting config during topic creation. I agree with your summary that having alter topic and alter config may be confusing, but there are also some non-config changes such as replication factor and partition count that alter topic can carry out. What is the final state you are proposing? 2. This is implementation related so probably can be removed from the KIP entirely, but you seem to be proposing a separate config manager for each config override type. Should we just generalize TopicConfigManager to be ConfigOverrideManager and have it handle all the override types we will have? I think I may just be unclear on what you are proposing... -Jay On Mon, May 18, 2015 at 1:34 PM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote: Yeah, that was just a typo. I've fixed it. Thanks for calling it out. In KIP-4, I believe we have 3 types of requests: CreateTopic, AlterTopic and DeleteTopic. The topic configs are a sub-type of the Create and Alter commands. I think it would be nice to simply have an AlterConfig command that can alter any type of config rather than having a specific ClientConfig:
AlterConfig => [ConfigType [AddedConfigEntry] [DeletedConfig]]
ConfigType => string
AddedConfigEntry => ConfigKey ConfigValue
ConfigKey => string
ConfigValue => string
DeletedConfig => string
The downside of this approach is that we will have 2 separate ways of changing topic configs (AlterTopic and AlterConfig). While a general AlterConfig only makes sense if we plan to have more than two types of entity configs, it's definitely more future-proof. Thoughts? Aditya From: Todd Palino [tpal...@gmail.com] Sent: Monday, May 18, 2015 12:39 PM To: dev@kafka.apache.org Subject: Re: [VOTE] KIP-21 Dynamic Configuration Agree with Jun here on the JSON format. I think your intention was likely to have actual JSON here and it was just a typo in the wiki?
-Todd On Mon, May 18, 2015 at 12:07 PM, Jun Rao j...@confluent.io wrote: Aditya, Another thing to consider. In KIP-4, we are adding a new RPC request to change and retrieve topic configs. Do we want to add a similar RPC request to change configs per client id? If so, do we want to introduce a separate new request or have a combined new request for both topic and client id level config changes? A minor point in the wiki, for the json format in ZK, we should change {X1=Y1, X2=Y2..} to a json map, right? Thanks, Jun On Mon, May 18, 2015 at 9:48 AM, Aditya Auradkar aaurad...@linkedin.com.invalid wrote: https://cwiki.apache.org/confluence/display/KAFKA/KIP-21+-+Dynamic+Configuration Aditya
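For illustration, Jun's point about the ZK format: the {X1=Y1, X2=Y2..} notation written as an actual JSON map. The surrounding field names are assumptions modeled on the existing topic-config znodes, not the final KIP-21 layout:

{
  "version": 1,
  "config": {
    "X1": "Y1",
    "X2": "Y2"
  }
}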
[jira] [Commented] (KAFKA-2147) Unbalanced replication can cause extreme purgatory growth
[ https://issues.apache.org/jira/browse/KAFKA-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553394#comment-14553394 ] Craig Allsop commented on KAFKA-2147: - re: version, I understand. re: interval, I don't believe it will make any difference (as OP mentioned too) because, as you probably suspected, if pollExpired never returns it never checks the size of the queue. We have seen the queue not purged for as long as 2 hours and grow to millions. I'll attach a picture of this. Unbalanced replication can cause extreme purgatory growth - Key: KAFKA-2147 URL: https://issues.apache.org/jira/browse/KAFKA-2147 Project: Kafka Issue Type: Bug Components: purgatory, replication Affects Versions: 0.8.2.1 Reporter: Evan Huus Assignee: Jun Rao Attachments: KAFKA-2147.patch, KAFKA-2147_2015-05-14_09:41:56.patch, KAFKA-2147_2015-05-15_16:14:44.patch, purgatory.log, purgatory.log.gz, watch-lists.log Apologies in advance, this is going to be a bit of complex description, mainly because we've seen this issue several different ways and we're still tying them together in terms of root cause and analysis. It is worth noting now that we have all our producers set up to send RequiredAcks==-1, and that this includes all our MirrorMakers. I understand the purgatory is being rewritten (again) for 0.8.3. Hopefully that will incidentally fix this issue, or at least render it moot. h4. Symptoms Fetch request purgatory on a broker or brokers grows rapidly and steadily at a rate of roughly 1-5K requests per second. Heap memory used also grows to keep pace. When 4-5 million requests have accumulated in purgatory, the purgatory is drained, causing a substantial latency spike. The node will tend to drop leadership, replicate, and recover. h5. Case 1 - MirrorMaker We first noticed this case when enabling mirrormaker. We had one primary cluster already, with many producers and consumers. We created a second, identical cluster and enabled replication from the original to the new cluster on some topics using mirrormaker. This caused all six nodes in the new cluster to exhibit the symptom in lockstep - their purgatories would all grow together, and get drained within about 20 seconds of each other. The cluster-wide latency spikes at this time caused several problems for us. Turning MM on and off turned the problem on and off very precisely. When we stopped MM, the purgatories would all drop to normal levels immediately, and would start climbing again when we restarted it. Note that this is the *fetch* purgatories on the brokers that MM was *producing* to, which indicates fairly strongly that this is a replication issue, not a MM issue. This particular cluster and MM setup was abandoned for other reasons before we could make much progress debugging. h5. Case 2 - Broker 6 The second time we saw this issue was on the newest broker (broker 6) in the original cluster. For a long time we were running with five nodes, and eventually added a sixth to handle the increased load. At first, we moved only a handful of higher-volume partitions to this broker. Later, we created a group of new topics (totalling around 100 partitions) for testing purposes that were spread automatically across all six nodes. These topics saw occasional traffic, but were generally unused. At this point broker 6 had leadership for about an equal number of high-volume and unused partitions, about 15-20 of each. 
Around this time (we don't have detailed enough data to prove real correlation unfortunately), the issue started appearing on this broker as well, but not on any of the other brokers in the cluster. h4. Debugging The first thing we tried was to reduce the `fetch.purgatory.purge.interval.requests` from the default of 1000 to a much lower value of 200. This had no noticeable effect at all. We then enabled debug logging on broker06 and started looking through that. I can attach complete log samples if necessary, but the thing that stood out for us was a substantial number of the following lines: {noformat} [2015-04-23 20:05:15,196] DEBUG [KafkaApi-6] Putting fetch request with correlation id 49939 from client ReplicaFetcherThread-0-6 into purgatory (kafka.server.KafkaApis) {noformat} The volume of these lines seemed to match (approximately) the fetch purgatory growth on that broker. At this point we developed a hypothesis (detailed below) which guided our subsequent debugging tests: - Setting a daemon up to produce regular random data to all of the topics led by kafka06 (specifically the ones which otherwise would receive no data) substantially alleviated the problem. - Doing an additional rebalance of the cluster in
[jira] [Commented] (KAFKA-1778) Create new re-elect controller admin function
[ https://issues.apache.org/jira/browse/KAFKA-1778?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553429#comment-14553429 ] Joel Koshy commented on KAFKA-1778: --- I may be missing some detail, but (a), (b), (c) don't quite fit the scenario I was asking about: even if all brokers know that a specific broker is supposed to become the preferred controller, what happens if that preferred controller is about to become the controller but crashes before it can update the /controller path in ZooKeeper? No further ZooKeeper watches will be triggered. Also, can we avoid the message from the current controller to the preferred controller by having all brokers just watch the admin/next_controller znode? Under changes in election code - (a) did you mean that brokers should watch the admin/next_controller znode (and not the ready-to-serve-as-controller znode)? Create new re-elect controller admin function - Key: KAFKA-1778 URL: https://issues.apache.org/jira/browse/KAFKA-1778 Project: Kafka Issue Type: Sub-task Reporter: Joe Stein Assignee: Abhishek Nigam Fix For: 0.8.3 kafka --controller --elect -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Resolved] (KAFKA-2187) Introduce merge-kafka-pr.py script
[ https://issues.apache.org/jira/browse/KAFKA-2187?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma resolved KAFKA-2187. Resolution: Fixed Fix Version/s: 0.8.3 Issue resolved by pull request 3 [https://github.com/ijuma/kafka/pull/3] Introduce merge-kafka-pr.py script -- Key: KAFKA-2187 URL: https://issues.apache.org/jira/browse/KAFKA-2187 Project: Kafka Issue Type: New Feature Reporter: Ismael Juma Assignee: Ismael Juma Priority: Minor Fix For: 0.8.3 This script will be used to merge GitHub pull requests and it will pull from the Apache Git repo to the current branch, squash and merge the PR, push the commit to trunk, close the PR (via commit message) and close the relevant JIRA issue (via JIRA API). Spark has a script that does most (if not all) of this and that will be used as the starting point: https://github.com/apache/spark/blob/master/dev/merge_spark_pr.py -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1335) Add rebalancing logic to the coordinator / consumer
[ https://issues.apache.org/jira/browse/KAFKA-1335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553322#comment-14553322 ] Guozhang Wang commented on KAFKA-1335: -- Hi [~hachikuji], I have already started working on this ticket, but if you are interested in working on the new consumer in general another interesting task would be adding a list-topic API into the new consumer. I think [~onurkaraman] may have some context in this. Add rebalancing logic to the coordinator / consumer --- Key: KAFKA-1335 URL: https://issues.apache.org/jira/browse/KAFKA-1335 Project: Kafka Issue Type: Sub-task Components: consumer Affects Versions: 0.9.0 Reporter: Neha Narkhede Assignee: Guozhang Wang This implements the group management protocol. This will be a tricky and potentially large change since it will involve implementing the group management protocol, which include: 1) Adding the rebalance logic on the coordinator that can be triggered from membership change (either through failure detector or join group requests) and topic / partition ZK listener fires. 2) Adding the rebalance logic on the consumer upon topic change / error code from coordinator. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2168) New consumer poll() can block other calls like position(), commit(), and close() indefinitely
[ https://issues.apache.org/jira/browse/KAFKA-2168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553343#comment-14553343 ] Ewen Cheslack-Postava commented on KAFKA-2168: -- Actually, now I realize another solution is to only remove synchronization from the one place it's a problem -- things that might call NetworkClient.poll() with long timeouts. Could we use synchronized(this) around everything *except* the NetworkClient.poll() calls, and then have anything using NetworkClient synchronize on it? This is still finer-grained locking, but I think it could end up having pretty minimal impact on the current code. The drawback is that since NetworkClient is used by all the classes, the requirement of locking gets spread across all of them. New consumer poll() can block other calls like position(), commit(), and close() indefinitely - Key: KAFKA-2168 URL: https://issues.apache.org/jira/browse/KAFKA-2168 Project: Kafka Issue Type: Bug Components: clients, consumer Reporter: Ewen Cheslack-Postava Assignee: Jason Gustafson The new consumer is currently using very coarse-grained synchronization. For most methods this isn't a problem since they finish quickly once the lock is acquired, but poll() might run for a long time (and commonly will since polling with long timeouts is a normal use case). This means any operations invoked from another thread may block until the poll() call completes. Some example use cases where this can be a problem: * A shutdown hook is registered to trigger shutdown and invokes close(). It gets invoked from another thread and blocks indefinitely. * User wants to manage offset commit themselves in a background thread. If the commit policy is not purely time based, it's not currently possible to make sure the call to commit() will be processed promptly. Two possible solutions to this: 1. Make sure a lock is not held during the actual select call. Since we have multiple layers (KafkaConsumer -> NetworkClient -> Selector -> nio Selector) this is probably hard to make work cleanly since locking is currently only performed at the KafkaConsumer level and we'd want it unlocked around a single line of code in Selector. 2. Wake up the selector before synchronizing for certain operations. This would require some additional coordination to make sure the caller of wakeup() is able to acquire the lock promptly (instead of, e.g., the poll() thread being woken up and then promptly reacquiring the lock with a subsequent long poll() call). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
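A sketch of the locking split described in this comment, with assumed names (ClientLike stands in for NetworkClient); this is an illustration of the idea, not the consumer's actual code:

// Illustration: consumer-level work holds the consumer lock briefly, while the
// potentially long NetworkClient.poll() is guarded only by the client's monitor.
final class SplitLockPollSketch {
    interface ClientLike { void poll(long timeoutMs); void send(Object request); }

    private final ClientLike client;

    SplitLockPollSketch(ClientLike client) { this.client = client; }

    public void poll(long timeoutMs) {
        synchronized (this) {
            // prepare fetch requests, check subscription state (held briefly)
        }
        synchronized (client) {
            client.poll(timeoutMs); // long-running; the consumer lock is NOT held here
        }
        synchronized (this) {
            // process completed responses, update positions (held briefly)
        }
    }

    public synchronized void commit(Object commitRequest) {
        // Note: if another thread is inside a long client.poll(), this still waits for
        // it to return; in practice this would be paired with a wakeup (option 2 above).
        synchronized (client) {
            client.send(commitRequest);
            client.poll(0); // non-blocking network flush
        }
    }
}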
Review Request 34502: Patch for KAFKA-2187
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34502/ --- Review request for kafka. Bugs: KAFKA-2187 https://issues.apache.org/jira/browse/KAFKA-2187 Repository: kafka Description --- KAFKA-2187; Introduce merge-kafka-pr.py script Diffs - kafka-merge-pr.py PRE-CREATION Diff: https://reviews.apache.org/r/34502/diff/ Testing --- Thanks, Ismael Juma
[jira] [Updated] (KAFKA-2187) Introduce merge-kafka-pr.py script
[ https://issues.apache.org/jira/browse/KAFKA-2187?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-2187: --- Attachment: KAFKA-2187.patch Introduce merge-kafka-pr.py script -- Key: KAFKA-2187 URL: https://issues.apache.org/jira/browse/KAFKA-2187 Project: Kafka Issue Type: New Feature Reporter: Ismael Juma Assignee: Ismael Juma Priority: Minor Fix For: 0.8.3 Attachments: KAFKA-2187.patch This script will be used to merge GitHub pull requests and it will pull from the Apache Git repo to the current branch, squash and merge the PR, push the commit to trunk, close the PR (via commit message) and close the relevant JIRA issue (via JIRA API). Spark has a script that does most (if not all) of this and that will be used as the starting point: https://github.com/apache/spark/blob/master/dev/merge_spark_pr.py -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2187) Introduce merge-kafka-pr.py script
[ https://issues.apache.org/jira/browse/KAFKA-2187?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-2187: --- Status: Patch Available (was: Reopened) Introduce merge-kafka-pr.py script -- Key: KAFKA-2187 URL: https://issues.apache.org/jira/browse/KAFKA-2187 Project: Kafka Issue Type: New Feature Reporter: Ismael Juma Assignee: Ismael Juma Priority: Minor Fix For: 0.8.3 Attachments: KAFKA-2187.patch This script will be used to merge GitHub pull requests and it will pull from the Apache Git repo to the current branch, squash and merge the PR, push the commit to trunk, close the PR (via commit message) and close the relevant JIRA issue (via JIRA API). Spark has a script that does most (if not all) of this and that will be used as the starting point: https://github.com/apache/spark/blob/master/dev/merge_spark_pr.py -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2187) Introduce merge-kafka-pr.py script
[ https://issues.apache.org/jira/browse/KAFKA-2187?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553226#comment-14553226 ] Ismael Juma commented on KAFKA-2187: Created reviewboard https://reviews.apache.org/r/34502/diff/ against branch origin/trunk Introduce merge-kafka-pr.py script -- Key: KAFKA-2187 URL: https://issues.apache.org/jira/browse/KAFKA-2187 Project: Kafka Issue Type: New Feature Reporter: Ismael Juma Assignee: Ismael Juma Priority: Minor Fix For: 0.8.3 Attachments: KAFKA-2187.patch This script will be used to merge GitHub pull requests and it will pull from the Apache Git repo to the current branch, squash and merge the PR, push the commit to trunk, close the PR (via commit message) and close the relevant JIRA issue (via JIRA API). Spark has a script that does most (if not all) of this and that will be used as the starting point: https://github.com/apache/spark/blob/master/dev/merge_spark_pr.py -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Request for permission to edit pages in the wiki
Done On Wed, May 20, 2015 at 11:45:48PM +0100, Ismael Juma wrote: Hi, In order to edit the Patch submission and review page with information on how to merge GitHub pull requests, it would be helpful to have edit permission for Kafka's Confluence pages. My Confluence account id is `ijuma`[1]. Thanks, Ismael [1] https://cwiki.apache.org/confluence/display/~ijuma
[jira] [Commented] (KAFKA-1335) Add rebalancing logic to the coordinator / consumer
[ https://issues.apache.org/jira/browse/KAFKA-1335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553418#comment-14553418 ] Jason Gustafson commented on KAFKA-1335: Hey [~guozhang], sounds fine to me. Is there already a ticket for this? Add rebalancing logic to the coordinator / consumer --- Key: KAFKA-1335 URL: https://issues.apache.org/jira/browse/KAFKA-1335 Project: Kafka Issue Type: Sub-task Components: consumer Affects Versions: 0.9.0 Reporter: Neha Narkhede Assignee: Guozhang Wang This implements the group management protocol. This will be a tricky and potentially large change since it will involve implementing the group management protocol, which include: 1) Adding the rebalance logic on the coordinator that can be triggered from membership change (either through failure detector or join group requests) and topic / partition ZK listener fires. 2) Adding the rebalance logic on the consumer upon topic change / error code from coordinator. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-1479) Logs filling up while Kafka ReplicaFetcherThread tries to retrieve partition info for deleted topics
[ https://issues.apache.org/jira/browse/KAFKA-1479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14553460#comment-14553460 ] Greg Lloyd commented on KAFKA-1479: --- I am not able to locate the directories that Manasi referenced; would anybody mind pointing out where they are located? I have kafka installed in /usr/local/kafka and zookeeper installed in /usr/local/zookeeper. Logs filling up while Kafka ReplicaFetcherThread tries to retrieve partition info for deleted topics Key: KAFKA-1479 URL: https://issues.apache.org/jira/browse/KAFKA-1479 Project: Kafka Issue Type: Bug Components: log Affects Versions: 0.8.1 Environment: CentOS Reporter: Manasi Manasi Started noticing that logs are filling up fast with lines like this: {quote} [2014-06-01 15:18:08,218] WARN [KafkaApi-2] Fetch request with correlation id 10049 from client ReplicaFetcherThread-0-2 on partition [sams_2014-05-27,26] failed due to Topic sams_2014-05-27 either doesn't exist or is in the process of being deleted (kafka.server.KafkaApis) [2014-06-01 15:18:08,218] WARN [KafkaApi-2] Fetch request with correlation id 10049 from client ReplicaFetcherThread-0-2 on partition [sams_2014-05-28,38] failed due to Topic sams_2014-05-28 either doesn't exist or is in the process of being deleted (kafka.server.KafkaApis) [2014-06-01 15:18:08,219] WARN [KafkaApi-2] Fetch request with correlation id 10049 from client ReplicaFetcherThread-0-2 on partition [sams_2014-05-30,20] failed due to Topic sams_2014-05-30 either doesn't exist or is in the process of being deleted (kafka.server.KafkaApis) [2014-06-01 15:18:08,219] WARN [KafkaApi-2] Fetch request with correlation id 10049 from client ReplicaFetcherThread-0-2 on partition [sams_2014-05-22,46] failed due to Topic sams_2014-05-22 either doesn't exist or is in the process of being deleted (kafka.server.KafkaApis) [2014-06-01 15:18:08,219] WARN [KafkaApi-2] Fetch request with correlation id 10049 from client ReplicaFetcherThread-0-2 on partition [sams_2014-05-27,8] failed due to Topic sams_2014-05-27 either doesn't exist or is in the process of being deleted (kafka.server.KafkaApis) {quote} The above is from kafkaServer.out.
Also seeing errors in server.log: {quote} [2014-06-01 15:23:52,788] ERROR [ReplicaFetcherThread-0-0], Error for partition [sams_2014-05-26,19] to broker 0:class kafka.common.UnknownTopicOrPartitionException (kafka.server.ReplicaFetcherThread) [2014-06-01 15:23:52,788] WARN [KafkaApi-2] Fetch request with correlation id 10887 from client ReplicaFetcherThread-0-2 on partition [sams_2014-05-30,4] failed due to Topic sams_2014-05-30 either doesn't exist or is in the process of being deleted (kafka.server.KafkaApis) [2014-06-01 15:23:52,788] ERROR [ReplicaFetcherThread-0-0], Error for partition [sams_2014-05-24,34] to broker 0:class kafka.common.UnknownTopicOrPartitionException (kafka.server.ReplicaFetcherThread) [2014-06-01 15:23:52,788] ERROR [ReplicaFetcherThread-0-0], Error for partition [sams_2014-05-26,41] to broker 0:class kafka.common.UnknownTopicOrPartitionException (kafka.server.ReplicaFetcherThread) [2014-06-01 15:23:52,788] WARN [KafkaApi-2] Fetch request with correlation id 10887 from client ReplicaFetcherThread-0-2 on partition [2014-05-21,0] failed due to Topic 2014-05-21 either doesn't exist or is in the process of being deleted (kafka.server.KafkaApis) [2014-06-01 15:23:52,788] ERROR [ReplicaFetcherThread-0-0], Error for partition [sams_2014-05-28,42] to broker 0:class kafka.common.UnknownTopicOrPartitionException (kafka.server.ReplicaFetcherThread) [2014-06-01 15:23:52,788] ERROR [ReplicaFetcherThread-0-0], Error for partition [sams_2014-05-22,21] to broker 0:class kafka.common.UnknownTopicOrPartitionException (kafka.server.ReplicaFetcherThread) [2014-06-01 15:23:52,788] WARN [KafkaApi-2] Fetch request with correlation id 10887 from client ReplicaFetcherThread-0-2 on partition [sams_2014-05-20,26] failed due to Topic sams_2014-05-20 either doesn't exist or is in the process of being deleted (kafka.server.KafkaApis) {quote} All these partitions belong to deleted topics. Nothing changed on our end when we started noticing these logs filling up. Any ideas what is going on? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2187) Introduce merge-kafka-pr.py script
[ https://issues.apache.org/jira/browse/KAFKA-2187?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-2187: --- Fix Version/s: (was: 0.8.3) Introduce merge-kafka-pr.py script -- Key: KAFKA-2187 URL: https://issues.apache.org/jira/browse/KAFKA-2187 Project: Kafka Issue Type: New Feature Reporter: Ismael Juma Assignee: Ismael Juma Priority: Minor Attachments: KAFKA-2187.patch, KAFKA-2187_2015-05-20_23:14:05.patch This script will be used to merge GitHub pull requests: it will pull from the Apache Git repo into the current branch, squash and merge the PR, push the commit to trunk, close the PR (via the commit message) and close the relevant JIRA issue (via the JIRA API). Spark has a script that does most (if not all) of this, and it will be used as the starting point: https://github.com/apache/spark/blob/master/dev/merge_spark_pr.py -- This message was sent by Atlassian JIRA (v6.3.4#6332)
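As a rough sketch of the git sequence the description outlines (the actual script is Python and is not shown here; the remote name, PR number and commit title below are hypothetical), expressed in Scala via scala.sys.process:

{noformat}
import scala.sys.process._

// Illustrative sketch only -- the real script (kafka-merge-pr.py) is Python.
// Assumes a git remote named "apache" and a hypothetical PR number/title.
object MergePrSketch extends App {
  val prNumber = 123                             // hypothetical
  val title = "KAFKA-XXXX; example commit title" // hypothetical

  // Fetch the PR head and branch off the Apache trunk.
  Seq("git", "fetch", "apache", s"pull/$prNumber/head:PR_$prNumber").!
  Seq("git", "checkout", "-b", s"MERGE_$prNumber", "apache/trunk").!

  // Squash-merge the PR and commit with a message that closes it.
  Seq("git", "merge", "--squash", s"PR_$prNumber").!
  Seq("git", "commit", "-m", s"$title\n\nCloses #$prNumber").!

  // Push the single squashed commit to trunk.
  Seq("git", "push", "apache", s"MERGE_$prNumber:trunk").!
}
{noformat}

Closing the JIRA issue via the JIRA REST API is a separate step, not sketched here.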
[jira] [Issue Comment Deleted] (KAFKA-1737) Document required ZkSerializer for ZkClient used with AdminUtils
[ https://issues.apache.org/jira/browse/KAFKA-1737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vivek Madani updated KAFKA-1737: Comment: was deleted (was: Updated the patch. Removed the imports and made ZkStringSerializer private) Document required ZkSerializer for ZkClient used with AdminUtils Key: KAFKA-1737 URL: https://issues.apache.org/jira/browse/KAFKA-1737 Project: Kafka Issue Type: Improvement Components: tools Affects Versions: 0.8.1.1 Reporter: Stevo Slavic Assignee: Vivek Madani Priority: Minor Attachments: KAFKA-1737.patch, KAFKA-1737.patch {{ZkClient}} instances passed to {{AdminUtils}} calls must have {{kafka.utils.ZKStringSerializer}} set as {{ZkSerializer}}. Otherwise, commands executed via {{AdminUtils}} may not be seen or recognized by the broker, producer or consumer. E.g. a producer (with auto topic creation turned off) will not be able to send messages to a topic created via {{AdminUtils}}; it will throw {{UnknownTopicOrPartitionException}}. Please consider at least documenting this requirement in {{AdminUtils}} scaladoc. For more info see [related discussion on Kafka user mailing list|http://mail-archives.apache.org/mod_mbox/kafka-users/201410.mbox/%3CCAAUywg-oihNiXuQRYeS%3D8Z3ymsmEHo6ghLs%3Dru4nbm%2BdHVz6TA%40mail.gmail.com%3E]. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-1737) Document required ZkSerializer for ZkClient used with AdminUtils
[ https://issues.apache.org/jira/browse/KAFKA-1737?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vivek Madani updated KAFKA-1737: Status: Patch Available (was: In Progress) Submitting the patch with a wrapper around ZkClient to set ZkStringSerializer internally. Also removed imports as suggested earlier and made ZkStringSerializer private. Document required ZkSerializer for ZkClient used with AdminUtils Key: KAFKA-1737 URL: https://issues.apache.org/jira/browse/KAFKA-1737 Project: Kafka Issue Type: Improvement Components: tools Affects Versions: 0.8.1.1 Reporter: Stevo Slavic Assignee: Vivek Madani Priority: Minor Attachments: KAFKA-1737.patch, KAFKA-1737.patch {{ZkClient}} instances passed to {{AdminUtils}} calls must have {{kafka.utils.ZKStringSerializer}} set as {{ZkSerializer}}. Otherwise, commands executed via {{AdminUtils}} may not be seen or recognized by the broker, producer or consumer. E.g. a producer (with auto topic creation turned off) will not be able to send messages to a topic created via {{AdminUtils}}; it will throw {{UnknownTopicOrPartitionException}}. Please consider at least documenting this requirement in {{AdminUtils}} scaladoc. For more info see [related discussion on Kafka user mailing list|http://mail-archives.apache.org/mod_mbox/kafka-users/201410.mbox/%3CCAAUywg-oihNiXuQRYeS%3D8Z3ymsmEHo6ghLs%3Dru4nbm%2BdHVz6TA%40mail.gmail.com%3E]. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
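For reference, a minimal sketch of the requirement this issue documents, against the 0.8.x API (the ZooKeeper address, topic name, and partition/replica counts are placeholders):

{noformat}
import kafka.admin.AdminUtils
import kafka.utils.ZKStringSerializer
import org.I0Itec.zkclient.ZkClient

object CreateTopicExample extends App {
  // The fourth argument is the crux: without ZKStringSerializer, the znodes
  // AdminUtils writes are not readable by brokers, producers or consumers.
  val zkClient = new ZkClient("localhost:2181", 30000, 30000, ZKStringSerializer)
  try {
    AdminUtils.createTopic(zkClient, "my-topic", 4, 2)
  } finally {
    zkClient.close()
  }
}
{noformat}

The patch described above wraps ZkClient so this wiring happens internally, which would make the explicit serializer argument unnecessary.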
Re: Review Request 34502: Patch for KAFKA-2187
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34502/ --- (Updated May 20, 2015, 10:14 p.m.) Review request for kafka. Bugs: KAFKA-2187 https://issues.apache.org/jira/browse/KAFKA-2187 Repository: kafka Description --- KAFKA-2187; Introduce merge-kafka-pr.py script Diffs (updated) - kafka-merge-pr.py PRE-CREATION Diff: https://reviews.apache.org/r/34502/diff/ Testing --- Thanks, Ismael Juma
[jira] [Updated] (KAFKA-2147) Unbalanced replication can cause extreme purgatory growth
[ https://issues.apache.org/jira/browse/KAFKA-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Craig Allsop updated KAFKA-2147: Attachment: craig-kafka-purgatory-queue-size-issue.png Unbalanced replication can cause extreme purgatory growth - Key: KAFKA-2147 URL: https://issues.apache.org/jira/browse/KAFKA-2147 Project: Kafka Issue Type: Bug Components: purgatory, replication Affects Versions: 0.8.2.1 Reporter: Evan Huus Assignee: Jun Rao Attachments: KAFKA-2147.patch, KAFKA-2147_2015-05-14_09:41:56.patch, KAFKA-2147_2015-05-15_16:14:44.patch, craig-kafka-purgatory-queue-size-issue.png, purgatory.log, purgatory.log.gz, watch-lists.log Apologies in advance, this is going to be a bit of a complex description, mainly because we've seen this issue in several different ways and we're still tying them together in terms of root cause and analysis. It is worth noting now that we have all our producers set up to send RequiredAcks==-1, and that this includes all our MirrorMakers. I understand the purgatory is being rewritten (again) for 0.8.3. Hopefully that will incidentally fix this issue, or at least render it moot. h4. Symptoms Fetch request purgatory on a broker or brokers grows rapidly and steadily at a rate of roughly 1-5K requests per second. Heap memory used also grows to keep pace. When 4-5 million requests have accumulated in purgatory, the purgatory is drained, causing a substantial latency spike. The node will tend to drop leadership, replicate, and recover. h5. Case 1 - MirrorMaker We first noticed this case when enabling MirrorMaker. We had one primary cluster already, with many producers and consumers. We created a second, identical cluster and enabled replication from the original to the new cluster on some topics using MirrorMaker. This caused all six nodes in the new cluster to exhibit the symptom in lockstep - their purgatories would all grow together, and get drained within about 20 seconds of each other. The cluster-wide latency spikes at this time caused several problems for us. Turning MM on and off turned the problem on and off very precisely. When we stopped MM, the purgatories would all drop to normal levels immediately, and would start climbing again when we restarted it. Note that these are the *fetch* purgatories on the brokers that MM was *producing* to, which indicates fairly strongly that this is a replication issue, not a MM issue. This particular cluster and MM setup was abandoned for other reasons before we could make much progress debugging. h5. Case 2 - Broker 6 The second time we saw this issue was on the newest broker (broker 6) in the original cluster. For a long time we were running with five nodes, and eventually added a sixth to handle the increased load. At first, we moved only a handful of higher-volume partitions to this broker. Later, we created a group of new topics (totalling around 100 partitions) for testing purposes that were spread automatically across all six nodes. These topics saw occasional traffic, but were generally unused. At this point broker 6 had leadership for about an equal number of high-volume and unused partitions, about 15-20 of each. Around this time (we don't have detailed enough data to prove real correlation unfortunately), the issue started appearing on this broker as well, but not on any of the other brokers in the cluster. h4. Debugging The first thing we tried was to reduce the `fetch.purgatory.purge.interval.requests` from the default of 1000 to a much lower value of 200. 
This had no noticeable effect at all. We then enabled debug logging on broker06 and started looking through that. I can attach complete log samples if necessary, but the thing that stood out for us was a substantial number of the following lines: {noformat} [2015-04-23 20:05:15,196] DEBUG [KafkaApi-6] Putting fetch request with correlation id 49939 from client ReplicaFetcherThread-0-6 into purgatory (kafka.server.KafkaApis) {noformat} The volume of these lines seemed to match (approximately) the fetch purgatory growth on that broker. At this point we developed a hypothesis (detailed below) which guided our subsequent debugging tests: - Setting a daemon up to produce regular random data to all of the topics led by kafka06 (specifically the ones which otherwise would receive no data) substantially alleviated the problem. - Doing an additional rebalance of the cluster in order to move a number of other topics with regular data to kafka06 appears to have solved the problem completely. h4. Hypothesis Current versions (0.8.2.1 and earlier) have issues with the replica fetcher not backing off correctly (KAFKA-1461, KAFKA-2082 and others).
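A minimal sketch of the first mitigation described above (a daemon producing regular data to otherwise-idle partitions), using the 0.8 Scala producer; the broker address, topic names and interval are hypothetical:

{noformat}
import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

object KeepaliveDaemon extends App {
  val props = new Properties()
  props.put("metadata.broker.list", "broker06:9092")  // hypothetical
  props.put("request.required.acks", "-1")            // matches the RequiredAcks==-1 setup above
  props.put("serializer.class", "kafka.serializer.StringEncoder")
  val producer = new Producer[String, String](new ProducerConfig(props))

  val idleTopics = Seq("idle-topic-1", "idle-topic-2") // hypothetical
  while (true) {
    // A tiny message per otherwise-idle topic gives the replica fetchers
    // data to return, so their requests don't pile up in purgatory.
    idleTopics.foreach(t => producer.send(new KeyedMessage[String, String](t, "keepalive")))
    Thread.sleep(30000)
  }
}
{noformat}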
Re: Review Request 34450: Fix KAFKA-2017; rebased
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34450/#review84604 --- Thanks for the patch. A few comments below. core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala https://reviews.apache.org/r/34450/#comment135934 Another way to do this is to only load from ZK on the becoming-leader event for an offsetTopic partition. Then we don't have to read from ZK during join-group, which would introduce unnecessary overhead when joining a new group. core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala https://reviews.apache.org/r/34450/#comment135970 I was wondering whether it's worth including the leader epoch (of the corresponding offset topic partition) in the ZK value as we did for leaderAndIsr, to prevent a zombie consumer coordinator from overwriting the value during a soft failure. I am not sure if it's worth doing this immediately because 1. When this happens, consumers can still recover after the heartbeat fails. 2. It seems that doing this right is a bit more complicated. We need to keep the leader epoch in the ZK value. However, during a leader change, we probably need to update the values in ZK with the new leader epoch as well, in order to truly prevent the zombie coordinator from overwriting the value. So, I think for now, we can just use the simple approach in this patch. core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala https://reviews.apache.org/r/34450/#comment135961 Should we encode topics as an array of String? core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala https://reviews.apache.org/r/34450/#comment135956 updatePersistentPath already handles the case when the node doesn't exist. core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala https://reviews.apache.org/r/34450/#comment135959 Not sure if we need to wrap unexpected exceptions. The callers already handle unexpected exceptions. core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala https://reviews.apache.org/r/34450/#comment135963 Should we auto fix this or throw a KafkaException here? If we do auto fix this, we probably should log a warning since this is not expected. Ditto for the handling of partitionStrategy. core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala https://reviews.apache.org/r/34450/#comment135965 Could we just do groups(group.groupId)? core/src/main/scala/kafka/utils/ZkUtils.scala https://reviews.apache.org/r/34450/#comment135967 This doesn't seem to be used. - Jun Rao On May 20, 2015, 4:13 p.m., Guozhang Wang wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34450/ --- (Updated May 20, 2015, 4:13 p.m.) Review request for kafka. Bugs: KAFKA-2017 https://issues.apache.org/jira/browse/KAFKA-2017 Repository: kafka Description --- 1. Upon receiving join-group, if the group metadata cannot be found in the local cache, try to read it from ZK; 2. 
Upon completing rebalance, update ZK with the new group registry, or delete the registry if the group becomes empty. Diffs - core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala af06ad45cdc46ac3bc27898ebc1a5bd5b1c7b19e core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala 47bdfa7cc86fd4e841e2b1d6bfd40f1508e643bd core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala c39e6de34ee531c6dfa9107b830752bd7f8fbe59 core/src/main/scala/kafka/utils/ZkUtils.scala 2618dd39b925b979ad6e4c0abd5c6eaafb3db5d5 Diff: https://reviews.apache.org/r/34450/diff/ Testing --- Thanks, Guozhang Wang
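A hypothetical sketch of the two steps in the patch description above (cache-miss fallback to ZK on join-group, write-back or delete on rebalance completion). The path layout and GroupRegistry type are illustrative, not the actual patch code, though ZkUtils.readDataMaybeNull and ZkUtils.updatePersistentPath are the utilities discussed in the review:

{noformat}
import kafka.utils.ZkUtils
import org.I0Itec.zkclient.ZkClient
import scala.collection.mutable

case class GroupRegistry(groupId: String, topics: List[String]) // illustrative

class GroupCache(zkClient: ZkClient) {
  private val groups = mutable.Map[String, GroupRegistry]()
  private def groupPath(groupId: String) = s"/coordinator/groups/$groupId" // illustrative

  // Step 1: on join-group, consult the local cache, falling back to ZK on a miss.
  def getOrLoad(groupId: String): Option[GroupRegistry] =
    groups.get(groupId).orElse {
      val (dataOpt, _) = ZkUtils.readDataMaybeNull(zkClient, groupPath(groupId))
      val registry = dataOpt.map(d => GroupRegistry(groupId, d.split(",").toList))
      registry.foreach(r => groups(groupId) = r)
      registry
    }

  // Step 2: on rebalance completion, persist the registry, or delete it when
  // the group has become empty. updatePersistentPath creates the path if it
  // does not exist yet (as noted in the review).
  def onRebalanceCompleted(registry: GroupRegistry): Unit =
    if (registry.topics.isEmpty) ZkUtils.deletePath(zkClient, groupPath(registry.groupId))
    else ZkUtils.updatePersistentPath(zkClient, groupPath(registry.groupId), registry.topics.mkString(","))
}
{noformat}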
[jira] [Commented] (KAFKA-1335) Add rebalancing logic to the coordinator / consumer
[ https://issues.apache.org/jira/browse/KAFKA-1335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553458#comment-14553458 ] Onur Karaman commented on KAFKA-1335: - I'll write up the ticket with the details tonight. Add rebalancing logic to the coordinator / consumer --- Key: KAFKA-1335 URL: https://issues.apache.org/jira/browse/KAFKA-1335 Project: Kafka Issue Type: Sub-task Components: consumer Affects Versions: 0.9.0 Reporter: Neha Narkhede Assignee: Guozhang Wang This implements the group management protocol. This will be a tricky and potentially large change since it will involve implementing the group management protocol, which includes: 1) Adding the rebalance logic on the coordinator, which can be triggered by membership changes (either through the failure detector or join-group requests) and by topic / partition ZK listener fires. 2) Adding the rebalance logic on the consumer upon topic changes / error codes from the coordinator. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (KAFKA-1479) Logs filling up while Kafka ReplicaFetcherThread tries to retrieve partition info for deleted topics
[ https://issues.apache.org/jira/browse/KAFKA-1479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553460#comment-14553460 ] Greg Lloyd edited comment on KAFKA-1479 at 5/21/15 2:05 AM: I am not able to locate the directories that Manasi referenced, anybody mind pointing out where they are located. I have kafka installed in /usr/local/kafka and zookeeper installed in /usr/local/zookeeper. was (Author: glloyd1): I am not able to locate the directories that Manasi referenced, anybody mind pointing out where they are located. I have kafka isntalled in /usr/local/kafka and zookeeper installed in /usr/local/zookeeper. Logs filling up while Kafka ReplicaFetcherThread tries to retrieve partition info for deleted topics Key: KAFKA-1479 URL: https://issues.apache.org/jira/browse/KAFKA-1479 Project: Kafka Issue Type: Bug Components: log Affects Versions: 0.8.1 Environment: CentOS Reporter: Manasi Manasi Started noticing that logs are filling up fast with lines like this: {quote} [2014-06-01 15:18:08,218] WARN [KafkaApi-2] Fetch request with correlation id 10049 from client ReplicaFetcherThread-0-2 on partition [sams_2014-05-27,26] failed due to Topic sams_2014-05-27 either doesn't exist or is in the process of being deleted (kafka.server.KafkaApis) [2014-06-01 15:18:08,218] WARN [KafkaApi-2] Fetch request with correlation id 10049 from client ReplicaFetcherThread-0-2 on partition [sams_2014-05-28,38] failed due to Topic sams_2014-05-28 either doesn't exist or is in the process of being deleted (kafka.server.KafkaApis) [2014-06-01 15:18:08,219] WARN [KafkaApi-2] Fetch request with correlation id 10049 from client ReplicaFetcherThread-0-2 on partition [sams_2014-05-30,20] failed due to Topic sams_2014-05-30 either doesn't exist or is in the process of being deleted (kafka.server.KafkaApis) [2014-06-01 15:18:08,219] WARN [KafkaApi-2] Fetch request with correlation id 10049 from client ReplicaFetcherThread-0-2 on partition [sams_2014-05-22,46] failed due to Topic sams_2014-05-22 either doesn't exist or is in the process of being deleted (kafka.server.KafkaApis) [2014-06-01 15:18:08,219] WARN [KafkaApi-2] Fetch request with correlation id 10049 from client ReplicaFetcherThread-0-2 on partition [sams_2014-05-27,8] failed due to Topic sams_2014-05-27 either doesn't exist or is in the process of being deleted (kafka.server.KafkaApis) {quote} The above is from kafkaServer.out. 
Also seeing errors in server.log: {quote} [2014-06-01 15:23:52,788] ERROR [ReplicaFetcherThread-0-0], Error for partition [sams_2014-05-26,19] to broker 0:class kafka.common.UnknownTopicOrPartitionException (kafka.server.ReplicaFetcherThread) [2014-06-01 15:23:52,788] WARN [KafkaApi-2] Fetch request with correlation id 10887 from client ReplicaFetcherThread-0-2 on partition [sams_2014-05-30,4] failed due to Topic sams_2014-05-30 either doesn't exist or is in the process of being deleted (kafka.server.KafkaApis) [2014-06-01 15:23:52,788] ERROR [ReplicaFetcherThread-0-0], Error for partition [sams_2014-05-24,34] to broker 0:class kafka.common.UnknownTopicOrPartitionException (kafka.server.ReplicaFetcherThread) [2014-06-01 15:23:52,788] ERROR [ReplicaFetcherThread-0-0], Error for partition [sams_2014-05-26,41] to broker 0:class kafka.common.UnknownTopicOrPartitionException (kafka.server.ReplicaFetcherThread) [2014-06-01 15:23:52,788] WARN [KafkaApi-2] Fetch request with correlation id 10887 from client ReplicaFetcherThread-0-2 on partition [2014-05-21,0] failed due to Topic 2014-05-21 either doesn't exist or is in the process of being deleted (kafka.server.KafkaApis) [2014-06-01 15:23:52,788] ERROR [ReplicaFetcherThread-0-0], Error for partition [sams_2014-05-28,42] to broker 0:class kafka.common.UnknownTopicOrPartitionException (kafka.server.ReplicaFetcherThread) [2014-06-01 15:23:52,788] ERROR [ReplicaFetcherThread-0-0], Error for partition [sams_2014-05-22,21] to broker 0:class kafka.common.UnknownTopicOrPartitionException (kafka.server.ReplicaFetcherThread) [2014-06-01 15:23:52,788] WARN [KafkaApi-2] Fetch request with correlation id 10887 from client ReplicaFetcherThread-0-2 on partition [sams_2014-05-20,26] failed due to Topic sams_2014-05-20 either doesn't exist or is in the process of being deleted (kafka.server.KafkaApis) {quote} All these partitions belong to deleted topics. Nothing changed on our end when we started noticing these logs filling up. Any ideas what is going on? -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 33065: Patch for KAFKA-1928
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33065/#review84649 --- core/src/main/scala/kafka/network/RequestChannel.scala https://reviews.apache.org/r/33065/#comment135995 Found this problem while working on KAFKA-2208: we need to add the if / else for old / new request formats here as well. core/src/main/scala/kafka/network/RequestChannel.scala https://reviews.apache.org/r/33065/#comment135997 This would result in the following log entry: TRACE Completed request:{api_key=12,api_version=0,correlation_id=40,client_id=consumer-1} : {group_id=my-test,group_generation_id=1,consumer_id=e6c857b3-e362-4392-a272-94aa3e42d1fb} ... which is not very intuitive. Could we add a toString function to the requests as we did in the old classes? - Guozhang Wang On May 20, 2015, 10:42 a.m., Gwen Shapira wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33065/ --- (Updated May 20, 2015, 10:42 a.m.) Review request for kafka. Bugs: 1928 and KAFKA-1928 https://issues.apache.org/jira/browse/1928 https://issues.apache.org/jira/browse/KAFKA-1928 Repository: kafka Description --- first pass on replacing Send implement maxSize and improved docs Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 Conflicts: core/src/main/scala/kafka/network/RequestChannel.scala moved selector out of abstract thread mid-way through putting selector in SocketServer Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 Also, SocketServer is now using Selector. Still a bit messy - but all tests pass. Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 renamed requestKey to connectionId to reflect new use and changed type from Any to String Following Jun's comments - moved MultiSend to client. Cleaned up destinations as well removed reify and remaining from send/receive API, per Jun. moved maybeCloseOldest() to Selector per Jay added idString to node API, changed written to int in Send API cleaning up MultiSend, added size() to Send interface fixed some issues with multisend Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 fixed metric thingies fixed response order bug error handling for illegal selector state and fix metrics bug optimized selection key lookup with identity hash fix accidental change Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 addressing Jun's comments removed connection-aging for clients fix issues with exception handling and other cleanup Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 Revert removed connection-aging for clients This reverts commit 016669123a370b561b5ac78f8f1cf7bdd958e7d1. 
improving exception handling and other minor fixes Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into KAFKA-1928-v2 Diffs - clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java da76cc257b4cfe3c4bce7120a1f14c7f31ef8587 clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java 936487b16e7ac566f8bdcd39a7240ceb619fd30e clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 1311f85847b022efec8cb05c450bb18231db6979 clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 435fbb5116e80302eba11ed1d3069cb577dbdcbd clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java bdff518b732105823058e6182f445248b45dc388 clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java d301be4709f7b112e1f3a39f3c04cfa65f00fa60 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b2764df11afa7a99fce46d1ff48960d889032d14 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ef9dd5238fbc771496029866ece1d85db6d7b7a5 clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 8e336a3aa96c73f52beaeb56b931baf4b026cf21 clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 187d0004c8c46b6664ddaffecc6166d4b47351e5 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 1e943d621732889a1c005b243920dc32cea7af66 clients/src/main/java/org/apache/kafka/common/Node.java
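On the toString suggestion above, a hypothetical illustration (the class and field names are made up, not the actual patch code) of the kind of readable summary the reviewer is asking for in the TRACE completion log:

{noformat}
// Illustrative only: a readable toString in the spirit of the old request classes.
case class JoinGroupSummary(groupId: String, generationId: Int, consumerId: String) {
  override def toString: String =
    s"JoinGroup(group=$groupId, generation=$generationId, consumer=$consumerId)"
}

object ToStringDemo extends App {
  // Prints: JoinGroup(group=my-test, generation=1, consumer=e6c857b3-e362-4392-a272-94aa3e42d1fb)
  println(JoinGroupSummary("my-test", 1, "e6c857b3-e362-4392-a272-94aa3e42d1fb"))
}
{noformat}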
Review Request 34524: Fix KAFKA-2208
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34524/ --- Review request for kafka. Bugs: KAFKA-2208 https://issues.apache.org/jira/browse/KAFKA-2208 Repository: kafka Description --- 1. Add error handling on consumer; 2. Add the max / min consumer session timeout to kafka server configs; 3. Fix some consumer bouncing tests Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b2764df11afa7a99fce46d1ff48960d889032d14 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ef9dd5238fbc771496029866ece1d85db6d7b7a5 clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java f548cd0ef70929b35ac887f8fccb7b24c3e2c11a clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java fd9c545c99058ad3fbe3b2c55ea8b6ea002f5a51 core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala af06ad45cdc46ac3bc27898ebc1a5bd5b1c7b19e core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala 47bdfa7cc86fd4e841e2b1d6bfd40f1508e643bd core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala c39e6de34ee531c6dfa9107b830752bd7f8fbe59 core/src/main/scala/kafka/network/RequestChannel.scala 1d0024c8f0c2ab0efa6d8cfca6455877a6ed8026 core/src/main/scala/kafka/server/KafkaConfig.scala 9efa15ca5567b295ab412ee9eea7c03eb4cdc18b core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala 5c4cca653b3801df3494003cc40a56ae60a789a6 core/src/test/scala/integration/kafka/api/ConsumerTest.scala a1eed965a148eb19d9a6cefbfce131f58aaffc24 core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 8014a5a6c362785539f24eb03d77278434614fe6 Diff: https://reviews.apache.org/r/34524/diff/ Testing --- Thanks, Guozhang Wang
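A hedged sketch of item 2 in the description above; the bound values are hypothetical placeholders, not the actual KafkaConfig keys added by the patch. The idea is that the broker validates a consumer's requested session timeout against configured min / max bounds and rejects out-of-range values:

{noformat}
object SessionTimeoutValidation extends App {
  val minSessionTimeoutMs = 6000   // hypothetical lower bound
  val maxSessionTimeoutMs = 30000  // hypothetical upper bound

  // Returns the accepted timeout, or an error message that would map to a
  // JoinGroup response error code.
  def validate(requestedMs: Int): Either[String, Int] =
    if (requestedMs < minSessionTimeoutMs || requestedMs > maxSessionTimeoutMs)
      Left(s"session timeout $requestedMs ms outside [$minSessionTimeoutMs, $maxSessionTimeoutMs]")
    else Right(requestedMs)

  println(validate(45000)) // Left(session timeout 45000 ms outside [6000, 30000])
}
{noformat}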
[jira] [Commented] (KAFKA-2208) Consumer should detect coordinator failures
[ https://issues.apache.org/jira/browse/KAFKA-2208?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553520#comment-14553520 ] Guozhang Wang commented on KAFKA-2208: -- Created reviewboard https://reviews.apache.org/r/34524/diff/ against branch origin/trunk Consumer should detect coordinator failures --- Key: KAFKA-2208 URL: https://issues.apache.org/jira/browse/KAFKA-2208 Project: Kafka Issue Type: Sub-task Components: consumer Reporter: Onur Karaman Assignee: Guozhang Wang Attachments: KAFKA-2208.patch Add coordinator discovery and failure detection to the consumer. -- This message was sent by Atlassian JIRA (v6.3.4#6332)