guozhangwang commented on code in PR #12244:
URL: https://github.com/apache/kafka/pull/12244#discussion_r889363536
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java:
##########
@@ -548,14 +549,18 @@ public boolean poll(Timer timer, boolean waitForJoinGroup) {
                 }
             }
         } else {
-            // For manually assigned partitions, if coordinator is unknown, make sure we lookup one and await metadata.
+            // For manually assigned partitions, we do not try to pro-actively lookup coordinator;
+            // instead we only try to refresh metadata when necessary.
             // If connections to all nodes fail, wakeups triggered while attempting to send fetch
             // requests result in polls returning immediately, causing a tight loop of polls. Without
             // the wakeup, poll() with no channels would block for the timeout, delaying re-connection.
             // awaitMetadataUpdate() in ensureCoordinatorReady initiates new connections with configured backoff and avoids the busy loop.
-            if (coordinatorUnknownAndUnready(timer)) {
-                return false;
+            if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) {
+                client.awaitMetadataUpdate(timer);
             }
+
+            // if there is pending coordinator requests, ensure they have a chance to be transmitted.

Review Comment:
   This is a major change made while addressing @dajac's comment. Previously, with manual assignment, the `coordinator.poll` call would not call `networkClient.poll`. That means that if coordinator discovery did not complete within `commitAsync` (note that we call `networkClient.poll` twice in that function, so discovery may well complete there), there would be no other place that polls the network client to complete the pending requests.
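To make the failure mode concrete, here is a minimal, self-contained Java sketch. It is not Kafka code: `ToyNetworkClient`, `ToyCoordinator`, and all of their methods are hypothetical stand-ins. It assumes that requests only make progress when the network client is polled, and that `commitAsync` left a coordinator lookup pending (i.e. its own `networkClient.poll` calls did not finish discovery). The only thing that differs between the two runs is whether the coordinator's manual-assignment `poll()` drives the network client.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical, simplified model of the control flow discussed above.
// None of these classes or methods exist in Kafka; they only illustrate the idea.
class ManualAssignmentPollDemo {
    public static void main(String[] args) {
        // Old behavior: coordinator.poll() never polls the network client.
        ToyCoordinator before = new ToyCoordinator(new ToyNetworkClient(), false);
        before.commitAsyncStartsDiscovery(); // lookup queued but not yet completed
        before.poll();
        before.poll();
        System.out.println("old behavior, discovery done: " + before.coordinatorKnown());

        // New behavior: coordinator.poll() gives pending requests a chance to be sent.
        ToyCoordinator after = new ToyCoordinator(new ToyNetworkClient(), true);
        after.commitAsyncStartsDiscovery();
        after.poll();
        System.out.println("new behavior, discovery done: " + after.coordinatorKnown());
    }
}

/** Toy network client: queued requests only complete when poll() is called. */
class ToyNetworkClient {
    private final Queue<Runnable> pending = new ArrayDeque<>();

    void send(Runnable onResponse) {
        pending.add(onResponse);
    }

    int pendingCount() {
        return pending.size();
    }

    /** Transmits every pending request and runs its response handler immediately. */
    void poll() {
        while (!pending.isEmpty()) {
            pending.poll().run();
        }
    }
}

/** Toy coordinator modelling only the manual-assignment branch of poll(). */
class ToyCoordinator {
    private final ToyNetworkClient client;
    private final boolean pollNetworkInManualMode;
    private boolean coordinatorKnown = false;

    ToyCoordinator(ToyNetworkClient client, boolean pollNetworkInManualMode) {
        this.client = client;
        this.pollNetworkInManualMode = pollNetworkInManualMode;
    }

    ToyNetworkClient client() {
        return client;
    }

    /** Models commitAsync enqueuing a coordinator lookup that it did not finish itself. */
    void commitAsyncStartsDiscovery() {
        client.send(() -> coordinatorKnown = true);
    }

    /** Models coordinator.poll() for manually assigned partitions. */
    void poll() {
        if (pollNetworkInManualMode) {
            // Ensure pending coordinator requests have a chance to be transmitted.
            client.poll();
        }
        // Otherwise nothing here drives the network client, so the lookup stays pending.
    }

    boolean coordinatorKnown() {
        return coordinatorKnown;
    }
}
```

Running `main` prints `false` for the old behavior and `true` for the new one: without a network poll in the manual-assignment path, the pending lookup never completes no matter how often `poll()` is called.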