Re: Review Request 36333: Patch for KAFKA-2123
---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36333/#review91808
---

Ship it!

Ship It!

- Guozhang Wang

On July 15, 2015, 1:21 a.m., Jason Gustafson wrote:
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36333/
> ---
>
> (Updated July 15, 2015, 1:21 a.m.)
>
> Review request for kafka.
>
> Bugs: KAFKA-2123
>     https://issues.apache.org/jira/browse/KAFKA-2123
>
> Repository: kafka
>
> Description
> ---
>
> KAFKA-2123; resolve problems from rebase
> KAFKA-2123; address review comments
> KAFKA-2123; more review fixes
> KAFKA-2123; refresh metadata on listOffset failure
> KAFKA-2123; fix comment issues
>
> Diffs
> ---
>
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java fd98740bff175cc9d5bc02e365d88e011ef65d22
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java 74dfdba0ecbca04947adba5eabb1cb5190a0cd5f
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java eb75d2e797e3aa3992e4cf74b12f51c8f1545e02
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b4e8f7f0dceefaf94a3495f39f5783cce5ceb25f
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 46e26a665a22625d50888efa7b53472279f36e79
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java c1c8172cd45f6715262f9a6f497a7b1797a834a3
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 695eaf63db9a5fa20dc2ca68957901462a96cd96
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java 51eae1944d5c17cf838be57adf560bafe36fbfbd
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java 13fc9af7392b4ade958daf3b0c9a165ddda351a6
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java 683745304c671952ff566f23b5dd4cf3ab75377a
>   clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 4c0ecc3badd99727b5bd9d430364e61c184e0923
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java PRE-CREATION
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java d085fe5c9e2a0567893508a1c71f014fae6d7510
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java PRE-CREATION
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 405efdc7a59438731cbc3630876bda0042a3adb3
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java ee1ede01efa070409b86f5d8874cd578e058ce51
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java PRE-CREATION
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 92ffb91b5e039dc0d4cd0e072ca46db32f280cf9
>
> Diff: https://reviews.apache.org/r/36333/diff/
>
> Testing
> ---
>
> Thanks,
>
> Jason Gustafson
Re: Review Request 36333: Patch for KAFKA-2123
---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36333/#review91810
---

clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java (lines 274 - 276)
https://reviews.apache.org/r/36333/#comment145455

    I think this is a bug. schedule takes a timestamp, not the delta.

- Onur Karaman

On July 15, 2015, 1:21 a.m., Jason Gustafson wrote:
> (Updated July 15, 2015, 1:21 a.m.)
>
> Review request for kafka.
>
> Bugs: KAFKA-2123
>     https://issues.apache.org/jira/browse/KAFKA-2123
>
> Repository: kafka
>
> Description
> ---
>
> KAFKA-2123; resolve problems from rebase
> KAFKA-2123; address review comments
> KAFKA-2123; more review fixes
> KAFKA-2123; refresh metadata on listOffset failure
> KAFKA-2123; fix comment issues
Re: Review Request 36333: Patch for KAFKA-2123
On July 15, 2015, 8:10 p.m., Onur Karaman wrote:
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java, lines 280-282
> https://reviews.apache.org/r/36333/diff/5/?file=1011945#file1011945line280
>
> I think this is a bug. schedule takes a timestamp, not the delta.

Good catch. I'll submit a follow-up patch. I wonder if it ought to take a delta since most cases are just passing something like time.milliseconds() + interval.

- Jason

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36333/#review91810
---

On July 15, 2015, 1:21 a.m., Jason Gustafson wrote:
> (Updated July 15, 2015, 1:21 a.m.)
>
> Review request for kafka.
>
> Bugs: KAFKA-2123
>     https://issues.apache.org/jira/browse/KAFKA-2123
>
> Repository: kafka
>
> Description
> ---
>
> KAFKA-2123; resolve problems from rebase
> KAFKA-2123; address review comments
> KAFKA-2123; more review fixes
> KAFKA-2123; refresh metadata on listOffset failure
> KAFKA-2123; fix comment issues
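
To make the timestamp-versus-delta point from this exchange concrete, here is a minimal Java sketch. It is not the patch's DelayedTaskQueue: TaskQueue, schedule, scheduleAfter, and poll are hypothetical names, and the only things assumed from the thread are that schedule expects an absolute timestamp and that callers usually compute it as "now + interval".

```java
import java.util.PriorityQueue;

public class ScheduleSketch {

    /** Hypothetical task queue; schedule() expects an ABSOLUTE time in milliseconds. */
    public static final class TaskQueue {
        private static final class Entry implements Comparable<Entry> {
            final long deadlineMs;
            final Runnable task;

            Entry(long deadlineMs, Runnable task) {
                this.deadlineMs = deadlineMs;
                this.task = task;
            }

            @Override
            public int compareTo(Entry other) {
                return Long.compare(deadlineMs, other.deadlineMs);
            }
        }

        private final PriorityQueue<Entry> entries = new PriorityQueue<>();

        /** Runs the task once the clock reaches atMs (an absolute timestamp). */
        public void schedule(Runnable task, long atMs) {
            entries.add(new Entry(atMs, task));
        }

        /** Delta-based variant of the kind suggested in the reply (hypothetical). */
        public void scheduleAfter(Runnable task, long nowMs, long delayMs) {
            schedule(task, nowMs + delayMs);
        }

        /** Runs every task whose deadline has passed and returns how many ran. */
        public int poll(long nowMs) {
            int ran = 0;
            while (!entries.isEmpty() && entries.peek().deadlineMs <= nowMs) {
                entries.poll().task.run();
                ran++;
            }
            return ran;
        }
    }

    public static void main(String[] args) {
        long now = 1_000_000L;   // stand-in for time.milliseconds()
        long interval = 3_000L;  // intended delay

        TaskQueue buggy = new TaskQueue();
        buggy.schedule(() -> { }, interval);          // delta passed where a timestamp is expected
        System.out.println("buggy ran at now: " + buggy.poll(now));

        TaskQueue fixed = new TaskQueue();
        fixed.schedule(() -> { }, now + interval);    // absolute timestamp, as schedule() expects
        System.out.println("fixed ran at now: " + fixed.poll(now));
        System.out.println("fixed ran later:  " + fixed.poll(now + interval));
    }
}
```

Passing the bare interval makes the deadline look like a moment long in the past, so the task fires on the very next poll instead of after the delay. A delta-based method such as scheduleAfter removes that class of mistake at the cost of requiring the current time at the call site.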
Re: Review Request 36333: Patch for KAFKA-2123
On July 14, 2015, 6 p.m., Ewen Cheslack-Postava wrote:
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java, line 274
> https://reviews.apache.org/r/36333/diff/2-3/?file=1009096#file1009096line274
>
> Seems like we're not handling this anymore? sendListOffsetRequest will return this type of error if we don't have metadata, the leader is unknown, the topic partition doesn't exist, or the broker isn't the leader for the partition.
>
> I think this is easy to miss because there's a number of layers in this class that don't get tested directly, so there are a lot of scenarios that the unit test probably isn't covering now. Maybe fix up handling of the exception for now and file a jira to follow up with better unit tests, especially if we think this code is finally settling down?

Haha, I don't think I'll be able to persuade Guozhang to commit another refactor, so I think this is it. I agree on the need for more unit tests for Fetcher. I'll create a new jira.

- Jason

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36333/#review91646
---

On July 14, 2015, 8:21 p.m., Jason Gustafson wrote:
> (Updated July 14, 2015, 8:21 p.m.)
>
> Review request for kafka.
>
> Bugs: KAFKA-2123
>     https://issues.apache.org/jira/browse/KAFKA-2123
>
> Repository: kafka
>
> Description
> ---
>
> KAFKA-2123; resolve problems from rebase
> KAFKA-2123; address review comments
> KAFKA-2123; more review fixes
> KAFKA-2123; refresh metadata on listOffset failure
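
The "refresh metadata on listOffset failure" item in the description can be pictured with a small retry loop. This is only a sketch under stated assumptions, not the code in Fetcher.java: StaleMetadata, OffsetLookup, and resolveOffset are hypothetical stand-ins, and the bounded attempt count is an illustrative choice rather than anything the thread specifies.

```java
public class ListOffsetRetrySketch {

    /** Hypothetical marker for "metadata missing, leader unknown, or not the leader". */
    public static class StaleMetadata extends RuntimeException {
        public StaleMetadata(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for one offset lookup attempt against a broker. */
    public interface OffsetLookup {
        long lookup();
    }

    /**
     * Tries the lookup up to maxAttempts times. After each stale-metadata failure
     * it asks for a metadata refresh before trying again, instead of silently
     * dropping the error.
     */
    public static long resolveOffset(OffsetLookup lookup, Runnable refreshMetadata, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        StaleMetadata last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return lookup.lookup();
            } catch (StaleMetadata e) {
                last = e;
                refreshMetadata.run();   // the step the reviewer noted was no longer handled
            }
        }
        throw last;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        int[] refreshes = {0};

        // Simulated lookup: fails twice with stale metadata, then succeeds.
        OffsetLookup flaky = () -> {
            if (calls[0]++ < 2) {
                throw new StaleMetadata("leader unknown");
            }
            return 42L;
        };

        long offset = resolveOffset(flaky, () -> refreshes[0]++, 5);
        System.out.println("offset=" + offset + " refreshes=" + refreshes[0]);
    }
}
```

The simulated lookup in main doubles as the kind of scenario a Fetcher unit test could cover: each failure mode the reviewer lists would map to one injected failure followed by a check that a refresh was requested.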
Re: Review Request 36333: Patch for KAFKA-2123
---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36333/#review91651
---

LGTM overall, just some minor comments below.

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java (line 815)
https://reviews.apache.org/r/36333/#comment145173

    Is this comment accurate? Could a sync commit throw an exception on the returned Future when it is no longer retriable?

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java (line 1071)
https://reviews.apache.org/r/36333/#comment145207

    nit: not capitalizing the first letter just for comment consistency?

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java (line 175)
https://reviews.apache.org/r/36333/#comment145218

    I think for in-function comments we will not capitalize comments, same below.

clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java (line 186)
https://reviews.apache.org/r/36333/#comment145216

    decapitalize ensure that?

clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java (line 222)
https://reviews.apache.org/r/36333/#comment145215

    missing one @param.

clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java (line 234)
https://reviews.apache.org/r/36333/#comment145219

    Same here and below regarding comments.

clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java (line 615)
https://reviews.apache.org/r/36333/#comment145226

    nit: Move this class right after sendOffsetCommitRequest()?

clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java (line 41)
https://reviews.apache.org/r/36333/#comment145227

    Comment: remove all instances of the task in the queue?

clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java (line 144)
https://reviews.apache.org/r/36333/#comment145229

    Decapitalize comments, same below.

clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java (line 46)
https://reviews.apache.org/r/36333/#comment145233

    Maybe rename to testSend(...) and below just for naming consistency?

clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java (line 286)
https://reviews.apache.org/r/36333/#comment145234

    Did you check the expected tag works here? I have seen in some old junit versions the test will still pass even if the exception is not thrown.

clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java (line 62)
https://reviews.apache.org/r/36333/#comment145235

    Rename to testRemove?

clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java (line 63)
https://reviews.apache.org/r/36333/#comment145236

    testReset?

clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java (line 23)
https://reviews.apache.org/r/36333/#comment145237

    testCompose... ? Same below.

- Guozhang Wang

On July 14, 2015, 8:21 p.m., Jason Gustafson wrote:
> (Updated July 14, 2015, 8:21 p.m.)
>
> Review request for kafka.
>
> Bugs: KAFKA-2123
>     https://issues.apache.org/jira/browse/KAFKA-2123
>
> Repository: kafka
>
> Description
> ---
>
> KAFKA-2123; resolve problems from rebase
> KAFKA-2123; address review comments
> KAFKA-2123; more review fixes
> KAFKA-2123; refresh metadata on listOffset failure
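
On the CoordinatorTest question about whether the expected tag really fails when nothing is thrown: one way to remove the doubt, independent of the JUnit version, is to assert the exception explicitly. The sketch below is plain Java with no JUnit dependency; expectThrows and ThrowingBlock are hypothetical helper names, not part of the patch or of JUnit 4 as used at the time.

```java
public class ExpectedExceptionSketch {

    /** Hypothetical functional interface for a block of test code that may throw. */
    public interface ThrowingBlock {
        void run() throws Exception;
    }

    /**
     * Runs the block and returns the thrown exception if it has the expected type.
     * Fails loudly when nothing is thrown or when a different type is thrown.
     */
    public static <T extends Throwable> T expectThrows(Class<T> type, ThrowingBlock block) {
        try {
            block.run();
        } catch (Throwable t) {
            if (type.isInstance(t)) {
                return type.cast(t);
            }
            throw new AssertionError("expected " + type.getName() + " but got " + t, t);
        }
        // Reaching this point means the block completed normally, which is the
        // case the reviewer was worried a test might silently accept.
        throw new AssertionError("expected " + type.getName() + " but nothing was thrown");
    }

    public static void main(String[] args) {
        IllegalStateException e = expectThrows(IllegalStateException.class, () -> {
            throw new IllegalStateException("commit failed");
        });
        System.out.println("caught: " + e.getMessage());

        try {
            expectThrows(IllegalStateException.class, () -> { /* throws nothing */ });
        } catch (AssertionError missing) {
            System.out.println("detected: " + missing.getMessage());
        }
    }
}
```

Inside a JUnit test the same effect is commonly achieved with a try block that calls fail() after the statement expected to throw, followed by a catch for the expected type.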
Re: Review Request 36333: Patch for KAFKA-2123
---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36333/
---

(Updated July 14, 2015, 8:21 p.m.)

Review request for kafka.

Bugs: KAFKA-2123
    https://issues.apache.org/jira/browse/KAFKA-2123

Repository: kafka

Description (updated)
---

KAFKA-2123; resolve problems from rebase
KAFKA-2123; address review comments
KAFKA-2123; more review fixes
KAFKA-2123; refresh metadata on listOffset failure

Diff: https://reviews.apache.org/r/36333/diff/

Testing
---

Thanks,

Jason Gustafson
Re: Review Request 36333: Patch for KAFKA-2123
---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36333/
---

(Updated July 15, 2015, 1:21 a.m.)

Review request for kafka.

Bugs: KAFKA-2123
    https://issues.apache.org/jira/browse/KAFKA-2123

Repository: kafka

Description (updated)
---

KAFKA-2123; resolve problems from rebase
KAFKA-2123; address review comments
KAFKA-2123; more review fixes
KAFKA-2123; refresh metadata on listOffset failure
KAFKA-2123; fix comment issues

Diff: https://reviews.apache.org/r/36333/diff/

Testing
---

Thanks,

Jason Gustafson
Re: Review Request 36333: Patch for KAFKA-2123
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36333/#review91646 --- clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java https://reviews.apache.org/r/36333/#comment145171 Seems like we're not handling this anymore? sendListOffsetRequest will return this type of error if we don't have metadata, the leader is unknown, the topic partition doesn't exist, or the broker isn't the leader for the partition. I think this is easy to miss because there's a number of layers in this class that don't get tested directly, so there are a lot of scenarios that the unit test probably isn't covering now. Maybe fix up handling of the exception for now and file a jira to follow up with better unit tests, especially if we think this code is finally settling down? One last issue I found, but otherwise LGTM now. - Ewen Cheslack-Postava On July 14, 2015, 1:45 a.m., Jason Gustafson wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36333/ --- (Updated July 14, 2015, 1:45 a.m.) Review request for kafka. 
Bugs: KAFKA-2123 https://issues.apache.org/jira/browse/KAFKA-2123 Repository: kafka Description --- KAFKA-2123; resolve problems from rebase KAFKA-2123; address review comments KAFKA-2123; more review fixes Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java fd98740bff175cc9d5bc02e365d88e011ef65d22 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java 74dfdba0ecbca04947adba5eabb1cb5190a0cd5f clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b4e8f7f0dceefaf94a3495f39f5783cce5ceb25f clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 46e26a665a22625d50888efa7b53472279f36e79 clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java c1c8172cd45f6715262f9a6f497a7b1797a834a3 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 695eaf63db9a5fa20dc2ca68957901462a96cd96 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java 51eae1944d5c17cf838be57adf560bafe36fbfbd clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java 13fc9af7392b4ade958daf3b0c9a165ddda351a6 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java PRE-CREATION 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java 683745304c671952ff566f23b5dd4cf3ab75377a clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 4c0ecc3badd99727b5bd9d430364e61c184e0923 clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java PRE-CREATION clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java d085fe5c9e2a0567893508a1c71f014fae6d7510
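A note on the Fetcher.java comment in this review (list-offset failures when metadata is missing or the leader is unknown): the handling being asked for could look roughly like the sketch below. This is only an illustration, not the code in the patch. `ListOffsetRetrySketch`, `StaleMetadata`, `MetadataRefresher` and `resolveOffset` are hypothetical names standing in for the real Fetcher internals; the only idea taken from the thread is "refresh metadata on failure, then retry".

```java
import java.util.function.Supplier;

// Hypothetical sketch: retry an offset lookup, refreshing metadata after each stale-metadata failure.
final class ListOffsetRetrySketch {

    /** Hypothetical stand-in for a "metadata is stale / leader unknown" failure. */
    static final class StaleMetadata extends RuntimeException {
        private static final long serialVersionUID = 1L;
    }

    /** Hypothetical hook that asks the client to refresh its cluster metadata. */
    interface MetadataRefresher {
        void refresh();
    }

    /**
     * Runs the lookup; on a stale-metadata failure it requests a refresh and tries again.
     * At least one attempt is always made. The last failure is rethrown once
     * maxAttempts lookups have failed.
     */
    static long resolveOffset(Supplier<Long> lookup, MetadataRefresher metadata, int maxAttempts) {
        for (int attempt = 1; ; attempt++) {
            try {
                return lookup.get();
            } catch (StaleMetadata e) {
                if (attempt >= maxAttempts) {
                    throw e; // give up and surface the error to the caller
                }
                metadata.refresh(); // handle the error instead of silently dropping it
            }
        }
    }
}
```

A unit test along these lines (a lookup that fails a fixed number of times before succeeding) is the kind of direct coverage the review comment suggests filing a follow-up for.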
Re: Review Request 36333: Patch for KAFKA-2123
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36333/#review91564 --- clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java (line 961) https://reviews.apache.org/r/36333/#comment144989 Hmm, this seems like very different behavior from before. Won't this trigger an offset fetch request *every* time this method is called? Seems like that could be very bad behavior if I wanted to do something like list the committed offsets for the partitions this consumer owns (i.e. by iterating over the very confusingly named Set<TopicPartition> subscriptions(), which returns assigned partitions). Wouldn't the previous logic where it checks if we have the committed offset first make sense? clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java (line 197) https://reviews.apache.org/r/36333/#comment144993 Won't this always sleep even if we succeeded? Unlike similar code earlier in this class, this one doesn't check if future.succeeded(). clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java (line 516) https://reviews.apache.org/r/36333/#comment144998 It looks like a bunch of reorganization + addition of createCoordinator() calls were added, but it looks like they all use a MockRebalanceCallback? Even the ones that explicitly create their own callback and pass it into createCoordinator()? Maybe I'm just missing the reason for these changes? - Ewen Cheslack-Postava On July 12, 2015, 12:34 a.m., Jason Gustafson wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36333/ --- (Updated July 12, 2015, 12:34 a.m.) Review request for kafka.
Bugs: KAFKA-2123 https://issues.apache.org/jira/browse/KAFKA-2123 Repository: kafka Description --- KAFKA-2123; resolve problems from rebase KAFKA-2123; address review comments Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java fd98740bff175cc9d5bc02e365d88e011ef65d22 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java 74dfdba0ecbca04947adba5eabb1cb5190a0cd5f clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 7aa076084c894bb8f47b9df2c086475b06f47060 clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 46e26a665a22625d50888efa7b53472279f36e79 clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java c1c8172cd45f6715262f9a6f497a7b1797a834a3 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 695eaf63db9a5fa20dc2ca68957901462a96cd96 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java 51eae1944d5c17cf838be57adf560bafe36fbfbd clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java 13fc9af7392b4ade958daf3b0c9a165ddda351a6 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java PRE-CREATION 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java 683745304c671952ff566f23b5dd4cf3ab75377a clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java PRE-CREATION
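On the Coordinator.java comment in this review ("Won't this always sleep even if we succeeded?"): a minimal sketch of the intended control flow, where the backoff sleep happens only after a failed attempt. The names `RetryBackoffSketch`, `Attempt`, `Sleeper` and `runUntilSuccess` are made up for illustration and do not correspond to anything in the patch; the `Attempt` result plays the role of the `future.succeeded()` check mentioned in the comment.

```java
// Hypothetical sketch: back off between attempts, but never after a success.
final class RetryBackoffSketch {

    /** One attempt at the operation; returns true on success. */
    interface Attempt {
        boolean run();
    }

    /** Abstraction over sleeping so the loop can be tested without real delays. */
    interface Sleeper {
        void sleep(long ms);
    }

    /**
     * Returns the 1-based number of the attempt that succeeded, or -1 if all failed.
     * The success check comes before the sleep, so a successful attempt returns immediately.
     */
    static int runUntilSuccess(Attempt attempt, Sleeper sleeper, long backoffMs, int maxAttempts) {
        for (int i = 1; i <= maxAttempts; i++) {
            if (attempt.run()) {
                return i; // succeeded: no backoff
            }
            if (i < maxAttempts) {
                sleeper.sleep(backoffMs); // failed: wait before the next attempt
            }
        }
        return -1;
    }
}
```

Injecting the sleeper is just a convenience for testing; the point is only the ordering of the success check and the sleep.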
Re: Review Request 36333: Patch for KAFKA-2123
On July 14, 2015, 12:04 a.m., Guozhang Wang wrote: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java, lines 117-131 https://reviews.apache.org/r/36333/diff/1-2/?file=1002924#file1002924line117 With this change, we are now always sending an OffsetFetchRequest even when subscriptions.refreshCommitsNeeded returns false? You are right. The weird thing about this API is that it accepts the partitions to refresh, but SubscriptionState only has a single flag indicating that a refresh is needed. This means that refreshing committed offsets for a subset of the partitions (which is what KafkaConsumer does) could cause us to fail to refresh the other partitions. I think maybe we should just remove the partitions parameter and always refresh all assigned partitions when a refresh is needed. Either that or we need to invalidate committed offsets on a per-partition basis. On July 14, 2015, 12:04 a.m., Guozhang Wang wrote: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java, lines 558-559 https://reviews.apache.org/r/36333/diff/2/?file=1009093#file1009093line558 When the coordinator is dead / not known yet, the consumerCoordinator field could be null, but since we do not stop scheduling the heartbeat tasks, will this cause triggering client.send(null, ...)? Actually I changed the code so that heartbeats are only rescheduled when the coordinator is known (and we are not awaiting a group join). Take a look at HeartbeatTask and see if it addresses this concern. - Jason --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36333/#review91565 --- On July 12, 2015, 12:34 a.m., Jason Gustafson wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36333/ --- (Updated July 12, 2015, 12:34 a.m.) Review request for kafka. 
Bugs: KAFKA-2123 https://issues.apache.org/jira/browse/KAFKA-2123 Repository: kafka Description --- KAFKA-2123; resolve problems from rebase KAFKA-2123; address review comments Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java fd98740bff175cc9d5bc02e365d88e011ef65d22 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java 74dfdba0ecbca04947adba5eabb1cb5190a0cd5f clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 7aa076084c894bb8f47b9df2c086475b06f47060 clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 46e26a665a22625d50888efa7b53472279f36e79 clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java c1c8172cd45f6715262f9a6f497a7b1797a834a3 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 695eaf63db9a5fa20dc2ca68957901462a96cd96 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java 51eae1944d5c17cf838be57adf560bafe36fbfbd clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java 13fc9af7392b4ade958daf3b0c9a165ddda351a6 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java PRE-CREATION 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java 683745304c671952ff566f23b5dd4cf3ab75377a clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java PRE-CREATION
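To make the second option in the reply above concrete (invalidating committed offsets on a per-partition basis instead of keeping a single flag), here is a rough sketch. It is an assumption-laden illustration, not SubscriptionState: `CommitRefreshSketch` and its methods are hypothetical, and partitions are represented as plain strings rather than TopicPartition objects.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: track which partitions still need their committed offsets refreshed.
final class CommitRefreshSketch {

    private final Set<String> needsRefresh = new HashSet<>();

    /** Marks the given partitions as having stale committed offsets. */
    void invalidate(Collection<String> partitions) {
        needsRefresh.addAll(partitions);
    }

    /** Records that committed offsets were refreshed for exactly these partitions. */
    void markRefreshed(Collection<String> partitions) {
        needsRefresh.removeAll(partitions);
    }

    /** True if this particular partition still needs a refresh. */
    boolean refreshNeeded(String partition) {
        return needsRefresh.contains(partition);
    }

    /** True if any partition still needs a refresh (the single-flag view). */
    boolean anyRefreshNeeded() {
        return !needsRefresh.isEmpty();
    }
}
```

With this shape, refreshing a subset of the assigned partitions clears only that subset, so the remaining partitions are still reported as needing a refresh. The simpler alternative from the reply (drop the partitions parameter and always refresh everything) avoids the extra bookkeeping.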
Re: Review Request 36333: Patch for KAFKA-2123
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36333/#review91565 --- clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java (lines 117 - 131) https://reviews.apache.org/r/36333/#comment144990 With this change, we are now always sending an OffsetFetchRequest even when subscriptions.refreshCommitsNeeded returns false? clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java (lines 506 - 507) https://reviews.apache.org/r/36333/#comment144992 When the coordinator is dead / not known yet, the consumerCoordinator field could be null, but since we do not stop scheduling the heartbeat tasks, will this cause triggering client.send(null, ...)? - Guozhang Wang On July 12, 2015, 12:34 a.m., Jason Gustafson wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36333/ --- (Updated July 12, 2015, 12:34 a.m.) Review request for kafka. Bugs: KAFKA-2123 https://issues.apache.org/jira/browse/KAFKA-2123 Repository: kafka Description --- KAFKA-2123; resolve problems from rebase KAFKA-2123; address review comments Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java fd98740bff175cc9d5bc02e365d88e011ef65d22 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java 74dfdba0ecbca04947adba5eabb1cb5190a0cd5f clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 7aa076084c894bb8f47b9df2c086475b06f47060 clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 46e26a665a22625d50888efa7b53472279f36e79 clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java PRE-CREATION 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java c1c8172cd45f6715262f9a6f497a7b1797a834a3 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 695eaf63db9a5fa20dc2ca68957901462a96cd96 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java 51eae1944d5c17cf838be57adf560bafe36fbfbd clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java 13fc9af7392b4ade958daf3b0c9a165ddda351a6 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java 683745304c671952ff566f23b5dd4cf3ab75377a clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java PRE-CREATION 
clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 4c0ecc3badd99727b5bd9d430364e61c184e0923 clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java PRE-CREATION clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java d085fe5c9e2a0567893508a1c71f014fae6d7510 clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java PRE-CREATION clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
Re: Review Request 36333: Patch for KAFKA-2123
On July 14, 2015, 12:32 a.m., Ewen Cheslack-Postava wrote: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 968 https://reviews.apache.org/r/36333/diff/1-2/?file=1002921#file1002921line968 Hmm, this seems like very different behavior from before. Won't this trigger an offset fetch request *every* time this method is called? Seems like that could be very bad behavior if I wanted to do something like list the committed offsets for the partitions this consumer owns (i.e. by iterating over the very confusingly named Set<TopicPartition> subscriptions(), which returns assigned partitions). Wouldn't the previous logic where it checks if we have the committed offset first make sense? Yes, it's missing a check for subscriptions.refreshCommitsNeeded. I had tried to fix a problem I noticed with using a subset of the assigned partitions when refreshing commits (see my comment in Guozhang's review above), but my fix inadvertently introduced this and it didn't really solve the problem either. I'll take another look. On July 14, 2015, 12:32 a.m., Ewen Cheslack-Postava wrote: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java, line 202 https://reviews.apache.org/r/36333/diff/1-2/?file=1002924#file1002924line202 Won't this always sleep even if we succeeded? Unlike similar code earlier in this class, this one doesn't check if future.succeeded(). Yep, good catch. - Jason --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36333/#review91564 --- On July 12, 2015, 12:34 a.m., Jason Gustafson wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36333/ --- (Updated July 12, 2015, 12:34 a.m.) Review request for kafka.
Bugs: KAFKA-2123 https://issues.apache.org/jira/browse/KAFKA-2123 Repository: kafka Description --- KAFKA-2123; resolve problems from rebase KAFKA-2123; address review comments Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java fd98740bff175cc9d5bc02e365d88e011ef65d22 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java 74dfdba0ecbca04947adba5eabb1cb5190a0cd5f clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 7aa076084c894bb8f47b9df2c086475b06f47060 clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 46e26a665a22625d50888efa7b53472279f36e79 clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java c1c8172cd45f6715262f9a6f497a7b1797a834a3 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 695eaf63db9a5fa20dc2ca68957901462a96cd96 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java 51eae1944d5c17cf838be57adf560bafe36fbfbd clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java 13fc9af7392b4ade958daf3b0c9a165ddda351a6 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java PRE-CREATION 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java 683745304c671952ff566f23b5dd4cf3ab75377a clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java PRE-CREATION
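For the KafkaConsumer.java exchange above (an offset fetch being triggered on every call), the "check whether we already have the committed offset first" behavior can be sketched as a small cache in front of the remote fetch. Everything here is hypothetical: `CommittedOffsetCacheSketch`, the `remoteFetch` function and the string partition keys are illustration only and say nothing about how the patch actually implements the check.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: only go to the coordinator when no committed offset is cached.
final class CommittedOffsetCacheSketch {

    private final Map<String, Long> cache = new HashMap<>();
    private final Function<String, Long> remoteFetch; // stand-in for an offset fetch request
    private int fetchCount = 0;

    CommittedOffsetCacheSketch(Function<String, Long> remoteFetch) {
        this.remoteFetch = remoteFetch;
    }

    /** Returns the cached committed offset, fetching it only if it is not cached yet. */
    long committed(String partition) {
        Long cached = cache.get(partition);
        if (cached != null) {
            return cached; // no request needed
        }
        fetchCount++;
        long fetched = remoteFetch.apply(partition);
        cache.put(partition, fetched);
        return fetched;
    }

    /** Drops all cached offsets, for example after a rebalance (assumed trigger). */
    void invalidate() {
        cache.clear();
    }

    /** Number of remote fetches issued so far. */
    int fetchCount() {
        return fetchCount;
    }
}
```

Iterating over all owned partitions and calling `committed` repeatedly then costs one fetch per partition until the cache is invalidated, rather than one fetch per call.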
Re: Review Request 36333: Patch for KAFKA-2123
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36333/ --- (Updated July 14, 2015, 1:45 a.m.) Review request for kafka. Bugs: KAFKA-2123 https://issues.apache.org/jira/browse/KAFKA-2123 Repository: kafka Description (updated) --- KAFKA-2123; resolve problems from rebase KAFKA-2123; address review comments KAFKA-2123; more review fixes Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java fd98740bff175cc9d5bc02e365d88e011ef65d22 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java 74dfdba0ecbca04947adba5eabb1cb5190a0cd5f clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b4e8f7f0dceefaf94a3495f39f5783cce5ceb25f clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 46e26a665a22625d50888efa7b53472279f36e79 clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java c1c8172cd45f6715262f9a6f497a7b1797a834a3 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 695eaf63db9a5fa20dc2ca68957901462a96cd96 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java 51eae1944d5c17cf838be57adf560bafe36fbfbd clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java 13fc9af7392b4ade958daf3b0c9a165ddda351a6 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java 683745304c671952ff566f23b5dd4cf3ab75377a clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 4c0ecc3badd99727b5bd9d430364e61c184e0923 clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java PRE-CREATION clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java d085fe5c9e2a0567893508a1c71f014fae6d7510 clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java PRE-CREATION clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 405efdc7a59438731cbc3630876bda0042a3adb3 clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java ee1ede01efa070409b86f5d8874cd578e058ce51 clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java PRE-CREATION 
core/src/test/scala/integration/kafka/api/ConsumerTest.scala 92ffb91b5e039dc0d4cd0e072ca46db32f280cf9 Diff: https://reviews.apache.org/r/36333/diff/ Testing --- Thanks, Jason Gustafson
Re: Review Request 36333: Patch for KAFKA-2123
On July 9, 2015, 1:42 a.m., Guozhang Wang wrote: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 775 https://reviews.apache.org/r/36333/diff/1/?file=1002921#file1002921line775 I think KAFKA-1894 is already fixed in this patch + KAFKA-2168? Jason Gustafson wrote: I think this is still debatable. Is wakeup() sufficient to assuage our guilt for allowing poll to block indefinitely in spite of the passed timeout? Perhaps I'm the only one, but I'm still holding out hope that we'll be able to enforce the timeout even if we are in the middle of a join group. The code is actually not that far from being able to do so. Guozhang Wang wrote: Yeah makes sense. Ewen Cheslack-Postava wrote: I'm still holding out hope as well :) There were one or two places in this patch where timeouts aren't passed into some methods where I think they could be passed in and respected safely, but I didn't want to make this patch even bigger. Gwen Shapira wrote: Same here :) Is there a JIRA to track this plan? Jason Gustafson wrote: My plan was to resolve this as part of KAFKA-1894, but perhaps a different ticket is needed? KAFKA-1894 seems sufficient, but as you're refactoring this anyway, might make sense to try to track what methods don't respect timeouts and could cause problems, especially all the way up at the KafkaConsumer level. We could figure this out again later, but better to just document what's required now, which also gives an idea of how much work there is left to do. - Ewen --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36333/#review91012 --- On July 8, 2015, 9:19 p.m., Jason Gustafson wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36333/ --- (Updated July 8, 2015, 9:19 p.m.) Review request for kafka.
Bugs: KAFKA-2123 https://issues.apache.org/jira/browse/KAFKA-2123 Repository: kafka Description --- KAFKA-2123; resolve problems from rebase Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java fd98740bff175cc9d5bc02e365d88e011ef65d22 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 7aa076084c894bb8f47b9df2c086475b06f47060 clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 46e26a665a22625d50888efa7b53472279f36e79 clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java c1c8172cd45f6715262f9a6f497a7b1797a834a3 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 695eaf63db9a5fa20dc2ca68957901462a96cd96 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java 51eae1944d5c17cf838be57adf560bafe36fbfbd clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java 13fc9af7392b4ade958daf3b0c9a165ddda351a6 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java PRE-CREATION 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java 683745304c671952ff566f23b5dd4cf3ab75377a clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java PRE-CREATION
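Regarding the timeout discussion in this thread (passing timeouts into the methods that currently ignore them): one common way to do that is to compute a deadline once and hand each blocking step only the time that remains. The sketch below illustrates that idea under stated assumptions; `DeadlineSketch` is a hypothetical helper, not part of the patch or of KafkaConsumer, and it assumes non-negative millisecond clock values.

```java
// Hypothetical sketch: a fixed deadline from which each blocking call derives its own timeout.
final class DeadlineSketch {

    private final long deadlineMs;

    /** nowMs and timeoutMs are assumed to be non-negative millisecond values. */
    DeadlineSketch(long nowMs, long timeoutMs) {
        // Guard against overflow when a caller passes a very large timeout.
        this.deadlineMs = timeoutMs > Long.MAX_VALUE - nowMs ? Long.MAX_VALUE : nowMs + timeoutMs;
    }

    /** Time left before the deadline, never negative. */
    long remaining(long nowMs) {
        return Math.max(0L, deadlineMs - nowMs);
    }

    /** True once the deadline has been reached. */
    boolean expired(long nowMs) {
        return nowMs >= deadlineMs;
    }
}
```

A poll-style loop would then pass `remaining(now)` into each step (join group, offset fetch, network poll) and stop as soon as `expired(now)` is true. Listing which methods cannot yet accept such a value is essentially the tracking exercise suggested in the reply above.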
Re: Review Request 36333: Patch for KAFKA-2123
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36333/ --- (Updated July 12, 2015, 12:34 a.m.) Review request for kafka. Bugs: KAFKA-2123 https://issues.apache.org/jira/browse/KAFKA-2123 Repository: kafka Description (updated) --- KAFKA-2123; resolve problems from rebase KAFKA-2123; address review comments Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java fd98740bff175cc9d5bc02e365d88e011ef65d22 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java 74dfdba0ecbca04947adba5eabb1cb5190a0cd5f clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 7aa076084c894bb8f47b9df2c086475b06f47060 clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 46e26a665a22625d50888efa7b53472279f36e79 clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java c1c8172cd45f6715262f9a6f497a7b1797a834a3 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 695eaf63db9a5fa20dc2ca68957901462a96cd96 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java 51eae1944d5c17cf838be57adf560bafe36fbfbd clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java 13fc9af7392b4ade958daf3b0c9a165ddda351a6 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java 683745304c671952ff566f23b5dd4cf3ab75377a clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 4c0ecc3badd99727b5bd9d430364e61c184e0923 clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java PRE-CREATION clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java d085fe5c9e2a0567893508a1c71f014fae6d7510 clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java PRE-CREATION clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 405efdc7a59438731cbc3630876bda0042a3adb3 clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java ee1ede01efa070409b86f5d8874cd578e058ce51 clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java PRE-CREATION 
core/src/test/scala/integration/kafka/api/ConsumerTest.scala 92ffb91b5e039dc0d4cd0e072ca46db32f280cf9 Diff: https://reviews.apache.org/r/36333/diff/ Testing --- Thanks, Jason Gustafson
Re: Review Request 36333: Patch for KAFKA-2123
On July 9, 2015, 1:42 a.m., Guozhang Wang wrote:
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 775
https://reviews.apache.org/r/36333/diff/1/?file=1002921#file1002921line775
I think KAFKA-1894 is already fixed in this patch + KAFKA-2168?

Jason Gustafson wrote:
I think this is still debatable. Is wakeup() sufficient to assuage our guilt for allowing poll to block indefinitely in spite of the passed timeout? Perhaps I'm the only one, but I'm still holding out hope that we'll be able to enforce the timeout even if we are in the middle of a join group. The code is actually not that far from being able to do so.

Guozhang Wang wrote:
Yeah makes sense.

Ewen Cheslack-Postava wrote:
I'm still holding out hope as well :) There were one or two places in this patch where timeouts aren't passed into some methods where I think they could be passed in and respected safely, but I didn't want to make this patch even bigger.

Same here :) Is there a JIRA to track this plan?

- Gwen

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36333/#review91012 ---

On July 8, 2015, 9:19 p.m., Jason Gustafson wrote:
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36333/ --- (Updated July 8, 2015, 9:19 p.m.) Review request for kafka.
Bugs: KAFKA-2123 https://issues.apache.org/jira/browse/KAFKA-2123 Repository: kafka Description --- KAFKA-2123; resolve problems from rebase Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java fd98740bff175cc9d5bc02e365d88e011ef65d22 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 7aa076084c894bb8f47b9df2c086475b06f47060 clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 46e26a665a22625d50888efa7b53472279f36e79 clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java c1c8172cd45f6715262f9a6f497a7b1797a834a3 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 695eaf63db9a5fa20dc2ca68957901462a96cd96 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java 51eae1944d5c17cf838be57adf560bafe36fbfbd clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java 13fc9af7392b4ade958daf3b0c9a165ddda351a6 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java PRE-CREATION 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java 683745304c671952ff566f23b5dd4cf3ab75377a clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 4c0ecc3badd99727b5bd9d430364e61c184e0923 clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java PRE-CREATION clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
Re: Review Request 36333: Patch for KAFKA-2123
On July 9, 2015, 1:42 a.m., Guozhang Wang wrote:
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 775
https://reviews.apache.org/r/36333/diff/1/?file=1002921#file1002921line775
I think KAFKA-1894 is already fixed in this patch + KAFKA-2168?

Jason Gustafson wrote:
I think this is still debatable. Is wakeup() sufficient to assuage our guilt for allowing poll to block indefinitely in spite of the passed timeout? Perhaps I'm the only one, but I'm still holding out hope that we'll be able to enforce the timeout even if we are in the middle of a join group. The code is actually not that far from being able to do so.

Guozhang Wang wrote:
Yeah makes sense.

I'm still holding out hope as well :) There were one or two places in this patch where timeouts aren't passed into some methods where I think they could be passed in and respected safely, but I didn't want to make this patch even bigger.

- Ewen

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36333/#review91012 ---

On July 8, 2015, 9:19 p.m., Jason Gustafson wrote:
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36333/ --- (Updated July 8, 2015, 9:19 p.m.) Review request for kafka.
Re: Review Request 36333: Patch for KAFKA-2123
On July 9, 2015, 1:42 a.m., Guozhang Wang wrote:
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java, lines 88-93
https://reviews.apache.org/r/36333/diff/1/?file=1002923#file1002923line88
This is not introduced in the patch, but I am not sure if this is the right way to respect backoff time. For example, if the destination broker is down for a short period of time, poll(retryBackoffMs) will immediately return, and hence this function will busily trigger poll() and flood the network with metadata requests, right? What we want in this case is that the consumer should wait for retryBackoffMs before retrying the next metadata request.

Jason Gustafson wrote:
This code was lifted from KafkaConsumer, but I think you are right. I'll fix it for the next patch.

Actually, I think this implementation may be reasonable. Retries and backoff are already handled in NetworkClient, so this wouldn't flood the network with additional requests. I think we could use poll(Long.MAX_VALUE) if we wanted though.

On July 9, 2015, 1:42 a.m., Guozhang Wang wrote:
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 775
https://reviews.apache.org/r/36333/diff/1/?file=1002921#file1002921line775
I think KAFKA-1894 is already fixed in this patch + KAFKA-2168?

Jason Gustafson wrote:
I think this is still debatable. Is wakeup() sufficient to assuage our guilt for allowing poll to block indefinitely in spite of the passed timeout? Perhaps I'm the only one, but I'm still holding out hope that we'll be able to enforce the timeout even if we are in the middle of a join group. The code is actually not that far from being able to do so.

Guozhang Wang wrote:
Yeah makes sense.
Ewen Cheslack-Postava wrote:
I'm still holding out hope as well :) There were one or two places in this patch where timeouts aren't passed into some methods where I think they could be passed in and respected safely, but I didn't want to make this patch even bigger.

Gwen Shapira wrote:
Same here :) Is there a JIRA to track this plan?

My plan was to resolve this as part of KAFKA-1894, but perhaps a different ticket is needed?

- Jason

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36333/#review91012 ---

On July 8, 2015, 9:19 p.m., Jason Gustafson wrote:
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36333/ --- (Updated July 8, 2015, 9:19 p.m.) Review request for kafka. Bugs: KAFKA-2123 https://issues.apache.org/jira/browse/KAFKA-2123 Repository: kafka Description --- KAFKA-2123; resolve problems from rebase Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java fd98740bff175cc9d5bc02e365d88e011ef65d22 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 7aa076084c894bb8f47b9df2c086475b06f47060 clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 46e26a665a22625d50888efa7b53472279f36e79 clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java c1c8172cd45f6715262f9a6f497a7b1797a834a3 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
695eaf63db9a5fa20dc2ca68957901462a96cd96 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java 51eae1944d5c17cf838be57adf560bafe36fbfbd clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java 13fc9af7392b4ade958daf3b0c9a165ddda351a6 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java PRE-CREATION
Re: Review Request 36333: Patch for KAFKA-2123
On July 9, 2015, 1:42 a.m., Guozhang Wang wrote:
Browsed through the patch, overall looks very promising. I am not very clear on a few detailed changes though:
1. The request future adapter / handler modifications.
2. Retry backoff implementation seems not correct.
Could you explain a little bit on these two aspects?

Thanks for the feedback. Comments below.

On July 9, 2015, 1:42 a.m., Guozhang Wang wrote:
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java, line 27
https://reviews.apache.org/r/36333/diff/1/?file=1002919#file1002919line27
You may want to add the committed offset map in the callback since otherwise it is unclear which commit it is referring to when triggered.

This is a good idea.

On July 9, 2015, 1:42 a.m., Guozhang Wang wrote:
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java, lines 88-93
https://reviews.apache.org/r/36333/diff/1/?file=1002923#file1002923line88
This is not introduced in the patch, but I am not sure if this is the right way to respect backoff time. For example, if the destination broker is down for a short period of time, poll(retryBackoffMs) will immediately return, and hence this function will busily trigger poll() and flood the network with metadata requests, right? What we want in this case is that the consumer should wait for retryBackoffMs before retrying the next metadata request.

This code was lifted from KafkaConsumer, but I think you are right. I'll fix it for the next patch.

On July 9, 2015, 1:42 a.m., Guozhang Wang wrote:
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java, line 120
https://reviews.apache.org/r/36333/diff/1/?file=1002924#file1002924line120
We can remove this line since it is checked inside ensureAssignment() already.

It's a little subtle, but the call to ensureCoordinatorKnown in ensureAssignment is inside the loop, so it will only be called when an assignment is needed.
I considered putting it outside the loop, but inside is where it needs to be, and it looks odd to have it in both places.

On July 9, 2015, 1:42 a.m., Guozhang Wang wrote:
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java, line 218
https://reviews.apache.org/r/36333/diff/1/?file=1002924#file1002924line218
There is a potential risk of not aligning the scheduling of heartbeat with the discovery of the coordinator. For example, let's say:
1. at t0 we call initHeartbeatTask with interval 100;
2. at t1 the consumer has already found the coordinator, but it will not send the first HB until t100;
3. at t100 the consumer may find that it has already been kicked out of the group by the coordinator, and reschedule at t200 and re-join the group;
4. at t101 the consumer has re-joined the group, but will not send the HB until t200, and so on.

Perhaps the way to deal with this is to give the heartbeat task suspend() and resume() methods so that we can align it properly according to coordinator events.

On July 9, 2015, 1:42 a.m., Guozhang Wang wrote:
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java, line 296
https://reviews.apache.org/r/36333/diff/1/?file=1002924#file1002924line296
Why did we change the behavior to directly throw an exception here?

I removed the retry action from RequestFuture in favor of a check for RetriableException. In this case, both errors are retriable, so the result is the same.

On July 9, 2015, 1:42 a.m., Guozhang Wang wrote:
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java, lines 1-43
https://reviews.apache.org/r/36333/diff/1/?file=1002931#file1002931line1
Not clear why you want to convert the future type in this adapter, can you elaborate a bit?

The idea of the future adapter was to provide a way to convert from a future of one type to another. The use case was converting RequestFuture<ClientResponse> to instances of the respective request (e.g.
RequestFuture<JoinGroupResponse>). If you look at the error handling in the consumer, they are all doing basically the same thing. If the error is NOT_COORDINATOR_FOR_CONSUMER, for example, we just mark the coordinator dead. I hoped that this API would allow us to handle some coordinator errors generically while leaving the respective request handler to deal only with its specific errors. Unfortunately, the only generic check that this enabled was the check for a disconnect. To locate server error codes, you have to traverse the respective response structures, which cannot be done generically. At the moment, I'm not sure it brings enough to the table to justify its existence, but since it's a fairly common pattern with future handling and one that might be helpful in the future, I opted to
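To make the adapter idea in this reply concrete, here is a minimal sketch of converting a future of one type into a future of another. It assumes a simplified, hypothetical `SketchFuture` type with a `compose` method; these names are invented for illustration and are not the `RequestFuture` or `RequestFutureAdapter` classes from the patch. Failures on the source future (a disconnect, say) are propagated generically, while the conversion function handles the type-specific part.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical, simplified future type for illustration only.
// It is NOT the RequestFuture class from the patch.
final class SketchFuture<T> {
    interface Listener<V> {
        void onSuccess(V value);
        void onFailure(RuntimeException error);
    }

    private final List<Listener<T>> listeners = new ArrayList<>();
    private boolean done;
    private T value;
    private RuntimeException error;

    void complete(T v) {
        if (done) throw new IllegalStateException("already completed");
        done = true;
        value = v;
        for (Listener<T> l : listeners) l.onSuccess(v);
    }

    void raise(RuntimeException e) {
        if (done) throw new IllegalStateException("already completed");
        done = true;
        error = e;
        for (Listener<T> l : listeners) l.onFailure(e);
    }

    void addListener(Listener<T> l) {
        if (!done) listeners.add(l);
        else if (error != null) l.onFailure(error);
        else l.onSuccess(value);
    }

    boolean succeeded() { return done && error == null; }
    boolean failed() { return done && error != null; }
    T value() { return value; }
    RuntimeException error() { return error; }

    // Adapter step: derive a future of type S from this future of type T.
    // Failures pass through unchanged; successes go through `convert`.
    <S> SketchFuture<S> compose(Function<? super T, ? extends S> convert) {
        final SketchFuture<S> adapted = new SketchFuture<>();
        addListener(new Listener<T>() {
            @Override public void onSuccess(T v) {
                S converted;
                try {
                    converted = convert.apply(v);
                } catch (RuntimeException e) {
                    adapted.raise(e); // a conversion error fails the adapted future
                    return;
                }
                adapted.complete(converted);
            }
            @Override public void onFailure(RuntimeException e) {
                adapted.raise(e);
            }
        });
        return adapted;
    }
}

final class AdapterDemo {
    public static void main(String[] args) {
        SketchFuture<String> raw = new SketchFuture<>();
        SketchFuture<Integer> parsed = raw.compose(s -> Integer.parseInt(s));
        raw.complete("42");
        System.out.println(parsed.value());
    }
}
```

The limitation described above shows up here as well: `compose` can forward a failure without knowing anything about the payload, but any error code buried inside the payload has to be found by the type-specific conversion function.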
Re: Review Request 36333: Patch for KAFKA-2123
On July 9, 2015, 3:53 a.m., Ewen Cheslack-Postava wrote: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java, line 121 https://reviews.apache.org/r/36333/diff/1/?file=1002923#file1002923line121 Is this the behavior we want? Both timeout and delayedTasks.nextTimeout() can be arbitrarily small. For delayedTasks especially, it seems like we're tying the failure of requests to unrelated events? Previously, I thought request failures may happen quickly, but then there was a Utils.sleep backoff. I am not seeing how that is handled now? If the connection isn't established, won't poll() not send out requests, run client.poll(), possibly return very quickly before the connection is established/fails, then fail the unsent requests, then just return to the caller? And that caller might be one of those while() loops, so it may just continue to retry, busy looping while not actually accomplishing anything. If we're going to introduce queuing of send requests here, it seems like a timeout (rather than fast fail + backoff) might be a more natural solution. So rather than clearUnsentRequests, it might be clearExpiredRequests. This is not clear from the comments, but the main reason for the unsent request queue in this class is to get around NetworkClient's single-send behavior. Currently, NetworkClient will only accept one send at a time for a given node. If you have multiple requests, you have to sprinkle in a bunch of poll() calls and use ready() to make sure that they can all get buffered. This is annoying because you cannot attach a callback to a request until you are actually able to send it. This means you always have to send requests in a poll loop, which implies you can't really have asynchronous requests. The unsent request queue provides a way to buffer these requests before transporting to the network layer which allows us to give a future to the caller immediately. 
Now, when the user calls poll, we try to send all of the unsent requests immediately (by doing the ready()/poll() dance until nothing else can be sent). Note that none of this depends on the timeout passed to poll: we are just calling poll(0) until nothing can be sent. After that, we call poll(timeout). It may be the case that there were some requests that failed to be sent. This could be because the client is still connecting to the node or because its send buffer is full. To make the behavior of this class easy to understand, I just fail these requests, which means that the unsent request queue never accumulates beyond a poll call. So every request sent either gets transported or fails.

I think the call to Utils.sleep(retryBackoffMs) was lost in the refactor. It should probably be invoked whenever we retry a request to the coordinator. One potential issue is that the configuration retry.backoff.ms (as it's currently documented) is only intended for fetch requests, but we have been using it for all requests. Perhaps there should be a separate configuration? For example coordinator.retry.backoff.ms? Or perhaps a hard-coded value is really what we want.

On July 9, 2015, 3:53 a.m., Ewen Cheslack-Postava wrote:
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java, line 91
https://reviews.apache.org/r/36333/diff/1/?file=1002923#file1002923line91
Is the retryBackoffMs here because that's how long you want to wait for a response before we jump back out of the poll, which fails the request and triggers another NetworkClient.poll() call, which in turn sends the MetadataRequest? Just trying to figure out the flow because a) the flow is unclear and b) this is the only use of metadata in this class. Would this make sense to push into NetworkClient, which is the one responsible for making the request anyway? I'm not sure it's a good idea since nothing besides this class uses NetworkClient anymore, but worth thinking about.
Guozhang mentioned this code as well, which was copied from KafkaConsumer. I think the use of retryBackoffMs is incorrect. It probably could be poll(-1) since we are not actually resending any requests. To me, the weird thing has always been that metadata updates are handled directly in NetworkClient and not in the higher layers. I can see the reason for it, but it makes it difficult to attach code to metadata updates (which is exactly what we need to implement regex subscriptions). I would personally be in favor of pulling it out of NetworkClient, but that has implications for KafkaProducer as well. Moving this method into NetworkClient is probably a better idea at the moment and may give us a fighting chance to get the logic right. On July 9, 2015, 3:53 a.m., Ewen Cheslack-Postava wrote: clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java, line 119
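The unsent request queue described in the reply above (hand the caller a future immediately, try to transport everything on the next poll, and fail whatever could not be sent) can be sketched roughly as follows. All names here (`UnsentQueueSketch`, `Pending`, the `nodeReady` predicate) are hypothetical stand-ins rather than classes from the patch, and the sketch completes a future as soon as its request is "transported", whereas a real client would complete it when the response arrives.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;

// Hypothetical sketch; none of these types are Kafka classes.
final class UnsentQueueSketch {
    static final class Pending {
        final String node;
        final String request;
        final CompletableFuture<String> future = new CompletableFuture<>();

        Pending(String node, String request) {
            this.node = node;
            this.request = request;
        }
    }

    private final Deque<Pending> unsent = new ArrayDeque<>();
    private final Predicate<String> nodeReady; // stand-in for a ready() check
    final List<String> transported = new ArrayList<>();

    UnsentQueueSketch(Predicate<String> nodeReady) {
        this.nodeReady = nodeReady;
    }

    // Buffer the request and hand back a future right away,
    // before anything has reached the network layer.
    CompletableFuture<String> send(String node, String request) {
        Pending p = new Pending(node, request);
        unsent.add(p);
        return p.future;
    }

    // One poll: transport every request whose node is ready and fail the rest,
    // so the queue never carries entries over to the next poll.
    void poll() {
        Pending p;
        while ((p = unsent.poll()) != null) {
            if (nodeReady.test(p.node)) {
                transported.add(p.request);
                // Simplification: respond immediately instead of waiting for I/O.
                p.future.complete("response to " + p.request);
            } else {
                p.future.completeExceptionally(
                    new IllegalStateException("send failed: " + p.node + " not ready"));
            }
        }
    }

    int unsentCount() {
        return unsent.size();
    }
}
```

The fail-fast choice keeps the queue's state trivial to reason about, at the cost of pushing retry and backoff decisions onto the caller, which is the concern raised in the review comment.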
Re: Review Request 36333: Patch for KAFKA-2123
On July 9, 2015, 1:42 a.m., Guozhang Wang wrote: clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, line 775 https://reviews.apache.org/r/36333/diff/1/?file=1002921#file1002921line775 I think KAFKA-1894 is already fixed in this patch + KAFKA-2168? Jason Gustafson wrote: I think this is still debatable. Is wakeup() sufficient to assuage our guilt for allowing poll to block indefinitely in spite of the passed timeout? Perhaps I'm the only one, but I'm still holding out hope that we'll be able to enforce the timeout even if we are in the middle of a join group. The code is actually not that far from being able to do so. Yeah makes sense. - Guozhang --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36333/#review91012 --- On July 8, 2015, 9:19 p.m., Jason Gustafson wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36333/ --- (Updated July 8, 2015, 9:19 p.m.) Review request for kafka. 
Re: Review Request 36333: Patch for KAFKA-2123
On July 9, 2015, 3:53 a.m., Ewen Cheslack-Postava wrote:
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java, line 91
https://reviews.apache.org/r/36333/diff/1/?file=1002923#file1002923line91
Is the retryBackoffMs here because that's how long you want to wait for a response before we jump back out of the poll, which fails the request and triggers another NetworkClient.poll() call, which in turn sends the MetadataRequest? Just trying to figure out the flow because a) the flow is unclear and b) this is the only use of metadata in this class. Would this make sense to push into NetworkClient, which is the one responsible for making the request anyway? I'm not sure it's a good idea since nothing besides this class uses NetworkClient anymore, but worth thinking about.

Jason Gustafson wrote:
Guozhang mentioned this code as well, which was copied from KafkaConsumer. I think the use of retryBackoffMs is incorrect. It probably could be poll(-1) since we are not actually resending any requests. To me, the weird thing has always been that metadata updates are handled directly in NetworkClient and not in the higher layers. I can see the reason for it, but it makes it difficult to attach code to metadata updates (which is exactly what we need to implement regex subscriptions). I would personally be in favor of pulling it out of NetworkClient, but that has implications for KafkaProducer as well. Moving this method into NetworkClient is probably a better idea at the moment and may give us a fighting chance to get the logic right.

When we implemented the new producer, people felt it was better to let NetworkClient handle metadata responses separately from other responses, so that the high-level module (Sender for the producer, and whatever the module for the consumer was at that time) would not need to handle them. I think this motivation is still valid today, but we did not think through regex subscriptions at that time.
On July 9, 2015, 3:53 a.m., Ewen Cheslack-Postava wrote: clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java, line 121 https://reviews.apache.org/r/36333/diff/1/?file=1002923#file1002923line121 Is this the behavior we want? Both timeout and delayedTasks.nextTimeout() can be arbitrarily small. For delayedTasks especially, it seems like we're tying the failure of requests to unrelated events? Previously, I thought request failures may happen quickly, but then there was a Utils.sleep backoff. I am not seeing how that is handled now? If the connection isn't established, won't poll() not send out requests, run client.poll(), possibly return very quickly before the connection is established/fails, then fail the unsent requests, then just return to the caller? And that caller might be one of those while() loops, so it may just continue to retry, busy looping while not actually accomplishing anything. If we're going to introduce queuing of send requests here, it seems like a timeout (rather than fast fail + backoff) might be a more natural solution. So rather than clearUnsentRequests, it might be clearExpiredRequests. Jason Gustafson wrote: This is not clear from the comments, but the main reason for the unsent request queue in this class is to get around NetworkClient's single-send behavior. Currently, NetworkClient will only accept one send at a time for a given node. If you have multiple requests, you have to sprinkle in a bunch of poll() calls and use ready() to make sure that they can all get buffered. This is annoying because you cannot attach a callback to a request until you are actually able to send it. This means you always have to send requests in a poll loop, which implies you can't really have asynchronous requests. The unsent request queue provides a way to buffer these requests before transporting to the network layer which allows us to give a future to the caller immediately. 
Now, when the user calls poll, we try to send all of the unsent requests immediately (by doing the ready()/poll() dance until nothing else can be sent). Note that none of this depends on the timeout passed to poll: we are just calling poll(0) until nothing can be sent. After that, then we call poll(timeout). It may be the case that there were some requests that failed to be sent. This could be because the client is still connecting to the node or that its send buffer is full. To make the behavior of this class easy to understand, I just fail these requests which means that the unsent request queue never accumulates beyond a poll call. So every request sent either gets transported or fails. I think the call to Utils.sleep(retryBackoffMs) was lost in the refactor. It should probably be invoked
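As an illustration of the backoff that this reply says was lost in the refactor, the sketch below retries an operation and sleeps for a fixed backoff between failed attempts. It is a generic pattern under stated assumptions, not code from the patch: `BackoffRetrySketch`, `retryWithBackoff`, and the injectable `sleeper` are hypothetical names, and a `null` result stands in for "the coordinator request failed and should be retried".

```java
import java.util.function.LongConsumer;
import java.util.function.Supplier;

// Hypothetical sketch of retrying with a fixed backoff between attempts.
final class BackoffRetrySketch {

    // Calls `attempt` until it returns non-null or maxAttempts is reached.
    // After each failed attempt except the last, waits retryBackoffMs via
    // `sleeper` so the caller does not spin in a tight loop.
    static <T> T retryWithBackoff(Supplier<T> attempt,
                                  int maxAttempts,
                                  long retryBackoffMs,
                                  LongConsumer sleeper) {
        for (int i = 1; i <= maxAttempts; i++) {
            T result = attempt.get();
            if (result != null) {
                return result;
            }
            if (i < maxAttempts) {
                sleeper.accept(retryBackoffMs);
            }
        }
        return null; // every attempt failed
    }

    // Default sleeper for real use; restores the interrupt flag if interrupted.
    static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Passing the sleeper as a parameter is only a convenience for testing without real delays; production code would typically pass `BackoffRetrySketch::sleep`.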
Review Request 36333: Patch for KAFKA-2123
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36333/ --- Review request for kafka. Bugs: KAFKA-2123 https://issues.apache.org/jira/browse/KAFKA-2123 Repository: kafka Description --- KAFKA-2123; resolve problems from rebase Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java fd98740bff175cc9d5bc02e365d88e011ef65d22 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 7aa076084c894bb8f47b9df2c086475b06f47060 clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 46e26a665a22625d50888efa7b53472279f36e79 clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java c1c8172cd45f6715262f9a6f497a7b1797a834a3 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 695eaf63db9a5fa20dc2ca68957901462a96cd96 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java 51eae1944d5c17cf838be57adf560bafe36fbfbd clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java 13fc9af7392b4ade958daf3b0c9a165ddda351a6 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java PRE-CREATION 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java PRE-CREATION clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java 683745304c671952ff566f23b5dd4cf3ab75377a clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java PRE-CREATION clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 4c0ecc3badd99727b5bd9d430364e61c184e0923 clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java PRE-CREATION clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java d085fe5c9e2a0567893508a1c71f014fae6d7510 clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java PRE-CREATION clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 405efdc7a59438731cbc3630876bda0042a3adb3 clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java ee1ede01efa070409b86f5d8874cd578e058ce51 clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java PRE-CREATION core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 476973b2c551db5be3f1c54f94990f0dd15ff65e core/src/test/scala/integration/kafka/api/ConsumerTest.scala 92ffb91b5e039dc0d4cd0e072ca46db32f280cf9 Diff: 
https://reviews.apache.org/r/36333/diff/ Testing --- Thanks, Jason Gustafson
Re: Review Request 36333: Patch for KAFKA-2123
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36333/#review91016 --- clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java (line 91) https://reviews.apache.org/r/36333/#comment144254 Is the retryBackoffMs here because that's how long you want to wait for a response before we jump back out of the poll, which fails the request and triggers another NetworkClient.poll() call, which in turn sends the MetadataRequest? Just trying to figure out the flow because a) the flow is unclear and b) this is the only use of metadata in this class. Would this make sense to push into NetworkClient, which is the one responsible for making the request anyway? I'm not sure it's a good idea since nothing besides this class uses NetworkClient anymore, but worth thinking about. clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java (line 121) https://reviews.apache.org/r/36333/#comment144258 Is this the behavior we want? Both timeout and delayedTasks.nextTimeout() can be arbitrarily small. For delayedTasks especially, it seems like we're tying the failure of requests to unrelated events? Previously, I thought request failures may happen quickly, but then there was a Utils.sleep backoff. I am not seeing how that is handled now? If the connection isn't established, won't poll() not send out requests, run client.poll(), possibly return very quickly before the connection is established/fails, then fail the unsent requests, then just return to the caller? And that caller might be one of those while() loops, so it may just continue to retry, busy looping while not actually accomplishing anything. If we're going to introduce queuing of send requests here, it seems like a timeout (rather than fast fail + backoff) might be a more natural solution. So rather than clearUnsentRequests, it might be clearExpiredRequests. 
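The clearExpiredRequests idea suggested in the comment above might look something like the following. It is only a sketch under assumptions: the ExpiringRequestQueue class, its Entry type, and the requestTimeoutMs parameter are hypothetical and do not appear in the patch, and time is passed in explicitly so the behavior is easy to follow.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

// Hypothetical sketch: queued requests expire after a timeout instead of failing fast.
class ExpiringRequestQueue {
    static final class Entry {
        final String request;
        final long expiresAtMs;

        Entry(String request, long expiresAtMs) {
            this.request = request;
            this.expiresAtMs = expiresAtMs;
        }
    }

    private final Queue<Entry> unsent = new ArrayDeque<>();
    private final long requestTimeoutMs;

    ExpiringRequestQueue(long requestTimeoutMs) {
        this.requestTimeoutMs = requestTimeoutMs;
    }

    // Record the deadline at enqueue time so it does not depend on unrelated poll timeouts.
    void enqueue(String request, long nowMs) {
        unsent.add(new Entry(request, nowMs + requestTimeoutMs));
    }

    // Remove and return only the requests whose deadline has passed; the rest stay queued.
    List<String> clearExpiredRequests(long nowMs) {
        List<String> expired = new ArrayList<>();
        Iterator<Entry> it = unsent.iterator();
        while (it.hasNext()) {
            Entry entry = it.next();
            if (entry.expiresAtMs <= nowMs) {
                expired.add(entry.request);
                it.remove();
            }
        }
        return expired;
    }

    int size() {
        return unsent.size();
    }
}
```

With this shape, a short poll timeout or an early delayed task no longer fails a request; only the request's own deadline does, which also removes the need for the caller to retry in a tight loop.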
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java (line 113) https://reviews.apache.org/r/36333/#comment144273 I think this is a holdover from previous code, so feel free to defer to another JIRA. The code in KafkaConsumer.committed() implies it should work for partitions not assigned to this consumer (and that is useful functionality we should probably support). But here, the argument is ignored in the condition for the while() loop, which seems error prone. I don't see a way that committed() would cause subscriptions.refreshCommitsNeeded to return true, so I think it'll currently fail for unassigned partitions. For this code, I think it's risky not to specifically be checking the requested partitions against the available set in subscriptions unless the caller is already guaranteed to do that. clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java (line 137) https://reviews.apache.org/r/36333/#comment144272 This being separated from the rebalance callbacks concerns me a little, specifically because we're adding user callbacks and those can potentially make additional calls on the consumer. I scanned through all the usages and can currently only find one weird edge case that could create incorrect behavior: 1. Do async offset commit 2. During normal poll(), offset commit finishes and callback is invoked. 3. Callback calls committed(TopicPartition), which invokes refreshCommittedOffsets, which invokes ensureAssignment and handles the rebalance without invoking callbacks. Actually, I think based on my earlier comment this won't currently happen, but only because I think committed() won't work properly with non-assigned partitions. clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java (line 298) https://reviews.apache.org/r/36333/#comment144274 Why trigger commit refresh here? 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java (line 526) https://reviews.apache.org/r/36333/#comment144281 Stray unnecessary Coordinator. prefix. clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java (line 640) https://reviews.apache.org/r/36333/#comment144280 Is this really what should be happening? I'm trying to figure out how we can even reach this code since RequestFutureCompletionHandler only has onComplete - RequestFuture.complete(), which indicates success. This does need to handle both cases, but can we validly get to onFailure the way this class is used, or should this be some stronger assertion? clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java (line 74) https://reviews.apache.org/r/36333/#comment144226 Since we bumped to Java 1.7 you
Re: Review Request 36333: Patch for KAFKA-2123
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36333/#review91012 --- Browsed through the patch, overall looks very promising. I am not very clear on a few detailed changes though: 1. The request future adapter / handler modifications. 2. The retry backoff implementation does not seem correct. Could you explain a little bit on these two aspects? clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java (line 27) https://reviews.apache.org/r/36333/#comment144213 You may want to add the committed offset map in the callback since otherwise it is unclear which commit it is referring to when triggered. clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java (lines 542 - 545) https://reviews.apache.org/r/36333/#comment144216 Is there any particular reason you want to materialize the sessionTimeoutMs variable? It seems to be referenced only once, at line 545. clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java (line 765) https://reviews.apache.org/r/36333/#comment144219 I think KAFKA-1894 is already fixed in this patch + KAFKA-2168? clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java (line 786) https://reviews.apache.org/r/36333/#comment144233 Add some comments: re-schedule the commit task for the next commit interval? clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java (lines 88 - 93) https://reviews.apache.org/r/36333/#comment144252 This is not introduced in the patch, but I am not sure if this is the right way to respect backoff time. For example, if the destination broker is down for a short period of time, poll(retryBackoffMs) will immediately return, and hence this function will busily trigger poll() and flood the network with metadata requests, right? What we want in this case is that the consumer should wait for retryBackoffMs before retrying the next metadata request. 
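The backoff behavior asked for in the last comment above (wait out retryBackoffMs even when the attempt returns immediately) could be sketched as follows. This is an illustration under stated assumptions rather than code from the patch: BackoffRetry, its Clock interface, and the maxAttempts bound are hypothetical, and the attempt callback stands in for a poll() that may return early when the broker is down.

```java
import java.util.function.BooleanSupplier;

// Hypothetical sketch: enforce a minimum gap of retryBackoffMs between retry attempts.
class BackoffRetry {
    // Minimal time abstraction so the loop can be exercised with a fake clock.
    interface Clock {
        long nowMs();
        void sleep(long ms);
    }

    // Returns the 1-based attempt number that succeeded, or -1 if all attempts failed.
    static int retryWithBackoff(BooleanSupplier attempt, long retryBackoffMs,
                                int maxAttempts, Clock clock) {
        for (int i = 1; i <= maxAttempts; i++) {
            long start = clock.nowMs();
            if (attempt.getAsBoolean()) {
                return i;
            }
            // If the attempt failed quickly, sleep for the remainder of the backoff
            // instead of looping straight into the next request.
            long elapsed = clock.nowMs() - start;
            if (i < maxAttempts && elapsed < retryBackoffMs) {
                clock.sleep(retryBackoffMs - elapsed);
            }
        }
        return -1;
    }
}
```

The point of measuring elapsed time is that an attempt which already blocked for the full backoff is not delayed further, while one that returned immediately is held back for the remainder.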
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java (line 119) https://reviews.apache.org/r/36333/#comment144250 This function can be private. clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java (lines 135 - 138) https://reviews.apache.org/r/36333/#comment144253 Same as above in awaitMetadataUpdate(). clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java (line 114) https://reviews.apache.org/r/36333/#comment144275 We can remove this line since it is checked inside ensureAssignment() already. clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java (line 137) https://reviews.apache.org/r/36333/#comment144239 Rename to ensurePartitionAssigned? clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java (line 212) https://reviews.apache.org/r/36333/#comment144276 There is a potential risk of not aligning the scheduling of the heartbeat with the discovery of the coordinator. For example, let's say: 1. at t0 we call initHeartbeatTask with interval 100; 2. at t1 the consumer has already found the coordinator, but it will not send the first HB until t100; 3. at t100 the consumer may find that it has already been kicked out of the group by the coordinator, so it reschedules at t200 and re-joins the group; 4. at t101 the consumer has re-joined the group, but will not send the HB until t200, and so on. clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java (line 280) https://reviews.apache.org/r/36333/#comment144277 Why do we change the behavior to directly throw an exception here? clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java (line 22) https://reviews.apache.org/r/36333/#comment144246 This comment is not correct since the function returns void. 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java (lines 1 - 43) https://reviews.apache.org/r/36333/#comment144278 Not clear why you want to convert the future type in this adapter, can you elaborate a bit? - Guozhang Wang On July 8, 2015, 9:19 p.m., Jason Gustafson wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/36333/ --- (Updated July 8, 2015, 9:19 p.m.) Review request for kafka. Bugs: KAFKA-2123 https://issues.apache.org/jira/browse/KAFKA-2123 Repository: kafka Description --- KAFKA-2123; resolve problems from rebase Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java