lianetm commented on code in PR #20521:
URL: https://github.com/apache/kafka/pull/20521#discussion_r2402927027


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -726,6 +728,67 @@ private void process(final StreamsOnAllTasksLostCallbackCompletedEvent event) {
         requestManagers.streamsMembershipManager.get().onAllTasksLostCallbackCompleted(event);
     }
 
+    private void process(final AsyncPollEvent event) {
+        AsyncPollEventProcessorContext context = asyncPollContext.orElseThrow(IllegalArgumentException::new);
+        ApplicationEvent.Type nextEventType = event.startingEventType();
+
+        if (context.maybeCompleteExceptionally(event) || context.maybeCompleteWithCallbackRequired(event, nextEventType))
+            return;
+
+        if (nextEventType == ApplicationEvent.Type.ASYNC_POLL) {
+            log.trace("Processing {} logic for {}", nextEventType, event);
+
+            // Trigger a reconciliation that can safely commit offsets if needed to rebalance,
+            // as we're processing before any new fetching starts
+            requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager ->
+                consumerMembershipManager.maybeReconcile(true));
+
+            if (requestManagers.commitRequestManager.isPresent()) {
+                CommitRequestManager commitRequestManager = requestManagers.commitRequestManager.get();
+                commitRequestManager.updateTimerAndMaybeCommit(event.pollTimeMs());
+
+                requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> {
+                    hrm.membershipManager().onConsumerPoll();
+                    hrm.resetPollTimer(event.pollTimeMs());
+                });
+                requestManagers.streamsGroupHeartbeatRequestManager.ifPresent(hrm -> {
+                    hrm.membershipManager().onConsumerPoll();
+                    hrm.resetPollTimer(event.pollTimeMs());
+                });
+            }
+
+            nextEventType = ApplicationEvent.Type.CHECK_AND_UPDATE_POSITIONS;

Review Comment:
   this whole flow is already looking much simpler, nice! but still, this intermediate state seems to exist only to ensure we don't attempt to trigger reconciliations/callbacks/metadataErrors on the first poll after the one where one of those was triggered, correct? But do we really need that? It would mean we skip triggering a reconciliation when it's maybe needed (it would happen on a following poll, but why skip it in the current one?)
   
   Alternatively, I imagine we could simply trigger all actions on each poll (trigger reconciliation, commit, updateFetchPositions, trigger fetch), short-circuiting as needed back to the app thread, but with no need to carry on from where we left off, given that it's all about triggering actions that will simply be no-ops if repeated and not needed (and if we end up triggering reconciliation/callbacks/errors on 2 subsequent polls, it's because it's indeed needed)
   
   All those steps have their own "flow control" btw (maybeReconcile will only do something if no other reconciliation is going on, metadata errors are cleared when thrown, auto-commit considers the interval and in-flights...), so that's why I wonder if we can just simply trigger them all on each iteration, with no more state machine for the ASYNC_POLL?
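   To make the "no state machine" idea above concrete, here is a rough sketch of per-step guards: every poll triggers every step, and each step's own flow control turns a repeated invocation into a cheap no-op. All class and method names here are illustrative, not the actual Kafka internals.

   ```java
   import java.util.concurrent.atomic.AtomicBoolean;

   // Illustrative sketch (not the real Kafka classes): each step guards itself,
   // so the poll path can trigger everything unconditionally on every iteration.
   public class GuardedPollSteps {
       private final AtomicBoolean reconciliationInProgress = new AtomicBoolean(false);
       private final long autoCommitIntervalMs;
       private long lastCommitMs;

       public GuardedPollSteps(long autoCommitIntervalMs) {
           this.autoCommitIntervalMs = autoCommitIntervalMs;
           // Initialized so the very first poll is eligible to commit (assumes nowMs >= 0).
           this.lastCommitMs = -autoCommitIntervalMs;
       }

       // Analogous to maybeReconcile(): only acts if no reconciliation is in flight.
       public boolean maybeReconcile() {
           if (!reconciliationInProgress.compareAndSet(false, true))
               return false; // another reconciliation is running: no-op
           try {
               // ... perform reconciliation ...
               return true;
           } finally {
               reconciliationInProgress.set(false);
           }
       }

       // Analogous to the auto-commit timer: only acts once the interval has elapsed.
       public boolean maybeAutoCommit(long nowMs) {
           if (nowMs - lastCommitMs < autoCommitIntervalMs)
               return false; // interval not elapsed: no-op
           lastCommitMs = nowMs;
           // ... send the commit request ...
           return true;
       }

       // The poll path just triggers everything; repeats are harmless.
       public void onPoll(long nowMs) {
           maybeReconcile();
           maybeAutoCommit(nowMs);
           // ... updateFetchPositions, trigger fetch ...
       }
   }
   ```

   With guards like these, calling `onPoll()` on two subsequent polls is safe by construction, which is what removes the need to remember where the previous poll left off.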



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -726,6 +757,57 @@ private void process(final StreamsOnAllTasksLostCallbackCompletedEvent event) {
         requestManagers.streamsMembershipManager.get().onAllTasksLostCallbackCompleted(event);
     }
 
+    private void process(final CompositePollEvent event) {
+        CompositePollEventProcessorContext context = compositePollContext.orElseThrow(IllegalArgumentException::new);
+        ApplicationEvent.Type nextEventType = event.startingEventType();
+
+        if (context.maybeCompleteExceptionally(event) || context.maybeCompleteWithCallbackRequired(event, nextEventType))
+            return;

Review Comment:
   > maybeCompleteExceptionally() checks NetworkClientDelegate.getAndClearMetadataError(). We can't access that from the application thread, so this is the first available opportunity to do so.
   
   I was thinking of an atomic variable. Metadata errors are indeed discovered in the background, and we need to read them from the app thread on poll (and clear them, I guess). Maybe an atomic var shared via `applicationEventHandler`, for instance? (`ApplicationEventHandler` lives in the `AsyncConsumer` and has a ref to the network component, so it seems like the component in-between.) The network layer sets the atomic var, we read it on poll and throw if there is anything there. Would that work? If it works, we could solve the inter-thread communication for this case with it, instead of jumping back and forth between the threads.
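   A minimal sketch of that hand-off, assuming an `AtomicReference` that the background thread sets and the app thread reads-and-clears on poll. The class and method names are hypothetical; the real wiring through `ApplicationEventHandler`/`NetworkClientDelegate` would of course differ.

   ```java
   import java.util.concurrent.atomic.AtomicReference;

   // Hypothetical sketch of the atomic-variable idea: the network thread
   // records a metadata error, and the app thread throws it at most once
   // on the next poll. Not the actual Kafka classes.
   public class MetadataErrorHandoff {
       private final AtomicReference<RuntimeException> metadataError = new AtomicReference<>();

       // Called from the background (network) thread when a metadata error surfaces.
       // compareAndSet keeps the first error if several arrive before a poll.
       public void setMetadataError(RuntimeException error) {
           metadataError.compareAndSet(null, error);
       }

       // Called from the application thread on poll(): read and clear atomically,
       // so the same error is thrown exactly once.
       public void maybeThrowMetadataError() {
           RuntimeException error = metadataError.getAndSet(null);
           if (error != null)
               throw error;
       }
   }
   ```

   Because `getAndSet(null)` both reads and clears in one atomic step, there is no window where two polls could observe (or both miss) the same error.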
   
   > It's possible for there to be offset commits that finished after the last AsyncPollEvent completed, so maybeCompleteWithCallbackRequired() catches those cases before we've started any new work on the new poll.
   
   Agree that we may have commits that completed after the last `AsyncPollEvent`, but why do we need to go to the background to trigger them? I was imagining we could just trigger commit callbacks in the app thread before triggering the (next) `AsyncPollEvent`.
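   One way to picture that: the background thread enqueues completed commit callbacks, and the app thread drains the queue at the start of `poll()`, before submitting the next poll event. This is only a sketch with hypothetical names, not the actual consumer wiring.

   ```java
   import java.util.concurrent.ConcurrentLinkedQueue;

   // Illustrative sketch (names are hypothetical): completed commit callbacks
   // are handed from the background thread to the app thread via a lock-free
   // queue, and invoked on the app thread before the next poll event is sent.
   public class CommitCallbackInvoker {
       private final ConcurrentLinkedQueue<Runnable> pendingCallbacks = new ConcurrentLinkedQueue<>();

       // Called from the background thread when a commit response arrives.
       public void enqueue(Runnable callback) {
           pendingCallbacks.add(callback);
       }

       // Called from the application thread at the start of poll():
       // run every callback queued so far, on the app thread.
       public int maybeInvokeCallbacks() {
           int invoked = 0;
           Runnable callback;
           while ((callback = pendingCallbacks.poll()) != null) {
               callback.run();
               invoked++;
           }
           return invoked;
       }
   }
   ```

   Draining on the app thread keeps the user-facing callback on the caller's thread and removes the need for a background round-trip just to notice a finished commit.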



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
