Re: Review Request 36333: Patch for KAFKA-2123

2015-07-15 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36333/#review91808
---

Ship it!


Ship It!

- Guozhang Wang


On July 15, 2015, 1:21 a.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36333/
 ---
 
 (Updated July 15, 2015, 1:21 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2123
 https://issues.apache.org/jira/browse/KAFKA-2123
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2123; resolve problems from rebase
 
 
 KAFKA-2123; address review comments
 
 
 KAFKA-2123; more review fixes
 
 
 KAFKA-2123; refresh metadata on listOffset failure
 
 
 KAFKA-2123; fix comment issues
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 fd98740bff175cc9d5bc02e365d88e011ef65d22 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
  74dfdba0ecbca04947adba5eabb1cb5190a0cd5f 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 b4e8f7f0dceefaf94a3495f39f5783cce5ceb25f 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 46e26a665a22625d50888efa7b53472279f36e79 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  c1c8172cd45f6715262f9a6f497a7b1797a834a3 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  695eaf63db9a5fa20dc2ca68957901462a96cd96 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
  51eae1944d5c17cf838be57adf560bafe36fbfbd 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
  13fc9af7392b4ade958daf3b0c9a165ddda351a6 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  683745304c671952ff566f23b5dd4cf3ab75377a 
   
 clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java 
 PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
 4c0ecc3badd99727b5bd9d430364e61c184e0923 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
  PRE-CREATION 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
  d085fe5c9e2a0567893508a1c71f014fae6d7510 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java
  PRE-CREATION 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
  405efdc7a59438731cbc3630876bda0042a3adb3 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
  ee1ede01efa070409b86f5d8874cd578e058ce51 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java
  PRE-CREATION 
   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
 92ffb91b5e039dc0d4cd0e072ca46db32f280cf9 
 
 Diff: https://reviews.apache.org/r/36333/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Jason Gustafson
 




Re: Review Request 36333: Patch for KAFKA-2123

2015-07-15 Thread Onur Karaman

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36333/#review91810
---



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 (lines 274 - 276)
https://reviews.apache.org/r/36333/#comment145455

I think this is a bug. schedule takes a timestamp, not the delta.


- Onur Karaman


On July 15, 2015, 1:21 a.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36333/
 ---
 
 (Updated July 15, 2015, 1:21 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2123
 https://issues.apache.org/jira/browse/KAFKA-2123
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2123; resolve problems from rebase
 
 
 KAFKA-2123; address review comments
 
 
 KAFKA-2123; more review fixes
 
 
 KAFKA-2123; refresh metadata on listOffset failure
 
 
 KAFKA-2123; fix comment issues
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 fd98740bff175cc9d5bc02e365d88e011ef65d22 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
  74dfdba0ecbca04947adba5eabb1cb5190a0cd5f 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 b4e8f7f0dceefaf94a3495f39f5783cce5ceb25f 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 46e26a665a22625d50888efa7b53472279f36e79 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  c1c8172cd45f6715262f9a6f497a7b1797a834a3 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  695eaf63db9a5fa20dc2ca68957901462a96cd96 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
  51eae1944d5c17cf838be57adf560bafe36fbfbd 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
  13fc9af7392b4ade958daf3b0c9a165ddda351a6 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  683745304c671952ff566f23b5dd4cf3ab75377a 
   
 clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java 
 PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
 4c0ecc3badd99727b5bd9d430364e61c184e0923 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
  PRE-CREATION 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
  d085fe5c9e2a0567893508a1c71f014fae6d7510 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java
  PRE-CREATION 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
  405efdc7a59438731cbc3630876bda0042a3adb3 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
  ee1ede01efa070409b86f5d8874cd578e058ce51 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java
  PRE-CREATION 
   

Re: Review Request 36333: Patch for KAFKA-2123

2015-07-15 Thread Jason Gustafson


 On July 15, 2015, 8:10 p.m., Onur Karaman wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java,
   lines 280-282
  https://reviews.apache.org/r/36333/diff/5/?file=1011945#file1011945line280
 
  I think this is a bug. schedule takes a timestamp, not the delta.

Good catch. I'll submit a follow-up patch. I wonder if it ought to take a delta 
since most cases are just passing something like time.milliseconds() + 
interval.


- Jason


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36333/#review91810
---


On July 15, 2015, 1:21 a.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36333/
 ---
 
 (Updated July 15, 2015, 1:21 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2123
 https://issues.apache.org/jira/browse/KAFKA-2123
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2123; resolve problems from rebase
 
 
 KAFKA-2123; address review comments
 
 
 KAFKA-2123; more review fixes
 
 
 KAFKA-2123; refresh metadata on listOffset failure
 
 
 KAFKA-2123; fix comment issues
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 fd98740bff175cc9d5bc02e365d88e011ef65d22 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
  74dfdba0ecbca04947adba5eabb1cb5190a0cd5f 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 b4e8f7f0dceefaf94a3495f39f5783cce5ceb25f 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 46e26a665a22625d50888efa7b53472279f36e79 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  c1c8172cd45f6715262f9a6f497a7b1797a834a3 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  695eaf63db9a5fa20dc2ca68957901462a96cd96 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
  51eae1944d5c17cf838be57adf560bafe36fbfbd 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
  13fc9af7392b4ade958daf3b0c9a165ddda351a6 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  683745304c671952ff566f23b5dd4cf3ab75377a 
   
 clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java 
 PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
 4c0ecc3badd99727b5bd9d430364e61c184e0923 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
  PRE-CREATION 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
  d085fe5c9e2a0567893508a1c71f014fae6d7510 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java
  PRE-CREATION 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
  405efdc7a59438731cbc3630876bda0042a3adb3 
   
 

Re: Review Request 36333: Patch for KAFKA-2123

2015-07-14 Thread Jason Gustafson


 On July 14, 2015, 6 p.m., Ewen Cheslack-Postava wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java,
   line 274
  https://reviews.apache.org/r/36333/diff/2-3/?file=1009096#file1009096line274
 
  Seems like we're not handling this anymore? sendListOffsetRequest will 
  return this type of error if we don't have metadata, the leader is unknown, 
  the topic partition doesn't exist, or the broker isn't the leader for the 
  partition.
  
  I think this is easy to miss because there's a number of layers in this 
  class that don't get tested directly, so there are a lot of scenarios that 
  the unit test probably isn't covering now. Maybe fix up handling of the 
  exception for now and file a jira to follow up with better unit tests, 
  especially if we think this code is finally settling down?

Haha, I don't think I'll be able to persuade Guozhang to commit another 
refactor, so I think this is it. I agree on the need for more unit tests for 
Fetcher. I'll create a new jira.


- Jason


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36333/#review91646
---


On July 14, 2015, 8:21 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36333/
 ---
 
 (Updated July 14, 2015, 8:21 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2123
 https://issues.apache.org/jira/browse/KAFKA-2123
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2123; resolve problems from rebase
 
 
 KAFKA-2123; address review comments
 
 
 KAFKA-2123; more review fixes
 
 
 KAFKA-2123; refresh metadata on listOffset failure
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 fd98740bff175cc9d5bc02e365d88e011ef65d22 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
  74dfdba0ecbca04947adba5eabb1cb5190a0cd5f 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 b4e8f7f0dceefaf94a3495f39f5783cce5ceb25f 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 46e26a665a22625d50888efa7b53472279f36e79 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  c1c8172cd45f6715262f9a6f497a7b1797a834a3 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  695eaf63db9a5fa20dc2ca68957901462a96cd96 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
  51eae1944d5c17cf838be57adf560bafe36fbfbd 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
  13fc9af7392b4ade958daf3b0c9a165ddda351a6 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  683745304c671952ff566f23b5dd4cf3ab75377a 
   
 clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java 
 PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
 

Re: Review Request 36333: Patch for KAFKA-2123

2015-07-14 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36333/#review91651
---


LGTM overall, just some minor comments below.


clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
(line 815)
https://reviews.apache.org/r/36333/#comment145173

Is this comment accurate? Could a sync commit throw exception when it is no 
longer retriable on returned Future?



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
(line 1071)
https://reviews.apache.org/r/36333/#comment145207

nit: not capitalizing the first letter just for comment consistency?



clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
 (line 175)
https://reviews.apache.org/r/36333/#comment145218

I think for in-function comments we will not capitalize comments, same 
below.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 (line 186)
https://reviews.apache.org/r/36333/#comment145216

decapitalize ensure that?



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 (line 222)
https://reviews.apache.org/r/36333/#comment145215

missing one @param.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 (line 234)
https://reviews.apache.org/r/36333/#comment145219

Same here and below regarding comments.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 (line 615)
https://reviews.apache.org/r/36333/#comment145226

nit: Move this class right after sendOffsetCommitRequest()?



clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java
 (line 41)
https://reviews.apache.org/r/36333/#comment145227

Comment: remove all instances of the task in the queue?



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
(line 144)
https://reviews.apache.org/r/36333/#comment145229

Decapitalize comments, same below.



clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
 (line 46)
https://reviews.apache.org/r/36333/#comment145233

Maybe rename to testSend(...) and below just for naming consistency?



clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
 (line 286)
https://reviews.apache.org/r/36333/#comment145234

Did you check the expected tag works here? I have seen in some old junit 
versions the test will still pass even if the exception is now thrown.



clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java
 (line 62)
https://reviews.apache.org/r/36333/#comment145235

Rename to testRemove?



clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
 (line 63)
https://reviews.apache.org/r/36333/#comment145236

testReset?



clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java
 (line 23)
https://reviews.apache.org/r/36333/#comment145237

testCompose... ? Same below.


l,

- Guozhang Wang


On July 14, 2015, 8:21 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36333/
 ---
 
 (Updated July 14, 2015, 8:21 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2123
 https://issues.apache.org/jira/browse/KAFKA-2123
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2123; resolve problems from rebase
 
 
 KAFKA-2123; address review comments
 
 
 KAFKA-2123; more review fixes
 
 
 KAFKA-2123; refresh metadata on listOffset failure
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 fd98740bff175cc9d5bc02e365d88e011ef65d22 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
  74dfdba0ecbca04947adba5eabb1cb5190a0cd5f 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 b4e8f7f0dceefaf94a3495f39f5783cce5ceb25f 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 46e26a665a22625d50888efa7b53472279f36e79 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  c1c8172cd45f6715262f9a6f497a7b1797a834a3 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java
  PRE-CREATION 
   
 

Re: Review Request 36333: Patch for KAFKA-2123

2015-07-14 Thread Jason Gustafson

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36333/
---

(Updated July 14, 2015, 8:21 p.m.)


Review request for kafka.


Bugs: KAFKA-2123
https://issues.apache.org/jira/browse/KAFKA-2123


Repository: kafka


Description (updated)
---

KAFKA-2123; resolve problems from rebase


KAFKA-2123; address review comments


KAFKA-2123; more review fixes


KAFKA-2123; refresh metadata on listOffset failure


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
fd98740bff175cc9d5bc02e365d88e011ef65d22 
  
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
 74dfdba0ecbca04947adba5eabb1cb5190a0cd5f 
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b4e8f7f0dceefaf94a3495f39f5783cce5ceb25f 
  clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
46e26a665a22625d50888efa7b53472279f36e79 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 c1c8172cd45f6715262f9a6f497a7b1797a834a3 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
695eaf63db9a5fa20dc2ca68957901462a96cd96 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
 51eae1944d5c17cf838be57adf560bafe36fbfbd 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
 13fc9af7392b4ade958daf3b0c9a165ddda351a6 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 683745304c671952ff566f23b5dd4cf3ab75377a 
  
clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java
 PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java 
PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java
 PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
4c0ecc3badd99727b5bd9d430364e61c184e0923 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
 PRE-CREATION 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
 d085fe5c9e2a0567893508a1c71f014fae6d7510 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java
 PRE-CREATION 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 405efdc7a59438731cbc3630876bda0042a3adb3 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
 ee1ede01efa070409b86f5d8874cd578e058ce51 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java
 PRE-CREATION 
  core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
92ffb91b5e039dc0d4cd0e072ca46db32f280cf9 

Diff: https://reviews.apache.org/r/36333/diff/


Testing
---


Thanks,

Jason Gustafson



Re: Review Request 36333: Patch for KAFKA-2123

2015-07-14 Thread Jason Gustafson

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36333/
---

(Updated July 15, 2015, 1:21 a.m.)


Review request for kafka.


Bugs: KAFKA-2123
https://issues.apache.org/jira/browse/KAFKA-2123


Repository: kafka


Description (updated)
---

KAFKA-2123; resolve problems from rebase


KAFKA-2123; address review comments


KAFKA-2123; more review fixes


KAFKA-2123; refresh metadata on listOffset failure


KAFKA-2123; fix comment issues


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
fd98740bff175cc9d5bc02e365d88e011ef65d22 
  
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
 74dfdba0ecbca04947adba5eabb1cb5190a0cd5f 
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b4e8f7f0dceefaf94a3495f39f5783cce5ceb25f 
  clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
46e26a665a22625d50888efa7b53472279f36e79 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 c1c8172cd45f6715262f9a6f497a7b1797a834a3 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
695eaf63db9a5fa20dc2ca68957901462a96cd96 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
 51eae1944d5c17cf838be57adf560bafe36fbfbd 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
 13fc9af7392b4ade958daf3b0c9a165ddda351a6 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 683745304c671952ff566f23b5dd4cf3ab75377a 
  
clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java
 PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java 
PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java
 PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
4c0ecc3badd99727b5bd9d430364e61c184e0923 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
 PRE-CREATION 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
 d085fe5c9e2a0567893508a1c71f014fae6d7510 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java
 PRE-CREATION 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 405efdc7a59438731cbc3630876bda0042a3adb3 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
 ee1ede01efa070409b86f5d8874cd578e058ce51 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java
 PRE-CREATION 
  core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
92ffb91b5e039dc0d4cd0e072ca46db32f280cf9 

Diff: https://reviews.apache.org/r/36333/diff/


Testing
---


Thanks,

Jason Gustafson



Re: Review Request 36333: Patch for KAFKA-2123

2015-07-14 Thread Ewen Cheslack-Postava

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36333/#review91646
---



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
https://reviews.apache.org/r/36333/#comment145171

Seems like we're not handling this anymore? sendListOffsetRequest will 
return this type of error if we don't have metadata, the leader is unknown, the 
topic partition doesn't exist, or the broker isn't the leader for the partition.

I think this is easy to miss because there's a number of layers in this 
class that don't get tested directly, so there are a lot of scenarios that the 
unit test probably isn't covering now. Maybe fix up handling of the exception 
for now and file a jira to follow up with better unit tests, especially if we 
think this code is finally settling down?


One last issue I found, but otherwise LGTM now.

- Ewen Cheslack-Postava


On July 14, 2015, 1:45 a.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36333/
 ---
 
 (Updated July 14, 2015, 1:45 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2123
 https://issues.apache.org/jira/browse/KAFKA-2123
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2123; resolve problems from rebase
 
 
 KAFKA-2123; address review comments
 
 
 KAFKA-2123; more review fixes
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 fd98740bff175cc9d5bc02e365d88e011ef65d22 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
  74dfdba0ecbca04947adba5eabb1cb5190a0cd5f 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 b4e8f7f0dceefaf94a3495f39f5783cce5ceb25f 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 46e26a665a22625d50888efa7b53472279f36e79 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  c1c8172cd45f6715262f9a6f497a7b1797a834a3 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  695eaf63db9a5fa20dc2ca68957901462a96cd96 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
  51eae1944d5c17cf838be57adf560bafe36fbfbd 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
  13fc9af7392b4ade958daf3b0c9a165ddda351a6 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  683745304c671952ff566f23b5dd4cf3ab75377a 
   
 clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java 
 PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
 4c0ecc3badd99727b5bd9d430364e61c184e0923 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
  PRE-CREATION 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
  d085fe5c9e2a0567893508a1c71f014fae6d7510 
   
 

Re: Review Request 36333: Patch for KAFKA-2123

2015-07-13 Thread Ewen Cheslack-Postava

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36333/#review91564
---



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
(line 961)
https://reviews.apache.org/r/36333/#comment144989

Hmm, this seems like very different behavior from before. Won't this 
trigger an offset fetch request *every* time this method is called? Seems like 
that could be very bad behavior if I wanted to do something like list the 
committed offsets for the partitions this consumer owns (i.e. by iterating over 
the very confusingly named SetTopicPartitions subscriptions(), which returns 
assigned partitions).

Wouldn't the previous logic where it checks if we have the committed offset 
first make sense?



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 (line 197)
https://reviews.apache.org/r/36333/#comment144993

Won't this always sleep even if we succeeded? Unlike similar code earlier 
in this class, this one doesn't check if future.succeeded().



clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
 (line 516)
https://reviews.apache.org/r/36333/#comment144998

It looks like a bunch of reorganization + addition of createCoordinator() 
calls were added, but it looks like they all use a MockRebalanceCallback? Even 
the ones that explicitly create their own callback and pass it into 
createCoordinator()? Maybe I'm just missing the reason for these changes?


- Ewen Cheslack-Postava


On July 12, 2015, 12:34 a.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36333/
 ---
 
 (Updated July 12, 2015, 12:34 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2123
 https://issues.apache.org/jira/browse/KAFKA-2123
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2123; resolve problems from rebase
 
 
 KAFKA-2123; address review comments
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 fd98740bff175cc9d5bc02e365d88e011ef65d22 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
  74dfdba0ecbca04947adba5eabb1cb5190a0cd5f 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 7aa076084c894bb8f47b9df2c086475b06f47060 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 46e26a665a22625d50888efa7b53472279f36e79 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  c1c8172cd45f6715262f9a6f497a7b1797a834a3 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  695eaf63db9a5fa20dc2ca68957901462a96cd96 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
  51eae1944d5c17cf838be57adf560bafe36fbfbd 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
  13fc9af7392b4ade958daf3b0c9a165ddda351a6 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  683745304c671952ff566f23b5dd4cf3ab75377a 
   
 clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java 
 PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java
  PRE-CREATION 
   
 

Re: Review Request 36333: Patch for KAFKA-2123

2015-07-13 Thread Jason Gustafson


 On July 14, 2015, 12:04 a.m., Guozhang Wang wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java,
   lines 117-131
  https://reviews.apache.org/r/36333/diff/1-2/?file=1002924#file1002924line117
 
  With this change, we are now always sending an OffsetFetchRequest even 
  when subscriptions.refreshCommitsNeeded returns false?

You are right. The weird thing about this API is that it accepts the partitions 
to refresh, but SubscriptionState only has a single flag indicating that a 
refresh is needed. This means that refreshing committed offsets for a subset of 
the partitions (which is what KafkaConsumer does) could cause us to fail to 
refresh the other partitions. I think maybe we should just remove the 
partitions parameter and always refresh all assigned partitions when a refresh 
is needed. Either that or we need to invalidate committed offsets on a 
per-partition basis.


 On July 14, 2015, 12:04 a.m., Guozhang Wang wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java,
   lines 558-559
  https://reviews.apache.org/r/36333/diff/2/?file=1009093#file1009093line558
 
  When the coordinator is dead / not known yet, the consumerCoordinator 
  field could be null, but since we do not stop scheduling the heartbeat 
  tasks, will this cause triggering client.send(null, ...)?

Actually I changed the code so that heartbeats are only rescheduled when the 
coordinator is known (and we are not awaiting a group join). Take a look at 
HeartbeatTask and see if it addresses this concern.


- Jason


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36333/#review91565
---


On July 12, 2015, 12:34 a.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36333/
 ---
 
 (Updated July 12, 2015, 12:34 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2123
 https://issues.apache.org/jira/browse/KAFKA-2123
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2123; resolve problems from rebase
 
 
 KAFKA-2123; address review comments
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 fd98740bff175cc9d5bc02e365d88e011ef65d22 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
  74dfdba0ecbca04947adba5eabb1cb5190a0cd5f 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 7aa076084c894bb8f47b9df2c086475b06f47060 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 46e26a665a22625d50888efa7b53472279f36e79 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  c1c8172cd45f6715262f9a6f497a7b1797a834a3 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  695eaf63db9a5fa20dc2ca68957901462a96cd96 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
  51eae1944d5c17cf838be57adf560bafe36fbfbd 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
  13fc9af7392b4ade958daf3b0c9a165ddda351a6 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  683745304c671952ff566f23b5dd4cf3ab75377a 
   
 clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java 
 PRE-CREATION 
   
 

Re: Review Request 36333: Patch for KAFKA-2123

2015-07-13 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36333/#review91565
---



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 (lines 117 - 131)
https://reviews.apache.org/r/36333/#comment144990

With this change, we are now always sending an OffsetFetchRequest even when 
subscriptions.refreshCommitsNeeded returns false?



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 (lines 506 - 507)
https://reviews.apache.org/r/36333/#comment144992

When the coordinator is dead / not known yet, the consumerCoordinator field 
could be null, but since we do not stop scheduling the heartbeat tasks, will 
this cause triggering client.send(null, ...)?


- Guozhang Wang


On July 12, 2015, 12:34 a.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36333/
 ---
 
 (Updated July 12, 2015, 12:34 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2123
 https://issues.apache.org/jira/browse/KAFKA-2123
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2123; resolve problems from rebase
 
 
 KAFKA-2123; address review comments
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 fd98740bff175cc9d5bc02e365d88e011ef65d22 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
  74dfdba0ecbca04947adba5eabb1cb5190a0cd5f 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 7aa076084c894bb8f47b9df2c086475b06f47060 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 46e26a665a22625d50888efa7b53472279f36e79 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  c1c8172cd45f6715262f9a6f497a7b1797a834a3 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  695eaf63db9a5fa20dc2ca68957901462a96cd96 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
  51eae1944d5c17cf838be57adf560bafe36fbfbd 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
  13fc9af7392b4ade958daf3b0c9a165ddda351a6 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  683745304c671952ff566f23b5dd4cf3ab75377a 
   
 clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java 
 PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
 4c0ecc3badd99727b5bd9d430364e61c184e0923 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
  PRE-CREATION 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
  d085fe5c9e2a0567893508a1c71f014fae6d7510 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java
  PRE-CREATION 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
  

Re: Review Request 36333: Patch for KAFKA-2123

2015-07-13 Thread Jason Gustafson


 On July 14, 2015, 12:32 a.m., Ewen Cheslack-Postava wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  line 968
  https://reviews.apache.org/r/36333/diff/1-2/?file=1002921#file1002921line968
 
  Hmm, this seems like very different behavior from before. Won't this 
  trigger an offset fetch request *every* time this method is called? Seems 
  like that could be very bad behavior if I wanted to do something like list 
  the committed offsets for the partitions this consumer owns (i.e. by 
  iterating over the very confusingly named SetTopicPartitions 
  subscriptions(), which returns assigned partitions).
  
  Wouldn't the previous logic where it checks if we have the committed 
  offset first make sense?

Yes, it's missing a check for subscriptions.refreshCommitsNeeded. I had tried 
to fix a problem I noticed with using a subset of the assigned partitions when 
refreshing commits (see my comment in Guozhang's review above), but my fix 
inadvertently introduced this and it didn't really solve the problem either. 
I'll take another look.


 On July 14, 2015, 12:32 a.m., Ewen Cheslack-Postava wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java,
   line 202
  https://reviews.apache.org/r/36333/diff/1-2/?file=1002924#file1002924line202
 
  Won't this always sleep even if we succeeded? Unlike similar code 
  earlier in this class, this one doesn't check if future.succeeded().

Yep, good catch.


- Jason


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36333/#review91564
---


On July 12, 2015, 12:34 a.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36333/
 ---
 
 (Updated July 12, 2015, 12:34 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2123
 https://issues.apache.org/jira/browse/KAFKA-2123
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2123; resolve problems from rebase
 
 
 KAFKA-2123; address review comments
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 fd98740bff175cc9d5bc02e365d88e011ef65d22 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
  74dfdba0ecbca04947adba5eabb1cb5190a0cd5f 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 7aa076084c894bb8f47b9df2c086475b06f47060 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 46e26a665a22625d50888efa7b53472279f36e79 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  c1c8172cd45f6715262f9a6f497a7b1797a834a3 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  695eaf63db9a5fa20dc2ca68957901462a96cd96 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
  51eae1944d5c17cf838be57adf560bafe36fbfbd 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
  13fc9af7392b4ade958daf3b0c9a165ddda351a6 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  683745304c671952ff566f23b5dd4cf3ab75377a 
   
 clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java 
 PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java
  PRE-CREATION 
   
 

Re: Review Request 36333: Patch for KAFKA-2123

2015-07-13 Thread Jason Gustafson

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36333/
---

(Updated July 14, 2015, 1:45 a.m.)


Review request for kafka.


Bugs: KAFKA-2123
https://issues.apache.org/jira/browse/KAFKA-2123


Repository: kafka


Description (updated)
---

KAFKA-2123; resolve problems from rebase


KAFKA-2123; address review comments


KAFKA-2123; more review fixes


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
fd98740bff175cc9d5bc02e365d88e011ef65d22 
  
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
 74dfdba0ecbca04947adba5eabb1cb5190a0cd5f 
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b4e8f7f0dceefaf94a3495f39f5783cce5ceb25f 
  clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
46e26a665a22625d50888efa7b53472279f36e79 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 c1c8172cd45f6715262f9a6f497a7b1797a834a3 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
695eaf63db9a5fa20dc2ca68957901462a96cd96 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
 51eae1944d5c17cf838be57adf560bafe36fbfbd 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
 13fc9af7392b4ade958daf3b0c9a165ddda351a6 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 683745304c671952ff566f23b5dd4cf3ab75377a 
  
clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java
 PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java 
PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java
 PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
4c0ecc3badd99727b5bd9d430364e61c184e0923 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
 PRE-CREATION 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
 d085fe5c9e2a0567893508a1c71f014fae6d7510 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java
 PRE-CREATION 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 405efdc7a59438731cbc3630876bda0042a3adb3 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
 ee1ede01efa070409b86f5d8874cd578e058ce51 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java
 PRE-CREATION 
  core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
92ffb91b5e039dc0d4cd0e072ca46db32f280cf9 

Diff: https://reviews.apache.org/r/36333/diff/


Testing
---


Thanks,

Jason Gustafson



Re: Review Request 36333: Patch for KAFKA-2123

2015-07-11 Thread Ewen Cheslack-Postava


 On July 9, 2015, 1:42 a.m., Guozhang Wang wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  line 775
  https://reviews.apache.org/r/36333/diff/1/?file=1002921#file1002921line775
 
  I think KAFKA-1894 is already fixed in this patch + KAFKA-2168?
 
 Jason Gustafson wrote:
 I think this is still debatable. Is wakeup() sufficient to assuage our 
 guilt for allowing poll to block indefinitely in spite of the passed timeout? 
 Perhaps I'm the only one, but I'm still holding out hope that we'll be able 
 to enforce the timeout even if we are in the middle of a join group. The code 
 is actually not that far from being able to do so.
 
 Guozhang Wang wrote:
 Yeah makes sense.
 
 Ewen Cheslack-Postava wrote:
 I'm still holding out hope as well :) There were one or two places in 
 this patch where timeouts aren't passed into some methods where I think they 
 could be passed in and respsected safely, but I didn't want to make this 
 patch even bigger.
 
 Gwen Shapira wrote:
 Same here :) Is there a JIRA to track this plan?
 
 Jason Gustafson wrote:
 My plan was to resolve this as part of KAFKA-1894, but perhaps a 
 different ticket is needed?

KAFK-1894 seems sufficient, but as you're refactoring this anyway, might make 
sense to try to track what methods don't respect timeouts and could cause 
problems, especially all the way up at the KafkaConsumer level. We could figure 
this out again later, but better to just document what's required now, which 
also gives an idea of how much work there is left to do.


- Ewen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36333/#review91012
---


On July 8, 2015, 9:19 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36333/
 ---
 
 (Updated July 8, 2015, 9:19 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2123
 https://issues.apache.org/jira/browse/KAFKA-2123
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2123; resolve problems from rebase
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 fd98740bff175cc9d5bc02e365d88e011ef65d22 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 7aa076084c894bb8f47b9df2c086475b06f47060 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 46e26a665a22625d50888efa7b53472279f36e79 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  c1c8172cd45f6715262f9a6f497a7b1797a834a3 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  695eaf63db9a5fa20dc2ca68957901462a96cd96 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
  51eae1944d5c17cf838be57adf560bafe36fbfbd 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
  13fc9af7392b4ade958daf3b0c9a165ddda351a6 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  683745304c671952ff566f23b5dd4cf3ab75377a 
   
 clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java 
 PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java
  PRE-CREATION 
   
 

Re: Review Request 36333: Patch for KAFKA-2123

2015-07-11 Thread Jason Gustafson

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36333/
---

(Updated July 12, 2015, 12:34 a.m.)


Review request for kafka.


Bugs: KAFKA-2123
https://issues.apache.org/jira/browse/KAFKA-2123


Repository: kafka


Description (updated)
---

KAFKA-2123; resolve problems from rebase


KAFKA-2123; address review comments


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
fd98740bff175cc9d5bc02e365d88e011ef65d22 
  
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
 74dfdba0ecbca04947adba5eabb1cb5190a0cd5f 
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
7aa076084c894bb8f47b9df2c086475b06f47060 
  clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
46e26a665a22625d50888efa7b53472279f36e79 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 c1c8172cd45f6715262f9a6f497a7b1797a834a3 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
695eaf63db9a5fa20dc2ca68957901462a96cd96 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
 51eae1944d5c17cf838be57adf560bafe36fbfbd 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
 13fc9af7392b4ade958daf3b0c9a165ddda351a6 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 683745304c671952ff566f23b5dd4cf3ab75377a 
  
clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java
 PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java 
PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java
 PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
4c0ecc3badd99727b5bd9d430364e61c184e0923 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
 PRE-CREATION 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
 d085fe5c9e2a0567893508a1c71f014fae6d7510 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java
 PRE-CREATION 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 405efdc7a59438731cbc3630876bda0042a3adb3 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
 ee1ede01efa070409b86f5d8874cd578e058ce51 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java
 PRE-CREATION 
  core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
92ffb91b5e039dc0d4cd0e072ca46db32f280cf9 

Diff: https://reviews.apache.org/r/36333/diff/


Testing
---


Thanks,

Jason Gustafson



Re: Review Request 36333: Patch for KAFKA-2123

2015-07-10 Thread Gwen Shapira


 On July 9, 2015, 1:42 a.m., Guozhang Wang wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  line 775
  https://reviews.apache.org/r/36333/diff/1/?file=1002921#file1002921line775
 
  I think KAFKA-1894 is already fixed in this patch + KAFKA-2168?
 
 Jason Gustafson wrote:
 I think this is still debatable. Is wakeup() sufficient to assuage our 
 guilt for allowing poll to block indefinitely in spite of the passed timeout? 
 Perhaps I'm the only one, but I'm still holding out hope that we'll be able 
 to enforce the timeout even if we are in the middle of a join group. The code 
 is actually not that far from being able to do so.
 
 Guozhang Wang wrote:
 Yeah makes sense.
 
 Ewen Cheslack-Postava wrote:
 I'm still holding out hope as well :) There were one or two places in 
 this patch where timeouts aren't passed into some methods where I think they 
 could be passed in and respsected safely, but I didn't want to make this 
 patch even bigger.

Same here :) Is there a JIRA to track this plan?


- Gwen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36333/#review91012
---


On July 8, 2015, 9:19 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36333/
 ---
 
 (Updated July 8, 2015, 9:19 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2123
 https://issues.apache.org/jira/browse/KAFKA-2123
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2123; resolve problems from rebase
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 fd98740bff175cc9d5bc02e365d88e011ef65d22 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 7aa076084c894bb8f47b9df2c086475b06f47060 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 46e26a665a22625d50888efa7b53472279f36e79 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  c1c8172cd45f6715262f9a6f497a7b1797a834a3 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  695eaf63db9a5fa20dc2ca68957901462a96cd96 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
  51eae1944d5c17cf838be57adf560bafe36fbfbd 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
  13fc9af7392b4ade958daf3b0c9a165ddda351a6 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  683745304c671952ff566f23b5dd4cf3ab75377a 
   
 clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java 
 PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
 4c0ecc3badd99727b5bd9d430364e61c184e0923 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
  PRE-CREATION 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
  

Re: Review Request 36333: Patch for KAFKA-2123

2015-07-10 Thread Ewen Cheslack-Postava


 On July 9, 2015, 1:42 a.m., Guozhang Wang wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  line 775
  https://reviews.apache.org/r/36333/diff/1/?file=1002921#file1002921line775
 
  I think KAFKA-1894 is already fixed in this patch + KAFKA-2168?
 
 Jason Gustafson wrote:
 I think this is still debatable. Is wakeup() sufficient to assuage our 
 guilt for allowing poll to block indefinitely in spite of the passed timeout? 
 Perhaps I'm the only one, but I'm still holding out hope that we'll be able 
 to enforce the timeout even if we are in the middle of a join group. The code 
 is actually not that far from being able to do so.
 
 Guozhang Wang wrote:
 Yeah makes sense.

I'm still holding out hope as well :) There were one or two places in this 
patch where timeouts aren't passed into some methods where I think they could 
be passed in and respsected safely, but I didn't want to make this patch even 
bigger.


- Ewen


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36333/#review91012
---


On July 8, 2015, 9:19 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36333/
 ---
 
 (Updated July 8, 2015, 9:19 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2123
 https://issues.apache.org/jira/browse/KAFKA-2123
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2123; resolve problems from rebase
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 fd98740bff175cc9d5bc02e365d88e011ef65d22 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 7aa076084c894bb8f47b9df2c086475b06f47060 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 46e26a665a22625d50888efa7b53472279f36e79 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  c1c8172cd45f6715262f9a6f497a7b1797a834a3 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  695eaf63db9a5fa20dc2ca68957901462a96cd96 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
  51eae1944d5c17cf838be57adf560bafe36fbfbd 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
  13fc9af7392b4ade958daf3b0c9a165ddda351a6 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  683745304c671952ff566f23b5dd4cf3ab75377a 
   
 clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java 
 PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
 4c0ecc3badd99727b5bd9d430364e61c184e0923 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
  PRE-CREATION 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
  d085fe5c9e2a0567893508a1c71f014fae6d7510 
   
 

Re: Review Request 36333: Patch for KAFKA-2123

2015-07-10 Thread Jason Gustafson


 On July 9, 2015, 1:42 a.m., Guozhang Wang wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java,
   lines 88-93
  https://reviews.apache.org/r/36333/diff/1/?file=1002923#file1002923line88
 
  This is not introduced in the patch, but I am not sure if this is the 
  right way to respect backoff time. For example, if the destination broker 
  is down for a short period of time, poll(retryBackoffMs) will immediately 
  return, and hence this function will busy triggering poll() and fluding the 
  network with metadata requests right?
  
  What we want in this case, is that the consumer should wait for 
  retryBackoffMs before retry sending the next metadata request.
 
 Jason Gustafson wrote:
 This code was lifted from KafkaConsumer, but I think you are right. I'll 
 fix it for the next patch.

Actually, I think this implementation may be reasonable. Retries and backoff 
are already handled in NetworkClient, so this wouldn't flood the network with 
additional requests. I think we could use poll(Long.MAX_VALUE) if we wanted 
though.


 On July 9, 2015, 1:42 a.m., Guozhang Wang wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  line 775
  https://reviews.apache.org/r/36333/diff/1/?file=1002921#file1002921line775
 
  I think KAFKA-1894 is already fixed in this patch + KAFKA-2168?
 
 Jason Gustafson wrote:
 I think this is still debatable. Is wakeup() sufficient to assuage our 
 guilt for allowing poll to block indefinitely in spite of the passed timeout? 
 Perhaps I'm the only one, but I'm still holding out hope that we'll be able 
 to enforce the timeout even if we are in the middle of a join group. The code 
 is actually not that far from being able to do so.
 
 Guozhang Wang wrote:
 Yeah makes sense.
 
 Ewen Cheslack-Postava wrote:
 I'm still holding out hope as well :) There were one or two places in 
 this patch where timeouts aren't passed into some methods where I think they 
 could be passed in and respsected safely, but I didn't want to make this 
 patch even bigger.
 
 Gwen Shapira wrote:
 Same here :) Is there a JIRA to track this plan?

My plan was to resolve this as part of KAFKA-1894, but perhaps a different 
ticket is needed?


- Jason


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36333/#review91012
---


On July 8, 2015, 9:19 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36333/
 ---
 
 (Updated July 8, 2015, 9:19 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2123
 https://issues.apache.org/jira/browse/KAFKA-2123
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2123; resolve problems from rebase
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 fd98740bff175cc9d5bc02e365d88e011ef65d22 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 7aa076084c894bb8f47b9df2c086475b06f47060 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 46e26a665a22625d50888efa7b53472279f36e79 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  c1c8172cd45f6715262f9a6f497a7b1797a834a3 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  695eaf63db9a5fa20dc2ca68957901462a96cd96 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
  51eae1944d5c17cf838be57adf560bafe36fbfbd 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
  13fc9af7392b4ade958daf3b0c9a165ddda351a6 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java
  PRE-CREATION 
   
 

Re: Review Request 36333: Patch for KAFKA-2123

2015-07-09 Thread Jason Gustafson


 On July 9, 2015, 1:42 a.m., Guozhang Wang wrote:
  Browsed through the patch, overall looks very promising.
  
  I am not very clear on a few detailed changes though:
  
  1. The request future adapter / handler modifications.
  2. Retry backoff implementation seems not correct.
  
  Could you explain a little bit on these two aspects?

Thanks for the feedback. Comments below.


 On July 9, 2015, 1:42 a.m., Guozhang Wang wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java,
   line 27
  https://reviews.apache.org/r/36333/diff/1/?file=1002919#file1002919line27
 
  You may want to add the committed offset map in the callback since 
  otherwise it is unclear which commit it is referring to when triggered.

This is a good idea.


 On July 9, 2015, 1:42 a.m., Guozhang Wang wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java,
   lines 88-93
  https://reviews.apache.org/r/36333/diff/1/?file=1002923#file1002923line88
 
  This is not introduced in the patch, but I am not sure if this is the 
  right way to respect backoff time. For example, if the destination broker 
  is down for a short period of time, poll(retryBackoffMs) will immediately 
  return, and hence this function will busy triggering poll() and fluding the 
  network with metadata requests right?
  
  What we want in this case, is that the consumer should wait for 
  retryBackoffMs before retry sending the next metadata request.

This code was lifted from KafkaConsumer, but I think you are right. I'll fix it 
for the next patch.


 On July 9, 2015, 1:42 a.m., Guozhang Wang wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java,
   line 120
  https://reviews.apache.org/r/36333/diff/1/?file=1002924#file1002924line120
 
  We can remove this line since it is checked inside ensureAssignment() 
  already.

It's a little subtle, but the call to ensureCoordinatorKnown in 
ensureAssignment is inside the loop, so it will only be called when an 
assignment is needed. I considered putting it outside the loop, but inside is 
where it needs to be, and it looks odd to have it in both places.


 On July 9, 2015, 1:42 a.m., Guozhang Wang wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java,
   line 218
  https://reviews.apache.org/r/36333/diff/1/?file=1002924#file1002924line218
 
  There is a potential risk of not aligning the scheduling of heartbeat 
  with the discovery of the coordinator. For example, let's say:
  
  1. at t0 we call initHeartbeatTask with interval 100;
  2. at t1 the consumer already find the coordinator, but it will not 
  send the first HB until t100;
  3. at t100 the consumer may find itself already been kicked out of the 
  group by the coordinator, and reschedule at t200 and re-join group.
  4. at t101 the consumer has re-joined the group, but will not send the 
  HB until t200, and so on ..

Perhaps the way to deal with this is to give the heartbeat task suspend() and 
resume() methods so that we can align it properly according to coordinator 
events.


 On July 9, 2015, 1:42 a.m., Guozhang Wang wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java,
   line 296
  https://reviews.apache.org/r/36333/diff/1/?file=1002924#file1002924line296
 
  Why we change the behavior to directly throw exception here?

I removed the retry action from RequestFuture in favor of a check for 
RetriableException. In this case, both errors are retriable, so the result is 
the same.


 On July 9, 2015, 1:42 a.m., Guozhang Wang wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java,
   lines 1-43
  https://reviews.apache.org/r/36333/diff/1/?file=1002931#file1002931line1
 
  Not clear why you want to convert the future type in this adapter, can 
  you elaborate a bit?

The idea of the future adapter was to provide a way to convert from a future of 
one type to another. The use case was converting RequestFutureClientResponse 
to instances of the respective request (e.g. RequestFutureJoinGroupResponse). 
If you look at the error handling in the consumer, they are all doing basically 
the same thing. If the error is NOT_COORDINATOR_FOR_CONSUMER, for example, we 
just mark the coordinator dead. I hoped that this api would allow us to handle 
some coordinator errors generically while leaving the respective request 
handler to deal only with its specific errors. Unfortunately, the only generic 
check that this enabled was the check for a disconnect. To locate server error 
codes, you have to traverse the respective response structures, which cannot be 
done generically. At the moment, I'm not sure it brings enough to the table to 
justify its existence, but since it's a fairly common pattern with future 
handling and one that might be helpful in the future, I op
 ted to 

Re: Review Request 36333: Patch for KAFKA-2123

2015-07-09 Thread Jason Gustafson


 On July 9, 2015, 3:53 a.m., Ewen Cheslack-Postava wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java,
   line 121
  https://reviews.apache.org/r/36333/diff/1/?file=1002923#file1002923line121
 
  Is this the behavior we want? Both timeout and 
  delayedTasks.nextTimeout() can be arbitrarily small. For delayedTasks 
  especially, it seems like we're tying the failure of requests to unrelated 
  events?
  
  Previously, I thought request failures may happen quickly, but then 
  there was a Utils.sleep backoff. I am not seeing how that is handled now? 
  If the connection isn't established, won't poll() not send out requests, 
  run client.poll(), possibly return very quickly before the connection is 
  established/fails, then fail the unsent requests, then just return to the 
  caller? And that caller might be one of those while() loops, so it may just 
  continue to retry, busy looping while not actually accomplishing anything.
  
  If we're going to introduce queuing of send requests here, it seems 
  like a timeout (rather than fast fail + backoff) might be a more natural 
  solution. So rather than clearUnsentRequests, it might be 
  clearExpiredRequests.

This is not clear from the comments, but the main reason for the unsent request 
queue in this class is to get around NetworkClient's single-send behavior. 
Currently, NetworkClient will only accept one send at a time for a given node. 
If you have multiple requests, you have to sprinkle in a bunch of poll() calls 
and use ready() to make sure that they can all get buffered. This is annoying 
because you cannot attach a callback to a request until you are actually able 
to send it. This means you always have to send requests in a poll loop, which 
implies you can't really have asynchronous requests. The unsent request queue 
provides a way to buffer these requests before transporting to the network 
layer which allows us to give a future to the caller immediately. 

Now, when the user calls poll, we try to send all of the unsent requests 
immediately (by doing the ready()/poll() dance until nothing else can be sent). 
Note that none of this depends on the timeout passed to poll: we are just 
calling poll(0) until nothing can be sent. After that, then we call 
poll(timeout). It may be the case that there were some requests that failed to 
be sent. This could be because the client is still connecting to the node or 
that its send buffer is full. To make the behavior of this class easy to 
understand, I just fail these requests which means that the unsent request 
queue never accumulates beyond a poll call. So every request sent either gets 
transported or fails.

I think the call to Utils.sleep(retryBackoffMs) was lost in the refactor. It 
should probably be invoked whenever we retry a request to the coordinator. One 
potential issue is that the configuration retry.backoff.ms (as its currently 
documented) is only intended for fetch requests, but we have been using it for 
all requests. Perhaps there should be a separate configuration? For example 
coordinator.retry.backoff.ms? Or perhaps a hard-coded value is really what we 
want.


 On July 9, 2015, 3:53 a.m., Ewen Cheslack-Postava wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java,
   line 91
  https://reviews.apache.org/r/36333/diff/1/?file=1002923#file1002923line91
 
  Is the retryBackoffMs here because that's how long you want to wait for 
  a response before we jump back out of the poll, which fails the request and 
  triggers another NetworkClient.poll() call, which in turn sends the 
  MetadataRequest?
  
  Just trying to figure out the flow because a) the flow is unclear and 
  b) this is the only use of metadata in this class. Would this make sense to 
  push into NetworkClient, which is the one responsible for making the 
  request anyway? I'm not sure it's a good idea since nothing besides this 
  class uses NetworkClient anymore, but worth thinking about.

Guozhang mentioned this code as well, which was copied from KafkaConsumer. I 
think the use of retryBackoffMs is incorrect. It probably could be poll(-1) 
since we are not actually resending any requests.

To me, the weird thing has always been that metadata updates are handled 
directly in NetworkClient and not in the higher layers. I can see the reason 
for it, but it makes it difficult to attach code to metadata updates (which is 
exactly what we need to implement regex subscriptions). I would personally be 
in favor of pulling it out of NetworkClient, but that has implications for 
KafkaProducer as well. Moving this method into NetworkClient is probably a 
better idea at the moment and may give us a fighting chance to get the logic 
right.


 On July 9, 2015, 3:53 a.m., Ewen Cheslack-Postava wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java,
   line 119
  

Re: Review Request 36333: Patch for KAFKA-2123

2015-07-09 Thread Guozhang Wang


 On July 9, 2015, 1:42 a.m., Guozhang Wang wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
  line 775
  https://reviews.apache.org/r/36333/diff/1/?file=1002921#file1002921line775
 
  I think KAFKA-1894 is already fixed in this patch + KAFKA-2168?
 
 Jason Gustafson wrote:
 I think this is still debatable. Is wakeup() sufficient to assuage our 
 guilt for allowing poll to block indefinitely in spite of the passed timeout? 
 Perhaps I'm the only one, but I'm still holding out hope that we'll be able 
 to enforce the timeout even if we are in the middle of a join group. The code 
 is actually not that far from being able to do so.

Yeah makes sense.


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36333/#review91012
---


On July 8, 2015, 9:19 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36333/
 ---
 
 (Updated July 8, 2015, 9:19 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2123
 https://issues.apache.org/jira/browse/KAFKA-2123
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2123; resolve problems from rebase
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
 fd98740bff175cc9d5bc02e365d88e011ef65d22 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
 eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 
   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
 7aa076084c894bb8f47b9df2c086475b06f47060 
   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
 46e26a665a22625d50888efa7b53472279f36e79 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
  c1c8172cd45f6715262f9a6f497a7b1797a834a3 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
  695eaf63db9a5fa20dc2ca68957901462a96cd96 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
  51eae1944d5c17cf838be57adf560bafe36fbfbd 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
  13fc9af7392b4ade958daf3b0c9a165ddda351a6 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
  683745304c671952ff566f23b5dd4cf3ab75377a 
   
 clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java 
 PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java
  PRE-CREATION 
   
 clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java
  PRE-CREATION 
   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
 4c0ecc3badd99727b5bd9d430364e61c184e0923 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
  PRE-CREATION 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
  d085fe5c9e2a0567893508a1c71f014fae6d7510 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java
  PRE-CREATION 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
  405efdc7a59438731cbc3630876bda0042a3adb3 
   
 clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
  

Re: Review Request 36333: Patch for KAFKA-2123

2015-07-09 Thread Guozhang Wang


 On July 9, 2015, 3:53 a.m., Ewen Cheslack-Postava wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java,
   line 91
  https://reviews.apache.org/r/36333/diff/1/?file=1002923#file1002923line91
 
  Is the retryBackoffMs here because that's how long you want to wait for 
  a response before we jump back out of the poll, which fails the request and 
  triggers another NetworkClient.poll() call, which in turn sends the 
  MetadataRequest?
  
  Just trying to figure out the flow because a) the flow is unclear and 
  b) this is the only use of metadata in this class. Would this make sense to 
  push into NetworkClient, which is the one responsible for making the 
  request anyway? I'm not sure it's a good idea since nothing besides this 
  class uses NetworkClient anymore, but worth thinking about.
 
 Jason Gustafson wrote:
 Guozhang mentioned this code as well, which was copied from 
 KafkaConsumer. I think the use of retryBackoffMs is incorrect. It probably 
 could be poll(-1) since we are not actually resending any requests.
 
 To me, the weird thing has always been that metadata updates are handled 
 directly in NetworkClient and not in the higher layers. I can see the reason 
 for it, but it makes it difficult to attach code to metadata updates (which 
 is exactly what we need to implement regex subscriptions). I would personally 
 be in favor of pulling it out of NetworkClient, but that has implications for 
 KafkaProducer as well. Moving this method into NetworkClient is probably a 
 better idea at the moment and may give us a fighting chance to get the logic 
 right.

When implementing the new producer, people feel it is better letting 
NetworkClient handles metadata responses specifically aside from other 
responses so that the high-level module (Sender for producer, and whatever the 
module for consumer at that time) does not need to handle it. I think this 
motivation is still valid even as today, but we did not thought through the 
regex subscriptions at that time..


 On July 9, 2015, 3:53 a.m., Ewen Cheslack-Postava wrote:
  clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java,
   line 121
  https://reviews.apache.org/r/36333/diff/1/?file=1002923#file1002923line121
 
  Is this the behavior we want? Both timeout and 
  delayedTasks.nextTimeout() can be arbitrarily small. For delayedTasks 
  especially, it seems like we're tying the failure of requests to unrelated 
  events?
  
  Previously, I thought request failures may happen quickly, but then 
  there was a Utils.sleep backoff. I am not seeing how that is handled now? 
  If the connection isn't established, won't poll() not send out requests, 
  run client.poll(), possibly return very quickly before the connection is 
  established/fails, then fail the unsent requests, then just return to the 
  caller? And that caller might be one of those while() loops, so it may just 
  continue to retry, busy looping while not actually accomplishing anything.
  
  If we're going to introduce queuing of send requests here, it seems 
  like a timeout (rather than fast fail + backoff) might be a more natural 
  solution. So rather than clearUnsentRequests, it might be 
  clearExpiredRequests.
 
 Jason Gustafson wrote:
 This is not clear from the comments, but the main reason for the unsent 
 request queue in this class is to get around NetworkClient's single-send 
 behavior. Currently, NetworkClient will only accept one send at a time for a 
 given node. If you have multiple requests, you have to sprinkle in a bunch of 
 poll() calls and use ready() to make sure that they can all get buffered. 
 This is annoying because you cannot attach a callback to a request until you 
 are actually able to send it. This means you always have to send requests in 
 a poll loop, which implies you can't really have asynchronous requests. The 
 unsent request queue provides a way to buffer these requests before 
 transporting to the network layer which allows us to give a future to the 
 caller immediately. 
 
 Now, when the user calls poll, we try to send all of the unsent requests 
 immediately (by doing the ready()/poll() dance until nothing else can be 
 sent). Note that none of this depends on the timeout passed to poll: we are 
 just calling poll(0) until nothing can be sent. After that, then we call 
 poll(timeout). It may be the case that there were some requests that failed 
 to be sent. This could be because the client is still connecting to the node 
 or that its send buffer is full. To make the behavior of this class easy to 
 understand, I just fail these requests which means that the unsent request 
 queue never accumulates beyond a poll call. So every request sent either gets 
 transported or fails.
 
 I think the call to Utils.sleep(retryBackoffMs) was lost in the refactor. 
 It should probably be invoked 

Review Request 36333: Patch for KAFKA-2123

2015-07-08 Thread Jason Gustafson

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36333/
---

Review request for kafka.


Bugs: KAFKA-2123
https://issues.apache.org/jira/browse/KAFKA-2123


Repository: kafka


Description
---

KAFKA-2123; resolve problems from rebase


Diffs
-

  clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
fd98740bff175cc9d5bc02e365d88e011ef65d22 
  
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
 PRE-CREATION 
  clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 
  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
7aa076084c894bb8f47b9df2c086475b06f47060 
  clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
46e26a665a22625d50888efa7b53472279f36e79 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 c1c8172cd45f6715262f9a6f497a7b1797a834a3 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 
695eaf63db9a5fa20dc2ca68957901462a96cd96 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
 51eae1944d5c17cf838be57adf560bafe36fbfbd 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
 13fc9af7392b4ade958daf3b0c9a165ddda351a6 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
 683745304c671952ff566f23b5dd4cf3ab75377a 
  
clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java
 PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java 
PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java
 PRE-CREATION 
  
clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java
 PRE-CREATION 
  clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
4c0ecc3badd99727b5bd9d430364e61c184e0923 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
 PRE-CREATION 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
 d085fe5c9e2a0567893508a1c71f014fae6d7510 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java
 PRE-CREATION 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 405efdc7a59438731cbc3630876bda0042a3adb3 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
 ee1ede01efa070409b86f5d8874cd578e058ce51 
  
clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java
 PRE-CREATION 
  core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 
476973b2c551db5be3f1c54f94990f0dd15ff65e 
  core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
92ffb91b5e039dc0d4cd0e072ca46db32f280cf9 

Diff: https://reviews.apache.org/r/36333/diff/


Testing
---


Thanks,

Jason Gustafson



Re: Review Request 36333: Patch for KAFKA-2123

2015-07-08 Thread Ewen Cheslack-Postava

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36333/#review91016
---



clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
 (line 91)
https://reviews.apache.org/r/36333/#comment144254

Is the retryBackoffMs here because that's how long you want to wait for a 
response before we jump back out of the poll, which fails the request and 
triggers another NetworkClient.poll() call, which in turn sends the 
MetadataRequest?

Just trying to figure out the flow because a) the flow is unclear and b) 
this is the only use of metadata in this class. Would this make sense to push 
into NetworkClient, which is the one responsible for making the request anyway? 
I'm not sure it's a good idea since nothing besides this class uses 
NetworkClient anymore, but worth thinking about.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
 (line 121)
https://reviews.apache.org/r/36333/#comment144258

Is this the behavior we want? Both timeout and delayedTasks.nextTimeout() 
can be arbitrarily small. For delayedTasks especially, it seems like we're 
tying the failure of requests to unrelated events?

Previously, I thought request failures may happen quickly, but then there 
was a Utils.sleep backoff. I am not seeing how that is handled now? If the 
connection isn't established, won't poll() not send out requests, run 
client.poll(), possibly return very quickly before the connection is 
established/fails, then fail the unsent requests, then just return to the 
caller? And that caller might be one of those while() loops, so it may just 
continue to retry, busy looping while not actually accomplishing anything.

If we're going to introduce queuing of send requests here, it seems like a 
timeout (rather than fast fail + backoff) might be a more natural solution. So 
rather than clearUnsentRequests, it might be clearExpiredRequests.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 (line 113)
https://reviews.apache.org/r/36333/#comment144273

I think this is a hold over from previous code, so feel free to defer to 
another JIRA. The code in KafkaConsumer.committed() implies it should work for 
partitions not assigned to this consumer (and that is useful functionality we 
should probably support). But here, the argument is ignored in the condition 
for the while() loop, which seems error prone. I don't see a way that 
comitted() would cause subscriptions.refreshCommitsNeeded to return true, so I 
think it'll currently fail for unassigned partitions.

For this code, I think it's risky not to specifically be checking the 
requested partitions against the available set in subscriptions unless the 
caller is already guaranteed to do that.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 (line 137)
https://reviews.apache.org/r/36333/#comment144272

This being separated from the rebalance callbacks concerns me a little, 
specifically because we're adding user callbacks and those can potentially make 
additional calls on the consumer. I scanned through all the usages and can 
currently only find one weird edge case that could create incorrect behavior:

1. Do async offset commit
2. During normal poll(), offset commit finishes and callback is invoked.
3. Callback changes calls committed(TopicPartition), which invokes 
refreshCommittedOffsets, which invokes ensureAssignment and handles the 
rebalance without invoking callbacks.

Actually, I think based on my earlier comment this won't currently happen, 
but only because I think committed() won't work properly with non-assigned 
partitions.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 (line 298)
https://reviews.apache.org/r/36333/#comment144274

Why trigger commit refresh here?



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 (line 526)
https://reviews.apache.org/r/36333/#comment144281

Stray unnecessary Coordinator. prefix.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 (line 640)
https://reviews.apache.org/r/36333/#comment144280

Is this really the what should be happening? I'm trying to figure out how 
we can even reach this code since RequestFutureCompletionHandler only has 
onComplete - RequestFuture.complete(), which indicates success.

This does need to handle both cases, but can we validly get to onFailure 
the way this class is used, or should this be some stronger assertion?



clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java
 (line 74)
https://reviews.apache.org/r/36333/#comment144226

Since we bumped to Java 1.7 you 

Re: Review Request 36333: Patch for KAFKA-2123

2015-07-08 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36333/#review91012
---


Browsed through the patch, overall looks very promising.

I am not very clear on a few detailed changes though:

1. The request future adapter / handler modifications.
2. Retry backoff implementation seems not correct.

Could you explain a little bit on these two aspects?


clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
 (line 27)
https://reviews.apache.org/r/36333/#comment144213

You may want to add the committed offset map in the callback since 
otherwise it is unclear which commit it is referring to when triggered.



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
(lines 542 - 545)
https://reviews.apache.org/r/36333/#comment144216

Is there any particular reason you want to materialize the sessionTimeoutMs 
variable? It seems only referred once at line 545.



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
(line 765)
https://reviews.apache.org/r/36333/#comment144219

I think KAFKA-1894 is already fixed in this patch + KAFKA-2168?



clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
(line 786)
https://reviews.apache.org/r/36333/#comment144233

Add some comments: re-schedule the commit task for the next commit 
interval?



clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
 (lines 88 - 93)
https://reviews.apache.org/r/36333/#comment144252

This is not introduced in the patch, but I am not sure if this is the right 
way to respect backoff time. For example, if the destination broker is down for 
a short period of time, poll(retryBackoffMs) will immediately return, and hence 
this function will busy triggering poll() and fluding the network with metadata 
requests right?

What we want in this case, is that the consumer should wait for 
retryBackoffMs before retry sending the next metadata request.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
 (line 119)
https://reviews.apache.org/r/36333/#comment144250

This function can be private.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
 (lines 135 - 138)
https://reviews.apache.org/r/36333/#comment144253

Same as above in awaitMetadataUpdate().



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 (line 114)
https://reviews.apache.org/r/36333/#comment144275

We can remove this line since it is checked inside ensureAssignment() 
already.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 (line 137)
https://reviews.apache.org/r/36333/#comment144239

Renaming to ensurePartitionAssigned?



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 (line 212)
https://reviews.apache.org/r/36333/#comment144276

There is a potential risk of not aligning the scheduling of heartbeat with 
the discovery of the coordinator. For example, let's say:

1. at t0 we call initHeartbeatTask with interval 100;
2. at t1 the consumer already find the coordinator, but it will not send 
the first HB until t100;
3. at t100 the consumer may find itself already been kicked out of the 
group by the coordinator, and reschedule at t200 and re-join group.
4. at t101 the consumer has re-joined the group, but will not send the HB 
until t200, and so on ..



clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
 (line 280)
https://reviews.apache.org/r/36333/#comment144277

Why we change the behavior to directly throw exception here?



clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java
 (line 22)
https://reviews.apache.org/r/36333/#comment144246

This comment is not correct since the function returns void.



clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java
 (lines 1 - 43)
https://reviews.apache.org/r/36333/#comment144278

Not clear why you want to convert the future type in this adapter, can you 
elaborate a bit?


- Guozhang Wang


On July 8, 2015, 9:19 p.m., Jason Gustafson wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/36333/
 ---
 
 (Updated July 8, 2015, 9:19 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2123
 https://issues.apache.org/jira/browse/KAFKA-2123
 
 
 Repository: kafka
 
 
 Description
 ---
 
 KAFKA-2123; resolve problems from rebase
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java