> On July 9, 2015, 3:53 a.m., Ewen Cheslack-Postava wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java, line 121
> > <https://reviews.apache.org/r/36333/diff/1/?file=1002923#file1002923line121>
> >
> > Is this the behavior we want? Both timeout and delayedTasks.nextTimeout() can be arbitrarily small. For delayedTasks especially, it seems like we're tying the failure of requests to unrelated events?
> >
> > Previously, I thought request failures may happen quickly, but then there was a Utils.sleep backoff. I am not seeing how that is handled now? If the connection isn't established, won't poll() not send out requests, run client.poll(), possibly return very quickly before the connection is established/fails, then fail the unsent requests, then just return to the caller? And that caller might be one of those while() loops, so it may just continue to retry, busy looping while not actually accomplishing anything.
> >
> > If we're going to introduce queuing of send requests here, it seems like a timeout (rather than fast fail + backoff) might be a more natural solution. So rather than clearUnsentRequests, it might be clearExpiredRequests.
This is not clear from the comments, but the main reason for the unsent request queue in this class is to get around NetworkClient's single-send behavior. Currently, NetworkClient will only accept one send at a time for a given node. If you have multiple requests, you have to sprinkle in a bunch of poll() calls and use ready() to make sure that they can all get buffered. This is annoying because you cannot attach a callback to a request until you are actually able to send it, which means you always have to send requests in a poll loop, and that implies you can't really have asynchronous requests. The unsent request queue provides a way to buffer these requests before handing them to the network layer, which allows us to give a future to the caller immediately.

Now, when the user calls poll, we first try to send all of the unsent requests (by doing the ready()/poll() dance until nothing else can be sent). Note that none of this depends on the timeout passed to poll: we are just calling poll(0) until nothing can be sent. Only after that do we call poll(timeout). Some requests may have failed to be sent, either because the client is still connecting to the node or because its send buffer is full. To make the behavior of this class easy to understand, I just fail these requests, which means that the unsent request queue never accumulates beyond a single poll call. So every request sent either gets transported or fails.

I think the call to Utils.sleep(retryBackoffMs) was lost in the refactor. It should probably be invoked whenever we retry a request to the coordinator. One potential issue is that the configuration "retry.backoff.ms" (as it's currently documented) is only intended for fetch requests, but we have been using it for all requests. Perhaps there should be a separate configuration, for example "coordinator.retry.backoff.ms"? Or perhaps a hard-coded value is really what we want.
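To make the buffering behavior concrete, here is a toy, self-contained sketch of the mechanism described above (the class, its Request type, and the nodeReady flag are all hypothetical stand-ins, not the real ConsumerNetworkClient/ClientRequest/RequestFuture API): send() only queues a request and returns a handle immediately, and poll() either transports or fails every queued request before returning.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model of the unsent request queue (not the real ConsumerNetworkClient).
public class UnsentQueueSketch {

    // Stand-in for a ClientRequest paired with its RequestFuture.
    public static class Request {
        public boolean sent = false;
        public boolean failed = false;
    }

    private final Queue<Request> unsent = new ArrayDeque<>();

    // Buffer the request and hand a "future" back to the caller immediately,
    // without waiting for the node to become ready.
    public Request send() {
        Request request = new Request();
        unsent.add(request);
        return request;
    }

    // poll(): first the ready()/poll(0) dance -- keep sending until nothing
    // else can be sent -- then fail whatever is left, so the queue never
    // accumulates across calls. `nodeReady` stands in for NetworkClient.ready().
    public void poll(boolean nodeReady) {
        while (nodeReady && !unsent.isEmpty())
            unsent.poll().sent = true;      // transported to the network layer
        while (!unsent.isEmpty())
            unsent.poll().failed = true;    // clearUnsentRequests: fast fail
    }
}
```

The invariant this illustrates is the one stated above: every request handed to send() is resolved, one way or the other, by the end of the next poll(), so the unsent queue never outlives a single poll call.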
> On July 9, 2015, 3:53 a.m., Ewen Cheslack-Postava wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java, line 91
> > <https://reviews.apache.org/r/36333/diff/1/?file=1002923#file1002923line91>
> >
> > Is the retryBackoffMs here because that's how long you want to wait for a response before we jump back out of the poll, which fails the request and triggers another NetworkClient.poll() call, which in turn sends the MetadataRequest?
> >
> > Just trying to figure out the flow because a) the flow is unclear and b) this is the only use of metadata in this class. Would this make sense to push into NetworkClient, which is the one responsible for making the request anyway? I'm not sure it's a good idea since nothing besides this class uses NetworkClient anymore, but worth thinking about.

Guozhang mentioned this code as well; it was copied from KafkaConsumer. I think the use of retryBackoffMs is incorrect. It probably could be poll(-1) since we are not actually resending any requests.

To me, the weird thing has always been that metadata updates are handled directly in NetworkClient and not in the higher layers. I can see the reason for it, but it makes it difficult to attach code to metadata updates (which is exactly what we need to implement regex subscriptions). I would personally be in favor of pulling it out of NetworkClient, but that has implications for KafkaProducer as well. Moving this method into NetworkClient is probably a better idea at the moment and may give us a fighting chance to get the logic right.

> On July 9, 2015, 3:53 a.m., Ewen Cheslack-Postava wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java, line 119
> > <https://reviews.apache.org/r/36333/diff/1/?file=1002924#file1002924line119>
> >
> > I think this is a holdover from previous code, so feel free to defer to another JIRA. The code in KafkaConsumer.committed() implies it should work for partitions not assigned to this consumer (and that is useful functionality we should probably support). But here, the argument is ignored in the condition for the while() loop, which seems error prone. I don't see a way that committed() would cause subscriptions.refreshCommitsNeeded to return true, so I think it'll currently fail for unassigned partitions.
> >
> > For this code, I think it's risky not to specifically be checking the requested partitions against the available set in subscriptions unless the caller is already guaranteed to do that.

That's a good point and may actually be a bug introduced in this patch. I think we should probably address it by exposing a direct way to get the committed offsets rather than going through the SubscriptionState class.

> On July 9, 2015, 3:53 a.m., Ewen Cheslack-Postava wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java, line 143
> > <https://reviews.apache.org/r/36333/diff/1/?file=1002924#file1002924line143>
> >
> > This being separated from the rebalance callbacks concerns me a little, specifically because we're adding user callbacks and those can potentially make additional calls on the consumer. I scanned through all the usages and can currently only find one weird edge case that could create incorrect behavior:
> >
> > 1. Do async offset commit.
> > 2. During normal poll(), offset commit finishes and callback is invoked.
> > 3. Callback calls committed(TopicPartition), which invokes refreshCommittedOffsets, which invokes ensureAssignment and handles the rebalance without invoking callbacks.
> >
> > Actually, I think based on my earlier comment this won't currently happen, but only because I think committed() won't work properly with non-assigned partitions.
A rebalance can be triggered by a heartbeat, which could happen pretty much any time, so we probably should push the rebalance callback into Coordinator. The only reason I haven't done so already is that the callback requires a reference to KafkaConsumer, and passing one into Coordinator seemed too egregious. I wonder if we can instead remove the KafkaConsumer parameter from the callback? I don't think the user would have much trouble getting a reference to it in another way.

> On July 9, 2015, 3:53 a.m., Ewen Cheslack-Postava wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java, line 314
> > <https://reviews.apache.org/r/36333/diff/1/?file=1002924#file1002924line314>
> >
> > Why trigger commit refresh here?

I copied this from older versions of the code, but I've wondered the same. It does ensure that we will not reset any partitions to an older position before the commit request returns, and it also ensures that committed() will not return before a pending asynchronous commit. However, in both of those cases, I think the alternative behavior would be reasonable as well. In the end, I kept it because I didn't have a strong case against it and because it didn't seem like a bad thing to refresh commits from the server from time to time. Now that I think about it a little more, it actually makes asynchronous commits more costly than synchronous commits, since they always result in an additional FetchCommittedOffsets request.

> On July 9, 2015, 3:53 a.m., Ewen Cheslack-Postava wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java, line 739
> > <https://reviews.apache.org/r/36333/diff/1/?file=1002924#file1002924line739>
> >
> > Is this really what should be happening? I'm trying to figure out how we can even reach this code since RequestFutureCompletionHandler only has onComplete -> RequestFuture.complete(), which indicates success.
> >
> > This does need to handle both cases, but can we validly get to onFailure the way this class is used, or should this be some stronger assertion?

Good question, but the answer is not so simple. RequestFutureCompletionHandler is used only when the request has been transmitted to NetworkClient. The request may fail before then in ConsumerNetworkClient, in which case there are two possibilities: disconnect or send failure. In the first case, coordinatorDead is what we want, but the latter case is trickier.

A send failure just means we failed to forward the request to NetworkClient. This could be because it is still connecting or because its send buffer is full. NetworkClient doesn't expose a way to tell which case it is, but it actually matters: if the send buffer is full, we probably do not want to mark the coordinator dead. If it's still connecting, on the other hand, then it must have been previously disconnected. The question is whether the consumer knew about the disconnect. Maybe, maybe not. We only know about disconnects if we happen to have a request active when the disconnect happens. If we had a request active, it would fail and we would mark the coordinator dead. If we didn't, then the consumer missed the disconnect and probably needs to mark the coordinator dead. In fact, the consumer can get itself stuck in a state of perpetually connecting to a dead coordinator: it cannot see the disconnect because it cannot send a request, and it cannot send a request because the connection never completes.

I couldn't come up with a great solution to this problem, so I opted for the easy way out, which is just to mark the coordinator dead. This we know we can recover from. A better solution would probably be to expose a connection state listener on NetworkClient to ensure that the consumer sees all disconnects. I started down this path for a while, but abandoned it when it started to get a little complex.
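As a rough illustration of that abandoned idea (the interface and all names here are hypothetical, not the real NetworkClient API), a connection state listener would let the network layer report every disconnect, whether or not a request happened to be in flight at the time:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a connection-state listener on the network layer.
public class ConnectionStateSketch {

    public interface ConnectionStateListener {
        // Invoked for every disconnect, even when no request is in flight,
        // so coordinator logic can never miss one.
        void onDisconnect(String nodeId);
    }

    private final List<ConnectionStateListener> listeners = new ArrayList<>();

    public void addListener(ConnectionStateListener listener) {
        listeners.add(listener);
    }

    // The (hypothetical) network layer would call this whenever a
    // connection drops.
    public void fireDisconnect(String nodeId) {
        for (ConnectionStateListener listener : listeners)
            listener.onDisconnect(nodeId);
    }
}
```

With something like this, the coordinator could mark itself dead on a real disconnect event instead of inferring disconnects from failed sends, which would avoid the perpetual-reconnect trap described above.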
It might be worth revisiting in a separate issue.

> On July 9, 2015, 3:53 a.m., Ewen Cheslack-Postava wrote:
> > clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java, line 267
> > <https://reviews.apache.org/r/36333/diff/1/?file=1002944#file1002944line267>
> >
> > Why did this last assertion for the assigned partition get dropped?

It may have been unintentional. I'll check it out.

- Jason

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/36333/#review91016
-----------------------------------------------------------

On July 8, 2015, 9:19 p.m., Jason Gustafson wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/36333/
> -----------------------------------------------------------
>
> (Updated July 8, 2015, 9:19 p.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-2123
>     https://issues.apache.org/jira/browse/KAFKA-2123
>
>
> Repository: kafka
>
>
> Description
> -------
>
> KAFKA-2123; resolve problems from rebase
>
>
> Diffs
> -----
>
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java fd98740bff175cc9d5bc02e365d88e011ef65d22
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerCommitCallback.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java eb75d2e797e3aa3992e4cf74b12f51c8f1545e02
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 7aa076084c894bb8f47b9df2c086475b06f47060
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 46e26a665a22625d50888efa7b53472279f36e79
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java c1c8172cd45f6715262f9a6f497a7b1797a834a3
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTask.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueue.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java 695eaf63db9a5fa20dc2ca68957901462a96cd96
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java 51eae1944d5c17cf838be57adf560bafe36fbfbd
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoAvailableBrokersException.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFuture.java 13fc9af7392b4ade958daf3b0c9a165ddda351a6
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureAdapter.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestFutureListener.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/SendFailedException.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/StaleMetadataException.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java 683745304c671952ff566f23b5dd4cf3ab75377a
>   clients/src/main/java/org/apache/kafka/common/errors/ConsumerCoordinatorNotAvailableException.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/errors/DisconnectException.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/errors/IllegalGenerationException.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/errors/NotCoordinatorForConsumerException.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/errors/OffsetLoadInProgressException.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/errors/UnknownConsumerIdException.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 4c0ecc3badd99727b5bd9d430364e61c184e0923
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClientTest.java PRE-CREATION
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java d085fe5c9e2a0567893508a1c71f014fae6d7510
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/DelayedTaskQueueTest.java PRE-CREATION
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java 405efdc7a59438731cbc3630876bda0042a3adb3
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java ee1ede01efa070409b86f5d8874cd578e058ce51
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/RequestFutureTest.java PRE-CREATION
>   core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 476973b2c551db5be3f1c54f94990f0dd15ff65e
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala 92ffb91b5e039dc0d4cd0e072ca46db32f280cf9
>
> Diff: https://reviews.apache.org/r/36333/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Jason Gustafson
>
>