divijvaidya commented on code in PR #12224:
URL: https://github.com/apache/kafka/pull/12224#discussion_r903622487


##########
metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java:
##########
@@ -723,9 +727,35 @@ public long scheduleAtomicAppend(int epoch, 
List<ApiMessageAndVersion> batch) {
 
     @Override
     public void resign(int epoch) {
-        LeaderAndEpoch curLeader = leader;
-        LeaderAndEpoch nextLeader = new LeaderAndEpoch(OptionalInt.empty(), 
curLeader.epoch() + 1);
-        shared.tryAppend(nodeId, curLeader.epoch(), new 
LeaderChangeBatch(nextLeader));
+        if (epoch < 0) {
+            throw new IllegalArgumentException("Attempt to resign from an 
invalid negative epoch " + epoch);
+        }
+
+        LeaderAndEpoch leaderAndEpoch = leaderAndEpoch();
+        int currentEpoch = leaderAndEpoch.epoch();
+
+        if (epoch > currentEpoch) {
+            throw new IllegalArgumentException("Attempt to resign from epoch " 
+ epoch +
+                    " which is larger than the current epoch " + currentEpoch);

Review Comment:
   Thanks for catching. I have fixed it as per your suggestion in the latest 
commit.



##########
metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java:
##########
@@ -723,9 +727,35 @@ public long scheduleAtomicAppend(int epoch, 
List<ApiMessageAndVersion> batch) {
 
     @Override
     public void resign(int epoch) {
-        LeaderAndEpoch curLeader = leader;
-        LeaderAndEpoch nextLeader = new LeaderAndEpoch(OptionalInt.empty(), 
curLeader.epoch() + 1);
-        shared.tryAppend(nodeId, curLeader.epoch(), new 
LeaderChangeBatch(nextLeader));
+        if (epoch < 0) {
+            throw new IllegalArgumentException("Attempt to resign from an 
invalid negative epoch " + epoch);
+        }
+
+        LeaderAndEpoch leaderAndEpoch = leaderAndEpoch();
+        int currentEpoch = leaderAndEpoch.epoch();
+
+        if (epoch > currentEpoch) {
+            throw new IllegalArgumentException("Attempt to resign from epoch " 
+ epoch +
+                    " which is larger than the current epoch " + currentEpoch);
+        } else if (epoch < currentEpoch) {
+            // If the passed epoch is smaller than the current epoch, then it 
might mean
+            // that the listener has not been notified about a leader change 
that already
+            // took place. In this case, we consider the call as already 
fulfilled and
+            // take no further action.
+            log.debug("Ignoring call to resign from epoch {} since it is 
smaller than the " +
+                    "current epoch {}", epoch, currentEpoch);
+            return;
+        }
+
+        LeaderAndEpoch nextLeader = new LeaderAndEpoch(OptionalInt.empty(), 
currentEpoch + 1);
+        try {
+            shared.tryAppend(nodeId, currentEpoch, new 
LeaderChangeBatch(nextLeader));
+        } catch (NotLeaderException exp) {
+            // the leader epoch has already advanced. resign is a no op.
+            log.debug("Ignoring call to resign from epoch {} since it is 
smaller than the " +
+                    "current epoch {}", epoch, currentEpoch);

Review Comment:
   Thanks for catching. I have fixed it as per your suggestion in the latest 
commit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to