> On July 9, 2015, 3:53 a.m., Ewen Cheslack-Postava wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java,
> >  line 121
> > <https://reviews.apache.org/r/36333/diff/1/?file=1002923#file1002923line121>
> >
> >     Is this the behavior we want? Both timeout and 
> > delayedTasks.nextTimeout() can be arbitrarily small. For delayedTasks 
> > especially, it seems like we're tying the failure of requests to unrelated 
> > events?
> >     
> >     Previously, I thought request failures may happen quickly, but then 
> > there was a Utils.sleep backoff. I am not seeing how that is handled now? 
> > If the connection isn't established, won't poll() not send out requests, 
> > run client.poll(), possibly return very quickly before the connection is 
> > established/fails, then fail the unsent requests, then just return to the 
> > caller? And that caller might be one of those while() loops, so it may just 
> > continue to retry, busy looping while not actually accomplishing anything.
> >     
> >     If we're going to introduce queuing of send requests here, it seems 
> > like a timeout (rather than fast fail + backoff) might be a more natural 
> > solution. So rather than clearUnsentRequests, it might be 
> > clearExpiredRequests.

This is not clear from the comments, but the main reason for the unsent request 
queue in this class is to get around NetworkClient's single-send behavior. 
Currently, NetworkClient will only accept one send at a time for a given node. 
If you have multiple requests, you have to sprinkle in a bunch of poll() calls 
and use ready() to make sure that they can all get buffered. This is annoying 
because you cannot attach a callback to a request until you are actually able 
to send it. This means you always have to send requests in a poll loop, which 
implies you can't really have asynchronous requests. The unsent request queue 
provides a way to buffer these requests before handing them to the network 
layer, which allows us to give a future to the caller immediately. 
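
Roughly, the buffering idea looks like this (a simplified sketch with 
stand-in types; the patch uses its own RequestFuture rather than 
CompletableFuture, and real requests carry more than a future):

    import java.util.ArrayDeque;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Queue;
    import java.util.concurrent.CompletableFuture;

    public class UnsentRequestBuffer {
        // Requests parked per node until the connection can accept them.
        private final Map<String, Queue<Request>> unsent = new HashMap<>();

        // Enqueue and return a future immediately, so the caller can attach
        // a callback without first looping on ready()/poll().
        public CompletableFuture<Response> send(String node, Request request) {
            unsent.computeIfAbsent(node, n -> new ArrayDeque<Request>()).add(request);
            return request.future;
        }

        static class Request {
            final CompletableFuture<Response> future = new CompletableFuture<>();
        }

        static class Response {
        }
    }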

Now, when the user calls poll, we try to send all of the unsent requests 
immediately (by doing the ready()/poll() dance until nothing else can be sent). 
Note that none of this depends on the timeout passed to poll: we are just 
calling poll(0) until nothing can be sent. Only after that do we call 
poll(timeout). It may be the case that some requests failed to be sent, either 
because the client is still connecting to the node or because its send buffer 
is full. To make the behavior of this class easy to understand, I just fail 
these requests, which means that the unsent request queue never accumulates 
beyond a single poll call. So every request sent either gets transported or 
fails.
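
In code, the flow is something like this (a rough sketch; the method and stub 
names are mine, not the patch's):

    public class PollFlowSketch {
        private final NetworkClientStub client = new NetworkClientStub();

        public void poll(long timeout) {
            // Phase 1: drain the unsent queue. poll(0) never blocks; it just
            // pumps I/O so that more sends can be buffered on each pass.
            while (trySend())
                client.poll(0);

            // Phase 2: block for at most the caller's timeout awaiting responses.
            client.poll(timeout);

            // Phase 3: fail anything still unsent (node still connecting, or
            // its send buffer full), so the queue never outlives a poll call.
            failUnsentRequests();
        }

        private boolean trySend() {
            // move unsent requests onto ready connections; return true if
            // anything was handed off (stubbed here)
            return false;
        }

        private void failUnsentRequests() {
            // complete the pending futures with SendFailedException (stubbed)
        }

        static class NetworkClientStub {
            void poll(long timeout) {
                // selector I/O would happen here
            }
        }
    }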

I think the call to Utils.sleep(retryBackoffMs) was lost in the refactor. It 
should probably be invoked whenever we retry a request to the coordinator. One 
potential issue is that the configuration "retry.backoff.ms" (as it's currently 
documented) is only intended for fetch requests, but we have been using it for 
all requests. Perhaps there should be a separate configuration? For example 
"coordinator.retry.backoff.ms"? Or perhaps a hard-coded value is really what we 
want.
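
For illustration, a non-blocking variant of the backoff could look like this 
(a sketch using ScheduledExecutorService as a stand-in; the DelayedTaskQueue 
added in this patch would be the more natural home for it):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    public class RetryBackoffSketch {
        // stand-in for the retry.backoff.ms configuration value
        private static final long RETRY_BACKOFF_MS = 100L;

        private final ScheduledExecutorService scheduler =
                Executors.newSingleThreadScheduledExecutor();

        // On a retriable coordinator error, schedule the resend after the
        // backoff instead of retrying immediately in a busy loop.
        void onRetriableError(Runnable resend) {
            scheduler.schedule(resend, RETRY_BACKOFF_MS, TimeUnit.MILLISECONDS);
        }
    }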


> On July 9, 2015, 3:53 a.m., Ewen Cheslack-Postava wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java,
> >  line 91
> > <https://reviews.apache.org/r/36333/diff/1/?file=1002923#file1002923line91>
> >
> >     Is the retryBackoffMs here because that's how long you want to wait for 
> > a response before we jump back out of the poll, which fails the request and 
> > triggers another NetworkClient.poll() call, which in turn sends the 
> > MetadataRequest?
> >     
> >     Just trying to figure out the flow because a) the flow is unclear and 
> > b) this is the only use of metadata in this class. Would this make sense to 
> > push into NetworkClient, which is the one responsible for making the 
> > request anyway? I'm not sure it's a good idea since nothing besides this 
> > class uses NetworkClient anymore, but worth thinking about.

Guozhang mentioned this code as well, which was copied from KafkaConsumer. I 
think the use of retryBackoffMs is incorrect. It probably could be poll(-1) 
since we are not actually resending any requests.

To me, the weird thing has always been that metadata updates are handled 
directly in NetworkClient and not in the higher layers. I can see the reason 
for it, but it makes it difficult to attach code to metadata updates (which is 
exactly what we need to implement regex subscriptions). I would personally be 
in favor of pulling it out of NetworkClient, but that has implications for 
KafkaProducer as well. Moving this method into NetworkClient is probably a 
better idea at the moment and may give us a fighting chance to get the logic 
right.
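
Concretely, the wait loop I have in mind is shaped like this (stand-in names 
and stub types; the real code lives in KafkaConsumer and currently polls with 
retryBackoffMs where I would use an indefinite poll):

    public class MetadataWaitSketch {
        private final MetadataStub metadata = new MetadataStub();

        public void awaitMetadataUpdate() {
            // in the real client this flags that a refresh is needed
            int version = metadata.requestUpdate();
            do {
                // poll(-1) blocks until I/O happens; NetworkClient sends the
                // MetadataRequest itself, so nothing needs resending here
                poll(-1);
            } while (metadata.version() == version);
        }

        private void poll(long timeout) {
            // stub: pretend the MetadataResponse arrived and bumped the version
            metadata.version++;
        }

        static class MetadataStub {
            private int version = 0;
            int requestUpdate() { return version; } // returns the current version
            int version() { return version; }
        }
    }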


> On July 9, 2015, 3:53 a.m., Ewen Cheslack-Postava wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java,
> >  line 119
> > <https://reviews.apache.org/r/36333/diff/1/?file=1002924#file1002924line119>
> >
> >     I think this is a holdover from previous code, so feel free to defer 
> > to another JIRA. The code in KafkaConsumer.committed() implies it should 
> > work for partitions not assigned to this consumer (and that is useful 
> > functionality we should probably support). But here, the argument is 
> > ignored in the condition for the while() loop, which seems error prone. I 
> > don't see a way that committed() would cause 
> > subscriptions.refreshCommitsNeeded to return true, so I think it'll 
> > currently fail for unassigned partitions.
> >     
> >     For this code, I think it's risky not to specifically be checking the 
> > requested partitions against the available set in subscriptions unless the 
> > caller is already guaranteed to do that.

That's a good point and may actually be a bug introduced in this patch. I think 
we should probably address it by exposing a direct way to get the committed 
offsets rather than going through the SubscriptionState class.
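
Something along these lines, perhaps (just a sketch of the suggestion, not a 
final API):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;

    import org.apache.kafka.common.TopicPartition;

    public class CommittedOffsetsSketch {
        // The caller names exactly the partitions it cares about (assigned or
        // not) and blocks on the OffsetFetch response, rather than keying the
        // loop off SubscriptionState.refreshCommitsNeeded().
        public Map<TopicPartition, Long> fetchCommittedOffsets(Set<TopicPartition> partitions) {
            Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
            for (TopicPartition tp : partitions) {
                // send an OffsetFetchRequest and poll its future to completion
                // (elided); -1 is a placeholder for "no committed offset"
                offsets.put(tp, -1L);
            }
            return offsets;
        }
    }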


> On July 9, 2015, 3:53 a.m., Ewen Cheslack-Postava wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java,
> >  line 143
> > <https://reviews.apache.org/r/36333/diff/1/?file=1002924#file1002924line143>
> >
> >     This being separated from the rebalance callbacks concerns me a little, 
> > specifically because we're adding user callbacks and those can potentially 
> > make additional calls on the consumer. I scanned through all the usages and 
> > can currently only find one weird edge case that could create incorrect 
> > behavior:
> >     
> >     1. Do async offset commit
> >     2. During normal poll(), offset commit finishes and callback is invoked.
> >     3. Callback calls committed(TopicPartition), which invokes 
> > refreshCommittedOffsets, which invokes ensureAssignment and handles the 
> > rebalance without invoking callbacks.
> >     
> >     Actually, I think based on my earlier comment this won't currently 
> > happen, but only because I think committed() won't work properly with 
> > non-assigned partitions.

A rebalance can be triggered by a heartbeat, which could happen pretty much any 
time. So we probably should push the rebalance callback into Coordinator. The 
only reason I haven't done so already is that the callback requires a reference 
to KafkaConsumer, and passing one into Coordinator seemed too egregious. I 
wonder if we can instead remove the KafkaConsumer parameter from the callback? 
I don't think the user would have much trouble getting a reference to it in 
another way.
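
For example, the callback could shrink to something like this (a sketch of 
the suggestion, not the committed interface):

    import java.util.Collection;

    import org.apache.kafka.common.TopicPartition;

    public interface RebalanceCallback {
        // no KafkaConsumer parameter: the user captures a reference to the
        // consumer some other way if the callback needs one
        void onPartitionsAssigned(Collection<TopicPartition> partitions);

        void onPartitionsRevoked(Collection<TopicPartition> partitions);
    }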


> On July 9, 2015, 3:53 a.m., Ewen Cheslack-Postava wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java,
> >  line 314
> > <https://reviews.apache.org/r/36333/diff/1/?file=1002924#file1002924line314>
> >
> >     Why trigger commit refresh here?

I copied this from older versions of the code, but I've wondered the same. It 
does ensure that we will not reset any partitions to an older position before 
the commit request returns. It also ensures that committed() will not return 
before a pending asynchronous commit completes. However, in both of those 
cases, I think 
the alternative behavior would be reasonable as well. In the end, I kept it in 
there because I didn't have a strong case against it and because it didn't seem 
like a bad thing to refresh commits from the server from time to time. Now that 
I think about it a little more, it actually makes asynchronous commits more 
costly than synchronous commits since they always result in an additional 
FetchCommittedOffsets request.


> On July 9, 2015, 3:53 a.m., Ewen Cheslack-Postava wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java,
> >  line 739
> > <https://reviews.apache.org/r/36333/diff/1/?file=1002924#file1002924line739>
> >
> >     Is this really what should be happening? I'm trying to figure out 
> > how we can even reach this code since RequestFutureCompletionHandler only 
> > has onComplete -> RequestFuture.complete(), which indicates success.
> >     
> >     This does need to handle both cases, but can we validly get to 
> > onFailure the way this class is used, or should this be some stronger 
> > assertion?

Good question, but the answer is not so simple. RequestFutureCompletionHandler 
is used only when the request has been transmitted to NetworkClient. The 
request may fail before then in ConsumerNetworkClient, in which case there are 
two possibilities: disconnect or send failure. In the first case, 
coordinatorDead is what we want, but the latter case is trickier. A send 
failure just means we failed to forward the request to NetworkClient. This 
could be because it is still connecting or because its send buffer is full. 
NetworkClient doesn't expose a way to tell which case it is, but it actually 
matters. In the latter case, we probably do not want to mark the coordinator 
dead. If it's connecting, on the other hand, then it must have been previously 
disconnected. The question is whether the consumer knew about the disconnect. 
Maybe, maybe not. We only know about disconnects if we happen to have a request 
active when the disconnect happens. If we had a request active, it would fail 
and we would mark the coordinator dead. If we didn't, then the consumer missed 
the disconnect and probably needs to mark the coordinator dead. In fact, what 
can happen is that the consumer can get itself stuck in a state of perpetually 
connecting to a dead coordinator. It cannot see the disconnect because it 
cannot send a request, and it cannot send a request because the connection 
never completes. 

I couldn't come up with a great solution to this problem, so I opted for the 
easy way out, which is just to mark the coordinator dead. This we know we can 
recover from. A better solution would probably be to expose a connection state 
listener on NetworkClient to ensure that the consumer sees all disconnects. I 
started down this path for a while, but abandoned it when it started to get a 
little complex. It might be worth revisiting in a separate issue.
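
Roughly, the listener I had in mind would look like this (a hypothetical 
interface, not an existing NetworkClient API):

    public interface ConnectionStateListener {
        void onConnectionEstablished(String nodeId);

        // invoked on every disconnect, whether or not a request was in
        // flight, so the consumer could mark the coordinator dead here
        void onConnectionClosed(String nodeId);
    }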


> On July 9, 2015, 3:53 a.m., Ewen Cheslack-Postava wrote:
> > clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java,
> >  line 267
> > <https://reviews.apache.org/r/36333/diff/1/?file=1002944#file1002944line267>
> >
> >     Why did this last assertion for the assigned partition get dropped?

It may have been unintentional. I'll check it out.


- Jason


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36333/#review91016
-----------------------------------------------------------


On July 8, 2015, 9:19 p.m., Jason Gustafson wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36333/
> -----------------------------------------------------------
> 
> (Updated July 8, 2015, 9:19 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-2123
>     https://issues.apache.org/jira/browse/KAFKA-2123
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> KAFKA-2123; resolve problems from rebase
> 
> 
> Diffs
> -----
> 
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
> fd98740bff175cc9d5bc02e365d88e011ef65d22 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
> eb75d2e797e3aa3992e4cf74b12f51c8f1545e02 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> 7aa076084c894bb8f47b9df2c086475b06f47060 
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
> 46e26a665a22625d50888efa7b53472279f36e79 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
>  c1c8172cd45f6715262f9a6f497a7b1797a834a3 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
>  695eaf63db9a5fa20dc2ca68957901462a96cd96 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
>  51eae1944d5c17cf838be57adf560bafe36fbfbd 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java
>  13fc9af7392b4ade958daf3b0c9a165ddda351a6 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
>  683745304c671952ff566f23b5dd4cf3ab75377a 
>   
> clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java 
> PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
> 4c0ecc3badd99727b5bd9d430364e61c184e0923 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java
>  PRE-CREATION 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java
>  d085fe5c9e2a0567893508a1c71f014fae6d7510 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java
>  PRE-CREATION 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
>  405efdc7a59438731cbc3630876bda0042a3adb3 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
>  ee1ede01efa070409b86f5d8874cd578e058ce51 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java
>  PRE-CREATION 
>   core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 
> 476973b2c551db5be3f1c54f94990f0dd15ff65e 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 
> 92ffb91b5e039dc0d4cd0e072ca46db32f280cf9 
> 
> Diff: https://reviews.apache.org/r/36333/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jason Gustafson
> 
>
