appchemist commented on code in PR #15647:
URL: https://github.com/apache/kafka/pull/15647#discussion_r1580499797


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##########
@@ -200,6 +200,9 @@ protected void handleFetchSuccess(final Node fetchTarget,
                     if (partitionData.currentLeader().leaderId() != -1 && 
partitionData.currentLeader().leaderEpoch() != -1) {
                         partitionsWithUpdatedLeaderInfo.put(partition, new 
Metadata.LeaderIdAndEpoch(
                             
Optional.of(partitionData.currentLeader().leaderId()), 
Optional.of(partitionData.currentLeader().leaderEpoch())));
+                    } else {
+                        requestMetadataUpdate(metadata, subscriptions, 
partition);
+                        subscriptions.awaitUpdate(partition);

Review Comment:
   If the FetchStates is `FETCHING` as per KIP-951, the 
`FetchCollector.handleInitializeErrors()` method is called.
   I thought that in this case, it should not be changed to `AWAIT_UPDATE`.
   Additionally, if it's `AWAIT_UPDATE`, it will be filtered out by the 
following code inside the `FetchCollector.initialize()` method and will not go 
through `FetchCollector.handleInitializeErrors()`.
   
   ```
   if (!subscriptions.hasValidPosition(tp)) {
      // this can happen when a rebalance happened while fetch is still 
in-flight
      log.debug("Ignoring fetched records for partition {} since it no longer 
has valid position", tp);
      return null;
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to