kirktrue commented on code in PR #15647:
URL: https://github.com/apache/kafka/pull/15647#discussion_r1580103763
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/FetchCollector.java:
##########
@@ -327,21 +327,27 @@ private void handleInitializeErrors(final CompletedFetch completedFetch, final E
         final long fetchOffset = completedFetch.nextFetchOffset();
         if (error == Errors.NOT_LEADER_OR_FOLLOWER ||
-                error == Errors.REPLICA_NOT_AVAILABLE ||
+                error == Errors.FENCED_LEADER_EPOCH) {
+            log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
+            requestMetadataUpdate(metadata, subscriptions, tp);
+        } else if (error == Errors.REPLICA_NOT_AVAILABLE ||
                 error == Errors.KAFKA_STORAGE_ERROR ||
-                error == Errors.FENCED_LEADER_EPOCH ||
                 error == Errors.OFFSET_NOT_AVAILABLE) {
             log.debug("Error in fetch for partition {}: {}", tp, error.exceptionName());
             requestMetadataUpdate(metadata, subscriptions, tp);
+            subscriptions.awaitUpdate(tp);

Review Comment:
With this change, if the replica is not available (or we get `KAFKA_STORAGE_ERROR` or `OFFSET_NOT_AVAILABLE`), we will flag the partition as awaiting a metadata update. Is this a key part of this change? Why don't we want the first `if` block to also await an update?
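To make the split between the two branches explicit, here is a minimal sketch, assuming a hypothetical `FetchErrorHandlingSketch` class whose `FetchError` and `Action` enums stand in for Kafka's `Errors` and for the calls made in each branch (none of this is Kafka code). Only the second group of errors gets the extra `awaitUpdate()` flag; errors outside this hunk are lumped into `OTHER`.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical, simplified model of the branching proposed for
// FetchCollector.handleInitializeErrors(). These are NOT Kafka classes.
public class FetchErrorHandlingSketch {

    // Error names that appear in the diff, plus OTHER for errors handled elsewhere.
    enum FetchError {
        NOT_LEADER_OR_FOLLOWER, FENCED_LEADER_EPOCH,
        REPLICA_NOT_AVAILABLE, KAFKA_STORAGE_ERROR, OFFSET_NOT_AVAILABLE,
        OTHER
    }

    // What the consumer does for the partition after the error.
    enum Action { REQUEST_METADATA, REQUEST_METADATA_AND_AWAIT_UPDATE, NOT_MODELED }

    // First branch in the diff: only requestMetadataUpdate(...).
    static final Set<FetchError> METADATA_ONLY =
            EnumSet.of(FetchError.NOT_LEADER_OR_FOLLOWER, FetchError.FENCED_LEADER_EPOCH);

    // Second branch: requestMetadataUpdate(...) followed by subscriptions.awaitUpdate(tp).
    static final Set<FetchError> METADATA_AND_AWAIT =
            EnumSet.of(FetchError.REPLICA_NOT_AVAILABLE, FetchError.KAFKA_STORAGE_ERROR,
                       FetchError.OFFSET_NOT_AVAILABLE);

    static Action classify(FetchError error) {
        if (METADATA_ONLY.contains(error)) {
            return Action.REQUEST_METADATA;
        } else if (METADATA_AND_AWAIT.contains(error)) {
            return Action.REQUEST_METADATA_AND_AWAIT_UPDATE;
        }
        return Action.NOT_MODELED;
    }

    public static void main(String[] args) {
        for (FetchError error : FetchError.values()) {
            System.out.println(error + " -> " + classify(error));
        }
    }
}
```

If the first branch were also meant to await an update, it would simply move into `METADATA_AND_AWAIT` in this model.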
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java:
##########
@@ -967,7 +984,8 @@ private boolean maybeValidatePosition(Metadata.LeaderAndEpoch currentLeaderAndEp
             return false;
         }
-        if (position != null && !position.currentLeader.equals(currentLeaderAndEpoch)) {
+        if (position != null &&
+                (!position.currentLeader.equals(currentLeaderAndEpoch) || this.fetchState.equals(FetchStates.AWAIT_UPDATE))) {

Review Comment:
Not sure if using the helper method shortens the line length enough to avoid wrapping 🤷‍♂️

```suggestion
        if (position != null && (!position.currentLeader.equals(currentLeaderAndEpoch) || awaitingUpdate())) {
```

##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java:
##########
@@ -200,6 +200,9 @@ protected void handleFetchSuccess(final Node fetchTarget,
                     if (partitionData.currentLeader().leaderId() != -1 && partitionData.currentLeader().leaderEpoch() != -1) {
                         partitionsWithUpdatedLeaderInfo.put(partition, new Metadata.LeaderIdAndEpoch(
                             Optional.of(partitionData.currentLeader().leaderId()), Optional.of(partitionData.currentLeader().leaderEpoch())));
+                    } else {
+                        requestMetadataUpdate(metadata, subscriptions, partition);
+                        subscriptions.awaitUpdate(partition);

Review Comment:
With this change, whenever we encounter a `NOT_LEADER_OR_FOLLOWER` or `FENCED_LEADER_EPOCH` and the response doesn't carry a valid current leader, we first request a metadata update and then flag our partition as awaiting that update. However, in the `FetchCollector.handleInitializeErrors()` method, we only request the metadata update but _don't_ flag the partition. Is that apparent inconsistency intentional?
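To see why the flag matters, here is a minimal sketch, assuming a stripped-down partition state (`PositionValidationSketch`, `PartitionState`, and the nested records are hypothetical stand-ins, not Kafka classes; records need Java 16+). It applies the new `maybeValidatePosition()` condition with the suggested `awaitingUpdate()` helper and omits the real method's other early returns. In this model, a partition that only had a metadata update requested is revalidated only if the leader actually changes, while one flagged with `awaitUpdate()` is revalidated even when the leader is unchanged.

```java
// Hypothetical, simplified model of the check changed in
// SubscriptionState.maybeValidatePosition(). These are NOT Kafka classes.
public class PositionValidationSketch {

    // Simplified subset of fetch states; AWAIT_UPDATE is the state this PR introduces.
    enum FetchState { FETCHING, AWAIT_VALIDATION, AWAIT_UPDATE }

    // Stand-in for Metadata.LeaderAndEpoch (leader id and epoch only).
    record LeaderAndEpoch(int leaderId, int epoch) { }

    // Stand-in for the fetch position, which remembers the leader it was obtained from.
    record Position(long offset, LeaderAndEpoch currentLeader) { }

    static final class PartitionState {
        Position position;
        FetchState fetchState = FetchState.FETCHING;

        PartitionState(Position position) {
            this.position = position;
        }

        // Analogous to subscriptions.awaitUpdate(tp) in the diffs.
        void awaitUpdate() {
            fetchState = FetchState.AWAIT_UPDATE;
        }

        // The helper suggested in the review comment.
        boolean awaitingUpdate() {
            return fetchState == FetchState.AWAIT_UPDATE;
        }

        // Revalidate if the leader changed OR the partition was flagged as awaiting an update.
        boolean maybeValidatePosition(LeaderAndEpoch currentLeaderAndEpoch) {
            if (position != null &&
                    (!position.currentLeader().equals(currentLeaderAndEpoch) || awaitingUpdate())) {
                fetchState = FetchState.AWAIT_VALIDATION;
                return true;
            }
            return false;
        }
    }

    public static void main(String[] args) {
        LeaderAndEpoch leader = new LeaderAndEpoch(1, 5);

        // Only a metadata update was requested (no awaitUpdate()), and the leader is unchanged.
        PartitionState notFlagged = new PartitionState(new Position(100L, leader));
        System.out.println(notFlagged.maybeValidatePosition(leader)); // false

        // Same leader, but the partition was flagged via awaitUpdate().
        PartitionState flagged = new PartitionState(new Position(100L, leader));
        flagged.awaitUpdate();
        System.out.println(flagged.maybeValidatePosition(leader)); // true
    }
}
```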