lianetm commented on code in PR #20521:
URL: https://github.com/apache/kafka/pull/20521#discussion_r2410935572


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java:
##########
@@ -1771,19 +1827,7 @@ private Fetch<K, V> pollForFetches(Timer timer) {
             return fetch;
         }
 
-        // send any new fetches (won't resend pending fetches)
-        sendFetches(timer);
-
-        // We do not want to be stuck blocking in poll if we are missing some 
positions
-        // since the offset lookup may be backing off after a failure
-
-        // NOTE: the use of cachedSubscriptionHasAllFetchPositions means we 
MUST call
-        // updateAssignmentMetadataIfNeeded before this method.
-        if (!cachedSubscriptionHasAllFetchPositions && pollTimeout > 
retryBackoffMs) {
-            pollTimeout = retryBackoffMs;
-        }

Review Comment:
   shouldn't we keep this logic here? It directly impacts how long we block 
waiting for data on the buffer in the app thread



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -726,6 +711,71 @@ private void process(final 
StreamsOnAllTasksLostCallbackCompletedEvent event) {
         
requestManagers.streamsMembershipManager.get().onAllTasksLostCallbackCompleted(event);
     }
 
+    private void process(final AsyncPollEvent event) {
+        log.trace("Processing poll logic for {}", event);
+
+        // Trigger a reconciliation that can safely commit offsets if needed 
to rebalance,
+        // as we're processing before any new fetching starts
+        
requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager ->
+            consumerMembershipManager.maybeReconcile(true));
+
+        if (requestManagers.commitRequestManager.isPresent()) {
+            CommitRequestManager commitRequestManager = 
requestManagers.commitRequestManager.get();
+            commitRequestManager.updateTimerAndMaybeCommit(event.pollTimeMs());
+
+            requestManagers.consumerHeartbeatRequestManager.ifPresent(hrm -> {
+                hrm.membershipManager().onConsumerPoll();
+                hrm.resetPollTimer(event.pollTimeMs());
+            });
+            requestManagers.streamsGroupHeartbeatRequestManager.ifPresent(hrm 
-> {
+                hrm.membershipManager().onConsumerPoll();
+                hrm.resetPollTimer(event.pollTimeMs());
+            });
+        }
+
+        log.trace("Processing check and update positions logic for {}", event);

Review Comment:
   if this maybe redundant given the similar log that we have at the beginning 
of the func?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to