appchemist commented on code in PR #15647:
URL: https://github.com/apache/kafka/pull/15647#discussion_r1580499797


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##########
@@ -200,6 +200,9 @@ protected void handleFetchSuccess(final Node fetchTarget,
                     if (partitionData.currentLeader().leaderId() != -1 && partitionData.currentLeader().leaderEpoch() != -1) {
                         partitionsWithUpdatedLeaderInfo.put(partition, new Metadata.LeaderIdAndEpoch(
                             Optional.of(partitionData.currentLeader().leaderId()), Optional.of(partitionData.currentLeader().leaderEpoch())));
+                    } else {
+                        requestMetadataUpdate(metadata, subscriptions, partition);
+                        subscriptions.awaitUpdate(partition);

Review Comment:
   As per KIP-951, if the partition's FetchStates value is FETCHING, the FetchCollector.handleInitializeErrors() method is called.
   So I think the state should not be changed to AWAIT_UPDATE in this case.
   Additionally, once the state is AWAIT_UPDATE, the partition is filtered out by the following check inside FetchCollector.initialize() and never reaches FetchCollector.handleInitializeErrors().
   
   ```
   if (!subscriptions.hasValidPosition(tp)) {
      // this can happen when a rebalance happened while fetch is still in-flight
      log.debug("Ignoring fetched records for partition {} since it no longer has valid position", tp);
      return null;
   }
   ```
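
To make the concern concrete, here is a minimal, self-contained sketch of the control flow described above. It is not the real consumer code: `FetchStateSketch`, `Outcome`, and the two-argument `initialize` are hypothetical names invented for illustration, and the only behavior taken from this discussion is that a partition in AWAIT_UPDATE fails the valid-position check while a partition in FETCHING passes it.

```java
public class FetchStateSketch {

    // Simplified stand-in for the consumer's per-partition fetch states.
    // Assumption from the review comment: AWAIT_UPDATE has no valid position.
    enum FetchState {
        FETCHING(true),
        AWAIT_UPDATE(false);

        private final boolean validPosition;

        FetchState(boolean validPosition) {
            this.validPosition = validPosition;
        }

        boolean hasValidPosition() {
            return validPosition;
        }
    }

    // Hypothetical result type, used only to show which branch was taken.
    enum Outcome { IGNORED, ERROR_HANDLED, RECORDS_RETURNED }

    // Hypothetical model of the initialize() flow: the valid-position check
    // runs first, so error handling is reachable only from FETCHING.
    static Outcome initialize(FetchState state, boolean fetchHadError) {
        if (!state.hasValidPosition()) {
            // Mirrors the "Ignoring fetched records" early return.
            return Outcome.IGNORED;
        }
        if (fetchHadError) {
            // Stands in for the handleInitializeErrors() path.
            return Outcome.ERROR_HANDLED;
        }
        return Outcome.RECORDS_RETURNED;
    }

    public static void main(String[] args) {
        // Same failed fetch response, two different partition states.
        System.out.println(initialize(FetchState.FETCHING, true));      // ERROR_HANDLED
        System.out.println(initialize(FetchState.AWAIT_UPDATE, true));  // IGNORED
    }
}
```

Under these assumptions, moving the partition to AWAIT_UPDATE in handleFetchSuccess() before the response is collected means the error-handling branch is skipped for that response.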



