mumrah commented on a change in pull request #8376: URL: https://github.com/apache/kafka/pull/8376#discussion_r429323395
########## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java ########## @@ -422,8 +427,29 @@ public synchronized void position(TopicPartition tp, FetchPosition position) { assignedState(tp).position(position); } - public synchronized boolean maybeValidatePositionForCurrentLeader(TopicPartition tp, Metadata.LeaderAndEpoch leaderAndEpoch) { - return assignedState(tp).maybeValidatePosition(leaderAndEpoch); + /** + * Enter the offset validation state if the leader for this partition is known to support a usable version of the + * OffsetsForLeaderEpoch API. If the leader node does not support the API, simply complete the offset validation. + * + * @param apiVersions + * @param tp + * @param leaderAndEpoch + * @return true if we enter the offset validation state + */ + public synchronized boolean maybeValidatePositionForCurrentLeader(ApiVersions apiVersions, TopicPartition tp, + Metadata.LeaderAndEpoch leaderAndEpoch) { + if (leaderAndEpoch.leader.isPresent()) { + NodeApiVersions nodeApiVersions = apiVersions.get(leaderAndEpoch.leader.get().idString()); + if (nodeApiVersions == null || hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) { + return assignedState(tp).maybeValidatePosition(leaderAndEpoch); + } else { + // If the broker does not support a newer version of OffsetsForLeaderEpoch, we skip validation + completeValidation(tp); + return false; + } + } else { + return assignedState(tp).maybeValidatePosition(leaderAndEpoch); Review comment: I wonder, do we really need this call here? If the leader is not present the epoch shouldn't be present either -- right? If that's the case, then the call to maybeValidatePosition will short circuit ```java private boolean maybeValidatePosition(Metadata.LeaderAndEpoch currentLeaderAndEpoch) { if (this.fetchState.equals(FetchStates.AWAIT_RESET)) { return false; } if (!currentLeaderAndEpoch.leader.isPresent() && !currentLeaderAndEpoch.epoch.isPresent()) { return false; } if (position != null && !position.currentLeader.equals(currentLeaderAndEpoch)) { FetchPosition newPosition = new FetchPosition(position.offset, position.offsetEpoch, currentLeaderAndEpoch); validatePosition(newPosition); preferredReadReplica = null; } return this.fetchState.equals(FetchStates.AWAIT_VALIDATION); } ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org