lianetm commented on code in PR #16885:
URL: https://github.com/apache/kafka/pull/16885#discussion_r1725092614
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -256,14 +271,176 @@ private void process(final UnsubscribeEvent event) {
}
}
- private void process(final ResetPositionsEvent event) {
- CompletableFuture<Void> future = requestManagers.offsetsRequestManager.resetPositionsIfNeeded();
- future.whenComplete(complete(event.future()));
+ /**
+ *
+ * Fetch committed offsets and use them to update positions in the subscription state. If no
+ * committed offsets available, fetch offsets from the leader.
+ */
+ private void process(final UpdateFetchPositionsEvent updateFetchPositionsEvent) {
+ try {
+ // The event could be completed in the app thread before it got to be
+ // processed in the background (ex. interrupted)
+ if (updateFetchPositionsEvent.future().isCompletedExceptionally()) {
+ log.debug("UpdateFetchPositions event {} was completed exceptionally before it " +
+ "got time to be processed.", updateFetchPositionsEvent);
+ return;
+ }
Review Comment:
Agreed. See my comment above; basically, this moved to the addAndGet as a general case.
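
For anyone following the thread, here is a minimal sketch of the guard shown in the hunk above: skip processing when the event's future was already completed exceptionally by the caller (for example, on interruption). `SketchEvent` and `process` are hypothetical stand-ins for illustration only, not the Kafka classes, and this does not show where the check ends up after the move mentioned above.

```java
import java.util.concurrent.CompletableFuture;

public class EarlyCompletionGuardSketch {

    // Hypothetical stand-in for an application event that carries a result future.
    static final class SketchEvent {
        private final CompletableFuture<Boolean> future = new CompletableFuture<>();

        CompletableFuture<Boolean> future() {
            return future;
        }
    }

    // Returns true if the event was processed, false if it was skipped because
    // its future had already failed before the background side picked it up.
    static boolean process(SketchEvent event) {
        if (event.future().isCompletedExceptionally()) {
            return false;
        }
        // Assumed placeholder for the real work; here we just complete the future.
        event.future().complete(true);
        return true;
    }

    public static void main(String[] args) {
        SketchEvent interrupted = new SketchEvent();
        // Simulates the app thread failing the event before it is processed.
        interrupted.future().completeExceptionally(new InterruptedException("app thread interrupted"));
        System.out.println(process(interrupted)); // prints "false"

        SketchEvent pending = new SketchEvent();
        System.out.println(process(pending)); // prints "true"
    }
}
```

The check relies only on `CompletableFuture.isCompletedExceptionally()`, so a future that completed normally or is still pending falls through to the regular processing path.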