kirktrue commented on code in PR #15647:
URL: https://github.com/apache/kafka/pull/15647#discussion_r1580103763


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java:
##########
@@ -327,21 +327,27 @@ private void handleInitializeErrors(final CompletedFetch completedFetch, final E
         final long fetchOffset = completedFetch.nextFetchOffset();
 
         if (error == Errors.NOT_LEADER_OR_FOLLOWER ||
-                error == Errors.REPLICA_NOT_AVAILABLE ||
+                error == Errors.FENCED_LEADER_EPOCH) {
+            log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
+            requestMetadataUpdate(metadata, subscriptions, tp);
+        } else if (error == Errors.REPLICA_NOT_AVAILABLE ||
                 error == Errors.KAFKA_STORAGE_ERROR ||
-                error == Errors.FENCED_LEADER_EPOCH ||
                 error == Errors.OFFSET_NOT_AVAILABLE) {
             log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
             requestMetadataUpdate(metadata, subscriptions, tp);
+            subscriptions.awaitUpdate(tp);

Review Comment:
   With this change, if the replica is not available, we will flag the partition as awaiting a metadata update. Is this a key part of this change? Why don't we want the first `if` block to also await an update?
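
   To make the asymmetry concrete, here is a toy sketch of the branching after this diff. The `Errors` values mirror the PR, but the handler, the recorded action strings, and the class itself are illustrative stand-ins, not the real `FetchCollector`:

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Toy model of the error split in handleInitializeErrors().
   // Only the branching mirrors the diff; everything else is a stand-in.
   public class InitErrorSketch {
       enum Errors { NOT_LEADER_OR_FOLLOWER, FENCED_LEADER_EPOCH,
                     REPLICA_NOT_AVAILABLE, KAFKA_STORAGE_ERROR, OFFSET_NOT_AVAILABLE }

       // Records which follow-up actions a given error triggers.
       static List<String> handle(Errors error) {
           List<String> actions = new ArrayList<>();
           if (error == Errors.NOT_LEADER_OR_FOLLOWER
                   || error == Errors.FENCED_LEADER_EPOCH) {
               actions.add("requestMetadataUpdate");
               // The question above: should this branch also add "awaitUpdate"?
           } else if (error == Errors.REPLICA_NOT_AVAILABLE
                   || error == Errors.KAFKA_STORAGE_ERROR
                   || error == Errors.OFFSET_NOT_AVAILABLE) {
               actions.add("requestMetadataUpdate");
               actions.add("awaitUpdate");
           }
           return actions;
       }

       public static void main(String[] args) {
           System.out.println(handle(Errors.FENCED_LEADER_EPOCH));   // [requestMetadataUpdate]
           System.out.println(handle(Errors.REPLICA_NOT_AVAILABLE)); // [requestMetadataUpdate, awaitUpdate]
       }
   }
   ```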



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java:
##########
@@ -967,7 +984,8 @@ private boolean maybeValidatePosition(Metadata.LeaderAndEpoch currentLeaderAndEp
                 return false;
             }
 
-            if (position != null && !position.currentLeader.equals(currentLeaderAndEpoch)) {
+            if (position != null &&
+                    (!position.currentLeader.equals(currentLeaderAndEpoch) || this.fetchState.equals(FetchStates.AWAIT_UPDATE))) {

Review Comment:
   Not sure if using the helper method shortens the line length enough to avoid wrapping 🤷‍♂️ 
   
   ```suggestion
            if (position != null && (!position.currentLeader.equals(currentLeaderAndEpoch) || awaitingUpdate())) {
   ```
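   
   If `awaitingUpdate()` doesn't exist yet, a minimal sketch of such a helper, assuming the `fetchState` field and `FetchStates.AWAIT_UPDATE` names from the diff (the surrounding class here is a toy stand-in for the real partition-state object, not Kafka's `SubscriptionState` internals):
   
   ```java
   // Toy stand-in for the per-partition state: only fetchState, the
   // transition, and the proposed awaitingUpdate() helper are modeled.
   public class AwaitingUpdateSketch {
       enum FetchStates { INITIALIZING, FETCHING, AWAIT_UPDATE }

       private FetchStates fetchState = FetchStates.FETCHING;

       // Flags the partition as waiting for a metadata update.
       void awaitUpdate() {
           this.fetchState = FetchStates.AWAIT_UPDATE;
       }

       // The helper the suggestion leans on: a readable,
       // line-length-friendly check of the current fetch state.
       boolean awaitingUpdate() {
           return this.fetchState == FetchStates.AWAIT_UPDATE;
       }

       public static void main(String[] args) {
           AwaitingUpdateSketch state = new AwaitingUpdateSketch();
           System.out.println(state.awaitingUpdate()); // false before awaitUpdate()
           state.awaitUpdate();
           System.out.println(state.awaitingUpdate()); // true afterwards
       }
   }
   ```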



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##########
@@ -200,6 +200,9 @@ protected void handleFetchSuccess(final Node fetchTarget,
                     if (partitionData.currentLeader().leaderId() != -1 && partitionData.currentLeader().leaderEpoch() != -1) {
                         partitionsWithUpdatedLeaderInfo.put(partition, new Metadata.LeaderIdAndEpoch(
                             Optional.of(partitionData.currentLeader().leaderId()), Optional.of(partitionData.currentLeader().leaderEpoch())));
+                    } else {
+                        requestMetadataUpdate(metadata, subscriptions, partition);
+                        subscriptions.awaitUpdate(partition);

Review Comment:
   With this change, we first request a metadata update, then flag our partition as awaiting the metadata update whenever we encounter a `NOT_LEADER_OR_FOLLOWER` or `FENCED_LEADER_EPOCH`. However, in the `FetchCollector.handleInitializeErrors()` method, we only request the metadata update, but _don't_ flag the partition. Is that seeming inconsistency intentional?
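
   For reference, a toy sketch of the success-path branch after this diff: a usable leader id/epoch gets recorded, and the new `else` branch otherwise falls back to a metadata-update request plus the `awaitUpdate` flag. The types and returned action strings are stand-ins, not the real `AbstractFetch` internals:

   ```java
   // Toy model of the branch in handleFetchSuccess(): -1 is the sentinel
   // for "no leader info in the response", as in the diff above.
   public class FetchSuccessSketch {
       static String handle(int leaderId, int leaderEpoch) {
           if (leaderId != -1 && leaderEpoch != -1) {
               // Real code records the leader via partitionsWithUpdatedLeaderInfo.put(...)
               return "record-updated-leader";
           } else {
               // Mirrors the added else branch: request, then flag the partition.
               return "requestMetadataUpdate+awaitUpdate";
           }
       }

       public static void main(String[] args) {
           System.out.println(handle(3, 7));   // record-updated-leader
           System.out.println(handle(-1, 7));  // requestMetadataUpdate+awaitUpdate
       }
   }
   ```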



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
