guozhangwang commented on code in PR #12244:
URL: https://github.com/apache/kafka/pull/12244#discussion_r889363536


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -548,14 +549,18 @@ public boolean poll(Timer timer, boolean waitForJoinGroup) {
                 }
             }
         } else {
-            // For manually assigned partitions, if coordinator is unknown, make sure we lookup one and await metadata.
+            // For manually assigned partitions, we do not try to pro-actively lookup coordinator;
+            // instead we only try to refresh metadata when necessary.
             // If connections to all nodes fail, wakeups triggered while attempting to send fetch
             // requests result in polls returning immediately, causing a tight loop of polls. Without
             // the wakeup, poll() with no channels would block for the timeout, delaying re-connection.
             // awaitMetadataUpdate() in ensureCoordinatorReady initiates new connections with configured backoff and avoids the busy loop.
-            if (coordinatorUnknownAndUnready(timer)) {
-                return false;
+            if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) {
+                client.awaitMetadataUpdate(timer);
             }
+
+            // if there is pending coordinator requests, ensure they have a chance to be transmitted.

Review Comment:
   This is a major change made while addressing @dajac's comment. Previously,
under manual assignment, the `coordinator.poll` call would not call
`networkClient.poll`. That means that if coordinator discovery does not
complete within `commitAsync` (note that we call `networkClient.poll` twice in
that function, so discovery may well complete there), there would be no other
place where the network client is polled to complete the pending requests.
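
To make the failure mode concrete, here is a minimal, self-contained sketch of it. This is not the Kafka code: `ToyNetworkClient`, `ToyCoordinator`, `lookupCoordinatorAsync` and the `pollPendingRequests` flag are all hypothetical names invented for illustration. The only assumption carried over from the discussion above is that a queued request completes only when something polls the network client.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class PendingRequestSketch {

    /** Hypothetical stand-in for the network client: a request completes only when poll() runs. */
    static class ToyNetworkClient {
        private final Queue<Runnable> pending = new ArrayDeque<>();

        /** Queues a request; the callback fires when the request is completed by poll(). */
        void send(Runnable onComplete) {
            pending.add(onComplete);
        }

        boolean hasPendingRequests() {
            return !pending.isEmpty();
        }

        /** Completes every queued request and returns how many were completed. */
        int poll() {
            int completed = 0;
            Runnable callback;
            while ((callback = pending.poll()) != null) {
                callback.run();
                completed++;
            }
            return completed;
        }
    }

    /** Hypothetical coordinator that models only the manual-assignment branch of poll(). */
    static class ToyCoordinator {
        private final ToyNetworkClient client;
        private final boolean pollPendingRequests;
        boolean coordinatorKnown = false;

        ToyCoordinator(ToyNetworkClient client, boolean pollPendingRequests) {
            this.client = client;
            this.pollPendingRequests = pollPendingRequests;
        }

        /** Models an async commit that enqueues coordinator discovery without completing it. */
        void lookupCoordinatorAsync() {
            client.send(() -> coordinatorKnown = true);
        }

        /** With the flag off, nothing drives the network client, so discovery stays pending. */
        void poll() {
            if (pollPendingRequests && client.hasPendingRequests()) {
                client.poll();
            }
        }
    }

    public static void main(String[] args) {
        ToyCoordinator withoutPoll = new ToyCoordinator(new ToyNetworkClient(), false);
        withoutPoll.lookupCoordinatorAsync();
        withoutPoll.poll();
        System.out.println("without network poll: coordinatorKnown=" + withoutPoll.coordinatorKnown);

        ToyCoordinator withPoll = new ToyCoordinator(new ToyNetworkClient(), true);
        withPoll.lookupCoordinatorAsync();
        withPoll.poll();
        System.out.println("with network poll: coordinatorKnown=" + withPoll.coordinatorKnown);
    }
}
```

In the first case the discovery request stays queued no matter how often `poll()` is called, which mirrors the situation where `commitAsync` returns before discovery finishes and nothing else polls the network client.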


