> On July 9, 2015, 1:42 a.m., Guozhang Wang wrote:
> > Browsed through the patch, overall looks very promising.
> > 
> > I am not very clear on a few detailed changes though:
> > 
> > 1. The request future adapter / handler modifications.
> > 2. Retry backoff implementation seems not correct.
> > 
> > Could you explain a little bit on these two aspects?

Thanks for the feedback. Comments below.


> On July 9, 2015, 1:42 a.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java,
> >  line 27
> > <https://reviews.apache.org/r/36333/diff/1/?file=1002919#file1002919line27>
> >
> >     You may want to add the committed offset map in the callback since 
> > otherwise it is unclear which commit it is referring to when triggered.

This is a good idea.
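
A minimal sketch of what such a callback could look like, assuming the 
committed offsets are passed in alongside any error. The interface name, the 
signature, and the use of plain strings in place of TopicPartition are all 
hypothetical and only meant to illustrate the suggestion:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical shape only; the name and signature are assumptions, not the patch's API.
interface OffsetCommitCallbackSketch {
    // offsets:   the partition -> offset map that this commit attempted to write
    // exception: null on success, otherwise the cause of the failure
    void onComplete(Map<String, Long> offsets, Exception exception);
}

class CommitCallbackDemo {
    // Stand-in for an asynchronous commit: it completes immediately and hands the
    // callback a read-only snapshot of the offsets it was asked to commit.
    static void commitAsync(Map<String, Long> offsets, OffsetCommitCallbackSketch callback) {
        Map<String, Long> snapshot = Collections.unmodifiableMap(new HashMap<>(offsets));
        callback.onComplete(snapshot, null);
    }
}
```

With the map in hand, a caller with several commits in flight can tell which 
one a given invocation refers to.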


> On July 9, 2015, 1:42 a.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java,
> >  lines 88-93
> > <https://reviews.apache.org/r/36333/diff/1/?file=1002923#file1002923line88>
> >
> >     This is not introduced in the patch, but I am not sure if this is the 
> > right way to respect backoff time. For example, if the destination broker 
> > is down for a short period of time, poll(retryBackoffMs) will immediately 
> > return, and hence this function will busily trigger poll() and flood the 
> > network with metadata requests, right?
> >     
> >     What we want in this case is for the consumer to wait for 
> > retryBackoffMs before sending the next metadata request.

This code was lifted from KafkaConsumer, but I think you are right. I'll fix it 
for the next patch.
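
As a rough sketch of the intended behavior (not the code from the patch), the 
loop below sleeps out whatever remains of the backoff when an attempt returns 
early. The Clock and Attempt interfaces and the method name are hypothetical 
helpers introduced only for this example:

```java
// Hypothetical helper, not code from the patch: enforce a minimum delay between
// attempts even when an attempt returns immediately (e.g. the broker is down).
class BackoffSketch {
    interface Clock {
        long milliseconds();
        void sleep(long ms);
    }

    interface Attempt {
        boolean tryOnce(); // returns true on success
    }

    // Returns the number of attempts made (at most maxAttempts).
    static int retryWithBackoff(Attempt attempt, Clock clock, long retryBackoffMs, int maxAttempts) {
        int attempts = 0;
        while (attempts < maxAttempts) {
            long start = clock.milliseconds();
            attempts++;
            if (attempt.tryOnce())
                return attempts;
            // If the attempt failed quickly, wait out the rest of the backoff
            // instead of looping straight back into another request.
            long elapsed = clock.milliseconds() - start;
            if (elapsed < retryBackoffMs)
                clock.sleep(retryBackoffMs - elapsed);
        }
        return attempts;
    }
}
```

The point is only that the wait is tied to elapsed time rather than to the 
timeout passed to poll(), which may return early.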


> On July 9, 2015, 1:42 a.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java,
> >  line 120
> > <https://reviews.apache.org/r/36333/diff/1/?file=1002924#file1002924line120>
> >
> >     We can remove this line since it is checked inside ensureAssignment() 
> > already.

It's a little subtle, but the call to ensureCoordinatorKnown in 
ensureAssignment is inside the loop, so it will only be called when an 
assignment is needed. I considered putting it outside the loop, but inside is 
where it needs to be, and it looks odd to have it in both places.


> On July 9, 2015, 1:42 a.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java,
> >  line 218
> > <https://reviews.apache.org/r/36333/diff/1/?file=1002924#file1002924line218>
> >
> >     There is a potential risk of not aligning the scheduling of heartbeat 
> > with the discovery of the coordinator. For example, let's say:
> >     
> >     1. at t0 we call initHeartbeatTask with interval 100;
> >     2. at t1 the consumer has already found the coordinator, but it will 
> > not send the first HB until t100;
> >     3. at t100 the consumer may find it has already been kicked out of the 
> > group by the coordinator, and reschedule at t200 and re-join group.
> >     4. at t101 the consumer has re-joined the group, but will not send the 
> > HB until t200, and so on ..

Perhaps the way to deal with this is to give the heartbeat task suspend() and 
resume() methods so that we can align it properly according to coordinator 
events.
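
One possible shape for this, as a sketch only: the class below tracks the next 
heartbeat deadline and lets coordinator events reset it. The class and method 
names are hypothetical and do not correspond to anything in the current patch:

```java
// Hypothetical sketch: tracks when the next heartbeat is due and lets
// coordinator events realign the schedule.
class HeartbeatScheduleSketch {
    private final long intervalMs;
    private boolean suspended = true; // nothing to send until we have joined a group
    private long nextDeadlineMs;

    HeartbeatScheduleSketch(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    // Called when the coordinator is lost or the consumer is kicked out of the group.
    void suspend() {
        suspended = true;
    }

    // Called right after a successful join; the interval is measured from that moment.
    void resume(long nowMs) {
        suspended = false;
        nextDeadlineMs = nowMs + intervalMs;
    }

    // Returns true if a heartbeat is due now, and if so schedules the next one.
    boolean shouldHeartbeat(long nowMs) {
        if (suspended || nowMs < nextDeadlineMs)
            return false;
        nextDeadlineMs = nowMs + intervalMs;
        return true;
    }
}
```

In the scenario above, a re-join at t101 would call resume(101), so the next 
heartbeat would be due at t201 relative to the join rather than on the old 
fixed grid.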


> On July 9, 2015, 1:42 a.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java,
> >  line 296
> > <https://reviews.apache.org/r/36333/diff/1/?file=1002924#file1002924line296>
> >
> >     Why did we change the behavior to directly throw an exception here?

I removed the retry action from RequestFuture in favor of a check for 
RetriableException. In this case, both errors are retriable, so the result is 
the same.
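
To illustrate the pattern in isolation, here is a small sketch in which the 
caller decides whether to retry based on the exception type. 
RetriableSketchException is a local stand-in for 
org.apache.kafka.common.errors.RetriableException, and the Request interface 
and method name are hypothetical:

```java
// Local stand-in for illustration; the real type is
// org.apache.kafka.common.errors.RetriableException.
class RetriableSketchException extends RuntimeException {
    RetriableSketchException(String message) {
        super(message);
    }
}

class RetryCheckSketch {
    interface Request {
        void send();
    }

    // Retries only when the failure is marked retriable; any other exception
    // propagates to the caller on the first occurrence.
    // Returns the number of attempts it took to succeed.
    static int sendWithRetries(Request request, int maxAttempts) {
        for (int attempt = 1; ; attempt++) {
            try {
                request.send();
                return attempt;
            } catch (RetriableSketchException e) {
                if (attempt >= maxAttempts)
                    throw e; // give up after the last allowed attempt
            }
        }
    }
}
```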


> On July 9, 2015, 1:42 a.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java,
> >  lines 1-43
> > <https://reviews.apache.org/r/36333/diff/1/?file=1002931#file1002931line1>
> >
> >     Not clear why you want to convert the future type in this adapter, can 
> > you elaborate a bit?

The idea of the future adapter was to provide a way to convert from a future of 
one type to another. The use case was converting RequestFuture<ClientResponse> 
to instances of the respective request (e.g. RequestFuture<JoinGroupResponse>). 
If you look at the error handling in the consumer, they are all doing basically 
the same thing. If the error is NOT_COORDINATOR_FOR_CONSUMER, for example, we 
just mark the coordinator dead. I hoped that this api would allow us to handle 
some coordinator errors generically while leaving the respective request 
handler to deal only with its specific errors. Unfortunately, the only generic 
check that this enabled was the check for a disconnect. To locate server error 
codes, you have to traverse the respective response structures, which cannot be 
done generically. At the moment, I'm not sure it brings enough to the table to 
justify its existence, but since it's a fairly common pattern with future 
handling and one that might be helpful in the future, I opted to leave it in 
there.
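
For readers who have not looked at the patch, the general adapter pattern can 
be sketched as follows. This is a simplified, single-threaded illustration 
with hypothetical names (FutureSketch, Adapter, compose); it is not the 
RequestFuture or RequestFutureAdapter code under review:

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, single-threaded future used only to illustrate type adaptation.
class FutureSketch<T> {
    interface Listener<V> {
        void onSuccess(V value);
        void onFailure(RuntimeException e);
    }

    // Converts a result of type F into a result of type R on another future.
    abstract static class Adapter<F, R> {
        abstract void onSuccess(F value, FutureSketch<R> future);

        // Generic handling (e.g. a disconnect) can live here; by default
        // the failure is simply forwarded to the adapted future.
        void onFailure(RuntimeException e, FutureSketch<R> future) {
            future.raise(e);
        }
    }

    private final List<Listener<T>> listeners = new ArrayList<>();
    private boolean done;
    private T value;
    private RuntimeException error;

    void complete(T v) {
        if (done) throw new IllegalStateException("already completed");
        done = true;
        value = v;
        for (Listener<T> l : listeners) l.onSuccess(v);
    }

    void raise(RuntimeException e) {
        if (done) throw new IllegalStateException("already completed");
        done = true;
        error = e;
        for (Listener<T> l : listeners) l.onFailure(e);
    }

    void addListener(Listener<T> l) {
        if (!done) listeners.add(l);
        else if (error != null) l.onFailure(error);
        else l.onSuccess(value);
    }

    boolean succeeded() { return done && error == null; }
    T value() { return value; }
    RuntimeException exception() { return error; }

    // Returns a future of the adapted type that completes when this one does.
    <S> FutureSketch<S> compose(final Adapter<T, S> adapter) {
        final FutureSketch<S> adapted = new FutureSketch<S>();
        addListener(new Listener<T>() {
            public void onSuccess(T v) { adapter.onSuccess(v, adapted); }
            public void onFailure(RuntimeException e) { adapter.onFailure(e, adapted); }
        });
        return adapted;
    }
}
```

An adapter for a specific request would override onSuccess to parse the raw 
response and either complete the adapted future or raise a request-specific 
error on it.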


> On July 9, 2015, 1:42 a.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > line 775
> > <https://reviews.apache.org/r/36333/diff/1/?file=1002921#file1002921line775>
> >
> >     I think KAFKA-1894 is already fixed in this patch + KAFKA-2168?

I think this is still debatable. Is wakeup() sufficient to assuage our guilt 
for allowing poll to block indefinitely in spite of the passed timeout? Perhaps 
I'm the only one, but I'm still holding out hope that we'll be able to enforce 
the timeout even if we are in the middle of a join group. The code is actually 
not that far from being able to do so.


- Jason


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36333/#review91012
-----------------------------------------------------------


On July 8, 2015, 9:19 p.m., Jason Gustafson wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36333/
> -----------------------------------------------------------
> 
> (Updated July 8, 2015, 9:19 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2123
>     https://issues.apache.org/jira/browse/KAFKA-2123
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-2123; resolve problems from rebase
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
> fd98740bff175cc9d5bc02e365d88e011ef65d22 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
> eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> 7aa076084c894bb8f47b9df2c086475b06f47060 
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
> 46e26a665a22625d50888efa7b53472279f36e79 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
>  c1c8172cd45f6715262f9a6f497a7b1797a834a3 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
>  695eaf63db9a5fa20dc2ca68957901462a96cd96 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
>  51eae1944d5c17cf838be57adf560bafe36fbfbd 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
>  13fc9af7392b4ade958daf3b0c9a165ddda351a6 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
>  683745304c671952ff566f23b5dd4cf3ab75377a 
>   
> clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java 
> PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
> 4c0ecc3badd99727b5bd9d430364e61c184e0923 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
>  PRE-CREATION 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
>  d085fe5c9e2a0567893508a1c71f014fae6d7510 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java
>  PRE-CREATION 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
>  405efdc7a59438731cbc3630876bda0042a3adb3 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
>  ee1ede01efa070409b86f5d8874cd578e058ce51 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java
>  PRE-CREATION 
>   core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 
> 476973b2c551db5be3f1c54f94990f0dd15ff65e 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
> 92ffb91b5e039dc0d4cd0e072ca46db32f280cf9 
> 
> Diff: https://reviews.apache.org/r/36333/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jason Gustafson
> 
>
