lianetm commented on code in PR #18737:
URL: https://github.com/apache/kafka/pull/18737#discussion_r1947077467
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -414,7 +415,15 @@ private void process(final ResetOffsetEvent event) {
*/
private void process(final CheckAndUpdatePositionsEvent event) {
CompletableFuture<Boolean> future =
requestManagers.offsetsRequestManager.updateFetchPositions(event.deadlineMs());
- future.whenComplete(complete(event.future()));
+ future.whenComplete((value, exception) -> {
+ if (exception != null)
+ event.future().completeExceptionally(exception);
+ else {
+ requestManagers.commitRequestManager.ifPresent(commitRequestManager ->
+ commitRequestManager.setLatestPartitionOffsets(subscriptions.allConsumed()));
Review Comment:
Could you elaborate on the callback alternative you mentioned? It seems
interesting if we could end up just adding a future to the
`AsyncCommitEvent` that tells us when allConsumed has been retrieved. The app
thread's commitAsync would block on that future right after sending the event
to the background thread, not waiting for any request, only for confirmation
that the async commit got the allConsumed offsets that need to be committed.
This is similar to the approach we take in other API calls, e.g. seek, which
blocks until it has confirmation that the action on the subscription state
happened in the background, with no requests involved. The difference here is
that you need to know two things about the async event, right? (When it read
allConsumed, and when it got a response for the request.)
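
To make the question concrete, here is a minimal sketch of the two-future idea, not the actual Kafka code. All names in it (`AsyncCommitEventSketch`, `offsetsCaptured`, `commitResult`, `positions`, `simulateCommitResponse`) are hypothetical stand-ins, and the background thread is simulated with a single-thread executor. The caller blocks only until the offsets have been snapshotted and gets the second future back for the eventual response.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class TwoStageAsyncCommitSketch {

    /** Hypothetical stand-in for an async commit event that carries two futures. */
    static final class AsyncCommitEventSketch {
        // Stage 1: completed once the background thread has read the offsets to commit.
        final CompletableFuture<Map<String, Long>> offsetsCaptured = new CompletableFuture<>();
        // Stage 2: completed later, when the (simulated) commit response arrives.
        final CompletableFuture<Void> commitResult = new CompletableFuture<>();
    }

    /** Hypothetical stand-in for the consumed positions held in the subscription state. */
    static final Map<String, Long> positions = new ConcurrentHashMap<>();

    /** Simulated background thread (daemon, so it does not keep the JVM alive). */
    static final ExecutorService background = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "background-sketch");
        t.setDaemon(true);
        return t;
    });

    /** Hands the event to the background thread and waits only for stage 1. */
    static AsyncCommitEventSketch commitAsync() {
        AsyncCommitEventSketch event = new AsyncCommitEventSketch();
        background.execute(() -> {
            // Snapshot the positions so later changes do not affect this commit.
            Map<String, Long> snapshot = new HashMap<>(positions);
            event.offsetsCaptured.complete(Collections.unmodifiableMap(snapshot));
        });
        try {
            // No request is awaited here, only the confirmation of the snapshot.
            event.offsetsCaptured.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("offsets were not captured in time", e);
        }
        return event;
    }

    /** Simulates the commit response arriving on the background thread. */
    static void simulateCommitResponse(AsyncCommitEventSketch event) {
        background.execute(() -> event.commitResult.complete(null));
    }

    public static void main(String[] args) {
        positions.put("topic-0", 10L);
        AsyncCommitEventSketch event = commitAsync();
        positions.put("topic-0", 20L); // happens after the snapshot was taken
        System.out.println("captured: " + event.offsetsCaptured.join());
        System.out.println("response received: " + event.commitResult.isDone());
        simulateCommitResponse(event);
        event.commitResult.join();
        System.out.println("response received: " + event.commitResult.isDone());
    }
}
```

In this sketch, a position update made after `commitAsync()` returns cannot leak into the commit, because the snapshot is already fixed by then, while the response future stays pending until the simulated reply arrives.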