appchemist commented on code in PR #15647: URL: https://github.com/apache/kafka/pull/15647#discussion_r1580499797
########## clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java: ########## @@ -200,6 +200,9 @@ protected void handleFetchSuccess(final Node fetchTarget, if (partitionData.currentLeader().leaderId() != -1 && partitionData.currentLeader().leaderEpoch() != -1) { partitionsWithUpdatedLeaderInfo.put(partition, new Metadata.LeaderIdAndEpoch( Optional.of(partitionData.currentLeader().leaderId()), Optional.of(partitionData.currentLeader().leaderEpoch()))); + } else { + requestMetadataUpdate(metadata, subscriptions, partition); + subscriptions.awaitUpdate(partition); Review Comment: If the FetchStates is `FETCHING` as per KIP-951, the `FetchCollector.handleInitializeErrors()` method is called. I thought that in this case, it should not be changed to `AWAIT_UPDATE`. Additionally, if it's `AWAIT_UPDATE`, it will be filtered out by the following code inside the `FetchCollector.initialize()` method and will not go through `FetchCollector.handleInitializeErrors()`. ``` if (!subscriptions.hasValidPosition(tp)) { // this can happen when a rebalance happened while fetch is still in-flight log.debug("Ignoring fetched records for partition {} since it no longer has valid position", tp); return null; } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org