kirktrue commented on code in PR #20521:
URL: https://github.com/apache/kafka/pull/20521#discussion_r2400262692


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -726,6 +757,57 @@ private void process(final 
StreamsOnAllTasksLostCallbackCompletedEvent event) {
         
requestManagers.streamsMembershipManager.get().onAllTasksLostCallbackCompleted(event);
     }
 
+    private void process(final CompositePollEvent event) {
+        CompositePollEventProcessorContext context = 
compositePollContext.orElseThrow(IllegalArgumentException::new);
+        ApplicationEvent.Type nextEventType = event.startingEventType();
+
+        if (context.maybeCompleteExceptionally(event) || 
context.maybeCompleteWithCallbackRequired(event, nextEventType))
+            return;

Review Comment:
   `maybeCompleteExceptionally()` checks 
`NetworkClientDelegate.getAndClearMetadataError()`. We can't access that from 
the application thread, so this is the first available opportunity to do so.
   
   It's possible for there to be offset commits that finished after the last 
`AsyncPollEvent` completed, so `maybeCompleteWithCallbackRequired()` catches 
those cases before we've started any new work on the new poll.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to