tombentley commented on code in PR #12224:
URL: https://github.com/apache/kafka/pull/12224#discussion_r901413660
##########
metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java:
##########
@@ -226,16 +227,19 @@ synchronized long tryAppend(int nodeId, int epoch, List<ApiMessageAndVersion> ba
     }
     synchronized long tryAppend(int nodeId, int epoch, LocalBatch batch) {
-        if (epoch != leader.epoch()) {
-            log.trace("tryAppend(nodeId={}, epoch={}): the provided epoch does not " +
-                "match the current leader epoch of {}.", nodeId, epoch, leader.epoch());
-            return Long.MAX_VALUE;
-        }
         if (!leader.isLeader(nodeId)) {
-            log.trace("tryAppend(nodeId={}, epoch={}): the given node id does not " +
-                "match the current leader id of {}.", nodeId, epoch, leader.leaderId());
-            return Long.MAX_VALUE;
+            log.debug("tryAppend(nodeId={}, epoch={}): the given node id does not " +
+                "match the current leader id of {}.", nodeId, epoch, leader.leaderId());
+            throw new NotLeaderException("Append failed because the replication is not the current leader");
+        }
+
+        if (epoch < leader.epoch()) {
+            throw new NotLeaderException("Append failed because the epoch doesn't match");

Review Comment:
'doesn't match' implies `!=`, but the check is actually `<`. Perhaps something like "... because the given epoch is stale" would be clearer.
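The following minimal sketch makes the suggested wording concrete. It reproduces only the two checks visible in the hunk and reports the epoch failure as "stale". It assumes a simplified leader state. `StaleEpochCheckSketch`, `LeaderState` and `checkAppend` are hypothetical names. `NotLeaderException` is a local stand-in, not the class the PR imports.

```java
import java.util.OptionalInt;

public class StaleEpochCheckSketch {
    // Hypothetical stand-in for the NotLeaderException thrown in the PR, declared here
    // only so that this sketch compiles on its own.
    static class NotLeaderException extends RuntimeException {
        NotLeaderException(String message) {
            super(message);
        }
    }

    // Hypothetical, simplified view of the shared leader state: an optional leader id
    // plus the current leader epoch.
    static final class LeaderState {
        final OptionalInt leaderId;
        final int epoch;

        LeaderState(OptionalInt leaderId, int epoch) {
            this.leaderId = leaderId;
            this.epoch = epoch;
        }

        boolean isLeader(int nodeId) {
            return leaderId.isPresent() && leaderId.getAsInt() == nodeId;
        }
    }

    // Reproduces only the two checks visible in the hunk: node id first, then epoch.
    // Because the epoch check is `<`, the message calls the epoch stale instead of
    // saying it "doesn't match".
    static void checkAppend(LeaderState leader, int nodeId, int epoch) {
        if (!leader.isLeader(nodeId)) {
            throw new NotLeaderException("Append failed because node " + nodeId
                + " is not the current leader");
        }
        if (epoch < leader.epoch) {
            throw new NotLeaderException("Append failed because the given epoch " + epoch
                + " is stale (current leader epoch is " + leader.epoch + ")");
        }
    }

    public static void main(String[] args) {
        LeaderState leader = new LeaderState(OptionalInt.of(1), 5);
        checkAppend(leader, 1, 5); // same epoch: accepted
        try {
            checkAppend(leader, 1, 4); // older epoch: rejected as stale
        } catch (NotLeaderException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Each failure carries its own message, so a caller that logs the exception can tell a wrong node id apart from an outdated epoch.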
##########
metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java:
##########
@@ -723,9 +727,35 @@ public long scheduleAtomicAppend(int epoch, List<ApiMessageAndVersion> batch) {
     @Override
     public void resign(int epoch) {
-        LeaderAndEpoch curLeader = leader;
-        LeaderAndEpoch nextLeader = new LeaderAndEpoch(OptionalInt.empty(), curLeader.epoch() + 1);
-        shared.tryAppend(nodeId, curLeader.epoch(), new LeaderChangeBatch(nextLeader));
+        if (epoch < 0) {
+            throw new IllegalArgumentException("Attempt to resign from an invalid negative epoch " + epoch);
+        }
+
+        LeaderAndEpoch leaderAndEpoch = leaderAndEpoch();
+        int currentEpoch = leaderAndEpoch.epoch();
+
+        if (epoch > currentEpoch) {
+            throw new IllegalArgumentException("Attempt to resign from epoch " + epoch +
+                " which is larger than the current epoch " + currentEpoch);

Review Comment:
The contract in `RaftClient` still says:

```java
 * @param epoch the epoch to resign from. If this does not match the current epoch, this
 *              call will be ignored.
```

`KafkaRaftClient` performs the same check and throws when the epoch is larger than the current one. So I think we need to update the `RaftClient` javadoc.
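The sketch below shows one possible shape for the updated contract: a standalone helper whose javadoc states all three outcomes of `resign(epoch)` under the checks in this hunk. `ResignContractSketch`, `ResignOutcome` and `classifyResign` are hypothetical names. The javadoc wording is only a suggestion.

```java
public class ResignContractSketch {
    // Outcomes of a resign(epoch) call under the checks in the reviewed resign(int).
    enum ResignOutcome {
        RESIGN,        // epoch equals the current epoch: append the leader change
        IGNORED_STALE  // epoch is older: leadership already moved on, nothing to do
    }

    /**
     * Hypothetical helper mirroring the epoch checks in the reviewed resign(int),
     * documented with one possible rewording of the RaftClient contract.
     *
     * @param epoch the epoch to resign from. If this is smaller than the current epoch,
     *              the call is ignored because a later leader change has already
     *              happened.
     * @param currentEpoch the epoch currently known to the caller's client
     * @return whether the resignation should proceed or be ignored
     * @throws IllegalArgumentException if {@code epoch} is negative or larger than the
     *              current epoch
     */
    static ResignOutcome classifyResign(int epoch, int currentEpoch) {
        if (epoch < 0) {
            throw new IllegalArgumentException("Attempt to resign from an invalid negative epoch " + epoch);
        }
        if (epoch > currentEpoch) {
            throw new IllegalArgumentException("Attempt to resign from epoch " + epoch
                + " which is larger than the current epoch " + currentEpoch);
        }
        return epoch < currentEpoch ? ResignOutcome.IGNORED_STALE : ResignOutcome.RESIGN;
    }

    public static void main(String[] args) {
        System.out.println(classifyResign(7, 7)); // RESIGN
        System.out.println(classifyResign(6, 7)); // IGNORED_STALE
    }
}
```

The javadoc then matches the behavior: a future or negative epoch is a caller error, and only an older epoch is silently ignored.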
##########
metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java:
##########
@@ -723,9 +727,35 @@ public long scheduleAtomicAppend(int epoch, List<ApiMessageAndVersion> batch) {
     @Override
     public void resign(int epoch) {
-        LeaderAndEpoch curLeader = leader;
-        LeaderAndEpoch nextLeader = new LeaderAndEpoch(OptionalInt.empty(), curLeader.epoch() + 1);
-        shared.tryAppend(nodeId, curLeader.epoch(), new LeaderChangeBatch(nextLeader));
+        if (epoch < 0) {
+            throw new IllegalArgumentException("Attempt to resign from an invalid negative epoch " + epoch);
+        }
+
+        LeaderAndEpoch leaderAndEpoch = leaderAndEpoch();
+        int currentEpoch = leaderAndEpoch.epoch();
+
+        if (epoch > currentEpoch) {
+            throw new IllegalArgumentException("Attempt to resign from epoch " + epoch +
+                " which is larger than the current epoch " + currentEpoch);
+        } else if (epoch < currentEpoch) {
+            // If the passed epoch is smaller than the current epoch, then it might mean
+            // that the listener has not been notified about a leader change that already
+            // took place. In this case, we consider the call as already fulfilled and
+            // take no further action.
+            log.debug("Ignoring call to resign from epoch {} since it is smaller than the " +
+                "current epoch {}", epoch, currentEpoch);
+            return;
+        }
+
+        LeaderAndEpoch nextLeader = new LeaderAndEpoch(OptionalInt.empty(), currentEpoch + 1);
+        try {
+            shared.tryAppend(nodeId, currentEpoch, new LeaderChangeBatch(nextLeader));
+        } catch (NotLeaderException exp) {
+            // the leader epoch has already advanced. resign is a no op.
+            log.debug("Ignoring call to resign from epoch {} since it is smaller than the " +
+                "current epoch {}", epoch, currentEpoch);

Review Comment:
`tryAppend` also throws `NotLeaderException` when `nodeId` is not the current leader. This log message attributes every failure to a stale epoch, so it could be confusing.
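One way to avoid the misleading message is to log the reason carried by the exception instead of assuming a stale epoch. The sketch below assumes a simplified `tryAppend` stand-in that, like the reviewed one, uses different messages for the node-id and epoch failures. All names in it (`ResignLogMessageSketch`, `tryAppend`, `resignOutcome`, and the local `NotLeaderException`) are hypothetical.

```java
public class ResignLogMessageSketch {
    // Hypothetical stand-in for the PR's NotLeaderException, so the sketch compiles alone.
    static class NotLeaderException extends RuntimeException {
        NotLeaderException(String message) {
            super(message);
        }
    }

    // Hypothetical stand-in for shared.tryAppend: rejects a non-leader node or a stale
    // epoch with different messages, like the reviewed tryAppend.
    static long tryAppend(int leaderId, int leaderEpoch, int nodeId, int epoch) {
        if (nodeId != leaderId) {
            throw new NotLeaderException("Append failed because node " + nodeId
                + " is not the current leader");
        }
        if (epoch < leaderEpoch) {
            throw new NotLeaderException("Append failed because the given epoch " + epoch + " is stale");
        }
        return 0L; // placeholder return value; unused in this sketch
    }

    // Builds the debug message for a resignation from the exception itself, so the
    // text reports whichever check actually failed instead of assuming a stale epoch.
    static String resignOutcome(int leaderId, int leaderEpoch, int nodeId, int epoch) {
        try {
            tryAppend(leaderId, leaderEpoch, nodeId, epoch);
            return "Resigned from epoch " + epoch;
        } catch (NotLeaderException exp) {
            return "Ignoring call to resign from epoch " + epoch + ": " + exp.getMessage();
        }
    }

    public static void main(String[] args) {
        // Node 2 tries to resign while node 1 leads epoch 3: the failure is the node id.
        System.out.println(resignOutcome(1, 3, 2, 3));
    }
}
```

This assumes `log` is an SLF4J logger, which the `{}` placeholders suggest. In that case, passing `exp` as the final argument after the placeholder values would also record the exception and its stack trace. An example is `log.debug("Ignoring call to resign from epoch {}", epoch, exp)`.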