This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 5e4c8f704c2 KAFKA-13943; Make `LocalLogManager` implementation 
consistent with the `RaftClient` contract (#12224)
5e4c8f704c2 is described below

commit 5e4c8f704c28d9f60a49a2bf2fb4ea8b7f7e231d
Author: Divij Vaidya <[email protected]>
AuthorDate: Wed Jul 6 05:08:28 2022 +0200

    KAFKA-13943; Make `LocalLogManager` implementation consistent with the 
`RaftClient` contract (#12224)
    
    Fixes two issues in the implementation of `LocalLogManager`:
    
    - As per the interface contract for `RaftClient.scheduleAtomicAppend()`, it 
should throw a `NotLeaderException` when the provided leader epoch does not 
match the current epoch. However, the previous `LocalLogManager` implementation 
of the API returned `Long.MAX_VALUE` instead of throwing an exception. This 
change fixes the behaviour and makes it consistent with the interface contract.
    - As per the interface contract for `RaftClient.resign(epoch)`, if the 
parameter epoch does not match the current epoch, the call is ignored. However, 
in the `LocalLogManager` implementation the leader epoch might change while the 
thread is waiting to acquire the lock in `shared.tryAppend()` (note that 
`tryAppend()` is a synchronized method). In that case, if a `NotLeaderException` 
is thrown (per the change described above), the resign call should be ignored.
    
    Reviewers: José Armando García Sancio <[email protected]>, 
Tom Bentley <[email protected]>, Jason Gustafson <[email protected]>
---
 .../org/apache/kafka/metalog/LocalLogManager.java  | 53 +++++++++++++++++-----
 .../java/org/apache/kafka/raft/RaftClient.java     |  5 +-
 .../kafka/raft/internals/BatchAccumulator.java     |  9 ++--
 3 files changed, 52 insertions(+), 15 deletions(-)

diff --git 
a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java 
b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
index 36fe3ec2c1c..c8e39ae3289 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
@@ -31,6 +31,7 @@ import org.apache.kafka.raft.Batch;
 import org.apache.kafka.raft.LeaderAndEpoch;
 import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.raft.RaftClient;
+import org.apache.kafka.raft.errors.NotLeaderException;
 import org.apache.kafka.raft.internals.MemoryBatchReader;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.snapshot.MockRawSnapshotReader;
@@ -226,16 +227,20 @@ public final class LocalLogManager implements 
RaftClient<ApiMessageAndVersion>,
         }
 
         synchronized long tryAppend(int nodeId, int epoch, LocalBatch batch) {
-            if (epoch != leader.epoch()) {
-                log.trace("tryAppend(nodeId={}, epoch={}): the provided epoch 
does not " +
-                    "match the current leader epoch of {}.", nodeId, epoch, 
leader.epoch());
-                return Long.MAX_VALUE;
-            }
             if (!leader.isLeader(nodeId)) {
-                log.trace("tryAppend(nodeId={}, epoch={}): the given node id 
does not " +
-                    "match the current leader id of {}.", nodeId, epoch, 
leader.leaderId());
-                return Long.MAX_VALUE;
+                log.debug("tryAppend(nodeId={}, epoch={}): the given node id 
does not " +
+                        "match the current leader id of {}.", nodeId, epoch, 
leader.leaderId());
+                throw new NotLeaderException("Append failed because the 
replication is not the current leader");
+            }
+
+            if (epoch < leader.epoch()) {
+                throw new NotLeaderException("Append failed because the given 
epoch " + epoch + " is stale. " +
+                        "Current leader epoch = " + leader.epoch());
+            } else if (epoch > leader.epoch()) {
+                throw new IllegalArgumentException("Attempt to append from 
epoch " + epoch +
+                        " which is larger than the current epoch " + 
leader.epoch());
             }
+
             log.trace("tryAppend(nodeId={}): appending {}.", nodeId, batch);
             long offset = append(batch);
             electLeaderIfNeeded();
@@ -723,9 +728,35 @@ public final class LocalLogManager implements 
RaftClient<ApiMessageAndVersion>,
 
     @Override
     public void resign(int epoch) {
-        LeaderAndEpoch curLeader = leader;
-        LeaderAndEpoch nextLeader = new LeaderAndEpoch(OptionalInt.empty(), 
curLeader.epoch() + 1);
-        shared.tryAppend(nodeId, curLeader.epoch(), new 
LeaderChangeBatch(nextLeader));
+        if (epoch < 0) {
+            throw new IllegalArgumentException("Attempt to resign from an 
invalid negative epoch " + epoch);
+        }
+
+        LeaderAndEpoch leaderAndEpoch = leaderAndEpoch();
+        int currentEpoch = leaderAndEpoch.epoch();
+
+        if (epoch > currentEpoch) {
+            throw new IllegalArgumentException("Attempt to resign from epoch " 
+ epoch +
+                    " which is larger than the current epoch " + currentEpoch);
+        } else if (epoch < currentEpoch) {
+            // If the passed epoch is smaller than the current epoch, then it 
might mean
+            // that the listener has not been notified about a leader change 
that already
+            // took place. In this case, we consider the call as already 
fulfilled and
+            // take no further action.
+            log.debug("Ignoring call to resign from epoch {} since it is 
smaller than the " +
+                    "current epoch {}", epoch, currentEpoch);
+            return;
+        }
+
+        LeaderAndEpoch nextLeader = new LeaderAndEpoch(OptionalInt.empty(), 
currentEpoch + 1);
+        try {
+            shared.tryAppend(nodeId, currentEpoch, new 
LeaderChangeBatch(nextLeader));
+        } catch (NotLeaderException exp) {
+            // the leader epoch has already advanced. resign is a no op.
+            log.debug("Ignoring call to resign from epoch {}. Either we are 
not the leader or the provided epoch is " +
+                    "smaller than the current epoch {}", epoch, currentEpoch);
+            return;
+        }
     }
 
     @Override
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java 
b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
index 952cc60a387..51f859c6c07 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
@@ -205,8 +205,11 @@ public interface RaftClient<T> extends AutoCloseable {
      * Notification of successful resignation can be observed through
      * {@link Listener#handleLeaderChange(LeaderAndEpoch)}.
      *
-     * @param epoch the epoch to resign from. If this does not match the 
current epoch, this
+     * @param epoch the epoch to resign from. If this epoch is smaller than 
the current epoch, this
      *              call will be ignored.
+     *
+     * @throws IllegalArgumentException - if the passed epoch is invalid 
(negative or greater than current) or
+     * if the listener is not the leader associated with this epoch.
      */
     void resign(int epoch);
 
diff --git 
a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java 
b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
index 77efd2f7e15..323f393aebe 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
@@ -104,7 +104,8 @@ public class BatchAccumulator<T> implements Closeable {
      * @throws RecordBatchTooLargeException if the size of one record T is 
greater than the maximum
      *         batch size; if this exception is throw some of the elements in 
records may have
      *         been committed
-     * @throws NotLeaderException if the epoch doesn't match the leader epoch
+     * @throws NotLeaderException if the epoch is less than the leader epoch
+     * @throws IllegalArgumentException if the epoch is invalid (greater than 
the leader epoch)
      * @throws BufferAllocationException if we failed to allocate memory for 
the records
      */
     public long append(int epoch, List<T> records) {
@@ -123,7 +124,8 @@ public class BatchAccumulator<T> implements Closeable {
      * @throws RecordBatchTooLargeException if the size of the records is 
greater than the maximum
      *         batch size; if this exception is throw none of the elements in 
records were
      *         committed
-     * @throws NotLeaderException if the epoch doesn't match the leader epoch
+     * @throws NotLeaderException if the epoch is less than the leader epoch
+     * @throws IllegalArgumentException if the epoch is invalid (greater than 
the leader epoch)
      * @throws BufferAllocationException if we failed to allocate memory for 
the records
      */
     public long appendAtomic(int epoch, List<T> records) {
@@ -132,7 +134,8 @@ public class BatchAccumulator<T> implements Closeable {
 
     private long append(int epoch, List<T> records, boolean isAtomic) {
         if (epoch < this.epoch) {
-            throw new NotLeaderException("Append failed because the epoch 
doesn't match");
+            throw new NotLeaderException("Append failed because the given 
epoch " + epoch + " is stale. " +
+                    "Current leader epoch = " + this.epoch());
         } else if (epoch > this.epoch) {
             throw new IllegalArgumentException("Attempt to append from epoch " 
+ epoch +
                 " which is larger than the current epoch " + this.epoch);

Reply via email to