This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch txn1
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 8a9e64100526ee15507d535e0fefd091f49fa60e
Author: Colin P. McCabe <cmcc...@apache.org>
AuthorDate: Tue Apr 25 16:09:35 2023 -0700

    MINOR: provide the exact offset to QuorumController.replay
    
    Provide the exact record offset to QuorumController.replay() in all cases. There are several situations
    where this is useful, such as logging, implementing metadata transactions, or handling broker
    registration records.
    
    In the case where the QC is inactive and simply replaying records, it is easy to compute the exact
    record offset from the batch base offset and the record index.
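
    As a hedged sketch (names from this patch; the surrounding loop is
    simplified from the standby replay path), each record's exact offset is
    its batch's base offset plus its zero-based index within the batch:

        int i = 0;
        for (ApiMessageAndVersion message : messages) {
            // exact record offset = batch base offset + index within the batch
            replay(message.message(), Optional.empty(), batch.baseOffset() + i);
            i++;
        }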
    
    The active QC case is more difficult. Technically, when we submit records to the Raft layer, it can
    choose a batch end offset later than the one we expect, if someone else is also adding records.
    While the QC is the only entity submitting data records, control records may be added at any time.
    In the current implementation, these are really only used for leadership elections. However, this
    could change with the addition of quorum reconfiguration or similar features.
    
    Therefore, this PR allows the QC to tell the Raft layer that a record append should fail if it
    would have resulted in a batch end offset other than what was expected. This in turn will trigger a
    controller failover. In the future, if automatically added control records become more common, we
    may wish to have a more sophisticated system than this simple optimistic concurrency mechanism. But
    for now, this will allow us to rely on the offset as correct.
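
    A hedged sketch of the new append contract (method and exception names are
    from this patch; the call site is simplified):

        // The active QC pins the batch end offset it expects. If control
        // records slipped in and the batch would end elsewhere, the Raft
        // layer throws UnexpectedEndOffsetException, which triggers a
        // failover rather than being treated as a fault.
        long nextEndOffset = prevEndOffset + records.size();
        raftClient.scheduleAtomicAppend(controllerEpoch,
            OptionalLong.of(nextEndOffset), records);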
    
    So that the active QC can learn what offset to start writing at, the PR also adds a new
    endOffset parameter to handleLeaderChange. Since the Raft layer only invokes handleLeaderChange on
    the newly active node once it has replayed the log, this information should always be up-to-date in
    that context.
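
    Roughly, the new callback looks like this (a sketch based on the patch;
    error handling and the standby path are omitted):

        @Override
        public void handleLeaderChange(LeaderAndEpoch newLeader, long endOffset) {
            if (newLeader.isLeader(nodeId)) {
                // endOffset is the next offset to be written, so the last
                // written offset is one less.
                claim(newLeader.epoch(), endOffset - 1);
            }
        }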
    
    At the Raft level, this PR adds a new exception, UnexpectedEndOffsetException. This gets thrown
    when we request an end offset that doesn't match the one the Raft layer would have given us.
    Although this exception should cause a failover, it should not be considered a fault. This
    complicated the exception handling a bit and motivated splitting more of it out into the new
    EventHandlerExceptionInfo class. This will also let us unit test things like slf4j log messages a
    bit better.
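
    For example (hedged; the classification mirrors the new class), an
    UnexpectedEndOffsetException is classified as "not a fault, but causes
    failover":

        EventHandlerExceptionInfo info =
            EventHandlerExceptionInfo.fromInternal(exception, () -> latestController());
        if (info.isFault()) {
            nonFatalFaultHandler.handleFault(name + ": " + failureMessage, exception);
        }
        if (info.causesFailover() && isActiveController()) {
            renounce();
        }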
---
 .../main/scala/kafka/tools/TestRaftServer.scala    |   2 +-
 .../apache/kafka/controller/QuorumController.java  | 100 +++++----
 .../controller/errors/ControllerExceptions.java    |  61 ------
 .../errors/EventHandlerExceptionInfo.java          | 223 +++++++++++++++++++++
 .../apache/kafka/image/loader/MetadataLoader.java  |   2 +-
 .../kafka/metadata/util/BatchFileWriter.java       |   5 +-
 .../kafka/metadata/util/SnapshotFileReader.java    |   2 +-
 .../errors/ControllerExceptionsTest.java           |  68 +------
 .../errors/EventHandlerExceptionInfoTest.java      | 173 ++++++++++++++++
 .../kafka/image/publisher/SnapshotEmitterTest.java |   6 +-
 .../org/apache/kafka/metalog/LocalLogManager.java  |  76 +++++--
 .../kafka/metalog/LocalLogManagerTestEnv.java      |  10 +-
 .../kafka/metalog/MockMetaLogManagerListener.java  |   2 +-
 .../org/apache/kafka/raft/KafkaRaftClient.java     |  27 +--
 .../java/org/apache/kafka/raft/RaftClient.java     |  14 +-
 .../org/apache/kafka/raft/ReplicatedCounter.java   |   2 +-
 .../raft/errors/UnexpectedEndOffsetException.java  |  29 +++
 .../kafka/raft/internals/BatchAccumulator.java     |  69 ++-----
 .../kafka/snapshot/RecordsSnapshotWriter.java      |   3 +-
 .../org/apache/kafka/raft/KafkaRaftClientTest.java |   3 +-
 .../apache/kafka/raft/RaftClientTestContext.java   |   2 +-
 .../kafka/raft/internals/BatchAccumulatorTest.java |  17 +-
 .../kafka/deferred/DeferredEventQueueTest.java     |  26 +++
 23 files changed, 632 insertions(+), 290 deletions(-)

diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index 1026c528473..6d2653979ad 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -173,7 +173,7 @@ class TestRaftServer(
 
     raftManager.register(this)
 
-    override def handleLeaderChange(newLeaderAndEpoch: LeaderAndEpoch): Unit = {
+    override def handleLeaderChange(newLeaderAndEpoch: LeaderAndEpoch, endOffset: Long): Unit = {
       if (newLeaderAndEpoch.isLeader(config.nodeId)) {
         eventQueue.offer(HandleClaim(newLeaderAndEpoch.epoch))
       } else if (claimedEpoch.isDefined) {
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 582d774e6e9..3026154fb4f 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -77,6 +77,7 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.controller.errors.ControllerExceptions;
+import org.apache.kafka.controller.errors.EventHandlerExceptionInfo;
 import org.apache.kafka.controller.metrics.QuorumControllerMetrics;
 import org.apache.kafka.metadata.BrokerHeartbeatReply;
 import org.apache.kafka.metadata.BrokerRegistrationReply;
@@ -458,38 +459,32 @@ public final class QuorumController implements Controller {
         controllerMetrics.updateEventQueueProcessingTime(NANOSECONDS.toMillis(deltaNs));
     }
 
-    private Throwable handleEventException(String name,
-                                           OptionalLong startProcessingTimeNs,
-                                           Throwable exception) {
-        Throwable externalException =
-                ControllerExceptions.toExternalException(exception, () -> latestController());
-        if (!startProcessingTimeNs.isPresent()) {
-            log.error("{}: unable to start processing because of {}. Reason: {}", name,
-                exception.getClass().getSimpleName(), exception.getMessage());
-            return externalException;
-        }
-        long endProcessingTime = time.nanoseconds();
-        long deltaNs = endProcessingTime - startProcessingTimeNs.getAsLong();
-        long deltaUs = MICROSECONDS.convert(deltaNs, NANOSECONDS);
-        if (ControllerExceptions.isExpected(exception)) {
-            log.info("{}: failed with {} in {} us. Reason: {}", name,
-                exception.getClass().getSimpleName(), deltaUs, exception.getMessage());
-            return externalException;
+    private Throwable handleEventException(
+        String name,
+        OptionalLong startProcessingTimeNs,
+        Throwable exception
+    ) {
+        OptionalLong deltaUs;
+        if (startProcessingTimeNs.isPresent()) {
+            long endProcessingTime = time.nanoseconds();
+            long deltaNs = endProcessingTime - startProcessingTimeNs.getAsLong();
+            deltaUs = OptionalLong.of(MICROSECONDS.convert(deltaNs, NANOSECONDS));
+        } else {
+            deltaUs = OptionalLong.empty();
+        }
+        EventHandlerExceptionInfo info = EventHandlerExceptionInfo.
+                fromInternal(exception, () -> latestController());
+        String failureMessage = info.failureMessage(lastCommittedEpoch, deltaUs,
+                isActiveController(), lastCommittedOffset);
+        if (info.isFault()) {
+            nonFatalFaultHandler.handleFault(name + ": " + failureMessage, exception);
+        } else {
+            log.info("{}: {}", name, failureMessage);
         }
-        if (isActiveController()) {
-            nonFatalFaultHandler.handleFault(String.format("%s: failed with unexpected server " +
-                    "exception %s at epoch %d in %d us. Renouncing leadership and reverting " +
-                    "to the last committed offset %d.",
-                    name, exception.getClass().getSimpleName(), curClaimEpoch, deltaUs,
-                    lastCommittedOffset), exception);
+        if (info.causesFailover() && isActiveController()) {
             renounce();
-        } else {
-            nonFatalFaultHandler.handleFault(String.format("%s: failed with unexpected server " +
-                    "exception %s in %d us. The controller is already in standby mode.",
-                    name, exception.getClass().getSimpleName(), deltaUs),
-                    exception);
         }
-        return externalException;
+        return info.effectiveExternalException();
     }
 
     /**
@@ -740,22 +735,24 @@ public final class QuorumController implements Controller {
                             // Start by trying to apply the record to our in-memory state. This should always
                             // succeed; if it does not, that's a fatal error. It is important to do this before
                             // scheduling the record for Raft replication.
-                            int i = 1;
+                            int i = 0;
                             for (ApiMessageAndVersion message : records) {
                                 try {
-                                    replay(message.message(), Optional.empty(), prevEndOffset + records.size());
+                                    replay(message.message(), Optional.empty(), prevEndOffset + i + 1);
                                 } catch (Throwable e) {
                                     String failureMessage = String.format("Unable to apply %s record, which was " +
                                             "%d of %d record(s) in the batch following last write offset %d.",
-                                            message.message().getClass().getSimpleName(), i, records.size(),
+                                            message.message().getClass().getSimpleName(), i + 1, records.size(),
                                             prevEndOffset);
                                     throw fatalFaultHandler.handleFault(failureMessage, e);
                                 }
                                 i++;
                             }
-                            prevEndOffset = raftClient.scheduleAtomicAppend(controllerEpoch, records);
-                            snapshotRegistry.getOrCreateSnapshot(prevEndOffset);
-                            return prevEndOffset;
+                            long nextEndOffset = prevEndOffset + i;
+                            raftClient.scheduleAtomicAppend(controllerEpoch, OptionalLong.of(nextEndOffset), records);
+                            snapshotRegistry.getOrCreateSnapshot(nextEndOffset);
+                            prevEndOffset = nextEndOffset;
+                            return nextEndOffset;
                         }
                     });
                 op.processBatchEndOffset(offset);
@@ -973,14 +970,14 @@ public final class QuorumController implements Controller {
                                 log.debug("Replaying commits from the active node up to " +
                                     "offset {} and epoch {}.", offset, epoch);
                             }
-                            int i = 1;
+                            int i = 0;
                             for (ApiMessageAndVersion message : messages) {
                                 try {
-                                    replay(message.message(), Optional.empty(), offset);
+                                    replay(message.message(), Optional.empty(), batch.baseOffset() + i);
                                 } catch (Throwable e) {
                                     String failureMessage = String.format("Unable to apply %s record on standby " +
                                             "controller, which was %d of %d record(s) in the batch with baseOffset %d.",
-                                            message.message().getClass().getSimpleName(), i, messages.size(),
+                                            message.message().getClass().getSimpleName(), i + 1, messages.size(),
                                             batch.baseOffset());
                                     throw fatalFaultHandler.handleFault(failureMessage, e);
                                 }
@@ -1059,7 +1056,7 @@ public final class QuorumController implements Controller {
         }
 
         @Override
-        public void handleLeaderChange(LeaderAndEpoch newLeader) {
+        public void handleLeaderChange(LeaderAndEpoch newLeader, long endOffset) {
             appendRaftEvent("handleLeaderChange[" + newLeader.epoch() + "]", () -> {
                 final String newLeaderName = newLeader.leaderId().isPresent() ?
                         String.valueOf(newLeader.leaderId().getAsInt()) : "(none)";
@@ -1076,10 +1073,10 @@ public final class QuorumController implements Controller {
                         renounce();
                     }
                 } else if (newLeader.isLeader(nodeId)) {
-                    log.info("Becoming the active controller at epoch {}, committed offset {}, " +
-                        "committed epoch {}", newLeader.epoch(), lastCommittedOffset,
-                        lastCommittedEpoch);
-                    claim(newLeader.epoch());
+                    long newLastWriteOffset = endOffset - 1;
+                    log.info("Becoming the active controller at epoch {}, last write offset {}.",
+                        newLeader.epoch(), newLastWriteOffset);
+                    claim(newLeader.epoch(), newLastWriteOffset);
                 } else {
                     log.info("In the new epoch {}, the leader is {}.",
                         newLeader.epoch(), newLeaderName);
@@ -1150,15 +1147,17 @@ public final class QuorumController implements Controller {
         }
     }
 
-    private void claim(int epoch) {
+    private void claim(int epoch, long newLastWriteOffset) {
         try {
             if (curClaimEpoch != -1) {
                 throw new RuntimeException("Cannot claim leadership because we are already the " +
                         "active controller.");
             }
             curClaimEpoch = epoch;
+            lastCommittedOffset = newLastWriteOffset;
+            lastCommittedEpoch = epoch;
             controllerMetrics.setActive(true);
-            updateWriteOffset(lastCommittedOffset);
+            updateWriteOffset(newLastWriteOffset);
             clusterControl.activate();
 
             // Before switching to active, create an in-memory snapshot at the last committed
@@ -1498,25 +1497,24 @@ public final class QuorumController implements Controller {
      *
      * @param message           The metadata record
      * @param snapshotId        The snapshotId if this record is from a snapshot
-     * @param batchLastOffset   The offset of the last record in the log batch, or the lastContainedLogOffset
-     *                          if this record is from a snapshot, this is used along with RegisterBrokerRecord
+     * @param offset            The offset of the record
      */
-    private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, long batchLastOffset) {
+    private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, long offset) {
         if (log.isTraceEnabled()) {
             if (snapshotId.isPresent()) {
                 log.trace("Replaying snapshot {} record {}",
                     Snapshots.filenameFromSnapshotId(snapshotId.get()),
                         recordRedactor.toLoggableString(message));
             } else {
-                log.trace("Replaying log record {} with batchLastOffset {}",
-                        recordRedactor.toLoggableString(message), batchLastOffset);
+                log.trace("Replaying log record {} with offset {}",
+                        recordRedactor.toLoggableString(message), offset);
             }
         }
         logReplayTracker.replay(message);
         MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
         switch (type) {
             case REGISTER_BROKER_RECORD:
-                clusterControl.replay((RegisterBrokerRecord) message, batchLastOffset);
+                clusterControl.replay((RegisterBrokerRecord) message, offset);
                 break;
             case UNREGISTER_BROKER_RECORD:
                 clusterControl.replay((UnregisterBrokerRecord) message);
diff --git a/metadata/src/main/java/org/apache/kafka/controller/errors/ControllerExceptions.java b/metadata/src/main/java/org/apache/kafka/controller/errors/ControllerExceptions.java
index 6f99ea45f6b..b7e74446a4b 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/errors/ControllerExceptions.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/errors/ControllerExceptions.java
@@ -17,18 +17,11 @@
 
 package org.apache.kafka.controller.errors;
 
-import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.NotControllerException;
-import org.apache.kafka.common.errors.PolicyViolationException;
 import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.errors.UnknownServerException;
-import org.apache.kafka.raft.errors.NotLeaderException;
-import org.apache.kafka.server.mutable.BoundedListTooLongException;
 
 import java.util.OptionalInt;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.function.Supplier;
 
 
 public class ControllerExceptions {
@@ -93,58 +86,4 @@ public class ControllerExceptions {
            return new NotControllerException("No controller appears to be active.");
         }
     }
-
-    /**
-     * Determine if an exception is expected. Unexpected exceptions trigger controller failovers
-     * when they are raised.
-     *
-     * @param exception     The exception.
-     * @return              True if the exception is expected.
-     */
-    public static boolean isExpected(Throwable exception) {
-        if (exception instanceof ApiException) {
-            // ApiExceptions indicate errors that should be returned to the user.
-            return true;
-        } else if (exception instanceof NotLeaderException) {
-            // NotLeaderException is thrown if we try to append records, but are not the leader.
-            return true;
-        } else if (exception instanceof RejectedExecutionException) {
-            // This can happen when the controller is shutting down.
-            return true;
-        } else if (exception instanceof BoundedListTooLongException) {
-            // This can happen if we tried to create too many records.
-            return true;
-        } else if (exception instanceof InterruptedException) {
-            // Interrupted exceptions are not expected. They might happen during junit tests if
-            // the test gets stuck and must be terminated by sending IE to all the threads.
-            return false;
-        }
-        // Other exceptions are unexpected.
-        return false;
-    }
-
-    /**
-     * Translate an internal controller exception to its external equivalent.
-     *
-     * @param exception     The internal exception.
-     * @return              Its external equivalent.
-     */
-    public static Throwable toExternalException(
-        Throwable exception,
-        Supplier<OptionalInt> latestControllerSupplier
-    ) {
-        if (exception instanceof ApiException) {
-            return exception;
-        } else if (exception instanceof NotLeaderException) {
-            return newWrongControllerException(latestControllerSupplier.get());
-        } else if (exception instanceof RejectedExecutionException) {
-            return new TimeoutException("The controller is shutting down.", exception);
-        } else if (exception instanceof BoundedListTooLongException) {
-            return new PolicyViolationException("Unable to perform excessively large batch " +
-                    "operation.");
-        } else if (exception instanceof InterruptedException) {
-            return new UnknownServerException("The controller was interrupted.");
-        }
-        return new UnknownServerException(exception);
-    }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java b/metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java
new file mode 100644
index 00000000000..4e74b71a677
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfo.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller.errors;
+
+import org.apache.kafka.common.errors.ApiException;
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.PolicyViolationException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.raft.errors.NotLeaderException;
+import org.apache.kafka.raft.errors.UnexpectedEndOffsetException;
+import org.apache.kafka.server.mutable.BoundedListTooLongException;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.function.Supplier;
+
+
+public final class EventHandlerExceptionInfo {
+    /**
+     * True if this exception should be treated as a fault.
+     */
+    private final boolean isFault;
+
+    /**
+     * True if this exception should cause a controller failover.
+     * All faults cause failover.
+     */
+    private final boolean causesFailover;
+
+    /**
+     * The internal exception.
+     */
+    private final Throwable internalException;
+
+    /**
+     * The exception to present to RPC callers, or Optional.empty if the internal exception should
+     * be presented directly.
+     */
+    private final Optional<Throwable> externalException;
+
+    /**
+     * Create an EventHandlerExceptionInfo object from an internal exception.
+     *
+     * @param internal                  The internal exception.
+     * @param latestControllerSupplier  A function we can call to obtain the latest leader id.
+     *
+     * @return                          The new immutable info object.
+     */
+    public static EventHandlerExceptionInfo fromInternal(
+        Throwable internal,
+        Supplier<OptionalInt> latestControllerSupplier
+    ) {
+        if (internal instanceof ApiException) {
+            // This exception is a standard API error response from the controller, which can pass
+            // through without modification.
+            return new EventHandlerExceptionInfo(false, false, internal);
+        } else if (internal instanceof NotLeaderException) {
+            // The controller has lost leadership.
+            return new EventHandlerExceptionInfo(false, true, internal,
+                ControllerExceptions.newWrongControllerException(latestControllerSupplier.get()));
+        } else if (internal instanceof RejectedExecutionException) {
+            // The controller event queue is shutting down.
+            return new EventHandlerExceptionInfo(false, false, internal,
+                new TimeoutException("The controller is shutting down.", internal));
+        } else if (internal instanceof BoundedListTooLongException) {
+            // The operation could not be performed because it would have created an overly large
+            // batch.
+            return new EventHandlerExceptionInfo(false, false, internal,
+                new PolicyViolationException("Unable to perform excessively large batch " +
+                    "operation."));
+        } else if (internal instanceof UnexpectedEndOffsetException) {
+            // The active controller picked the wrong end offset for its next batch. It must now
+            // fail over. This should be pretty rare.
+            return new EventHandlerExceptionInfo(false, true, internal,
+                new NotControllerException("Unexpected end offset. Controller not known."));
+        } else if (internal instanceof InterruptedException) {
+            // The controller event queue has been interrupted. This normally only happens during
+            // a JUnit test that has hung. The test framework sometimes sends an InterruptedException
+            // to all threads to try to get them to shut down. This isn't the correct way to shut
+            // down the test, but it may happen if something hung.
+            return new EventHandlerExceptionInfo(true, true, internal,
+                new UnknownServerException("The controller was interrupted."));
+        } else {
+            // This is the catch-all case for things that aren't supposed to happen. Null pointer
+            // exceptions, illegal argument exceptions, etc. They get translated into an
+            // UnknownServerException and a controller failover.
+            return new EventHandlerExceptionInfo(true, true, internal,
+                new UnknownServerException(internal));
+        }
+    }
+
+    /**
+     * Returns true if the class and message fields match for two exceptions. Handles nulls.
+     */
+    static boolean exceptionClassesAndMessagesMatch(Throwable a, Throwable b) {
+        if (a == null) return b == null;
+        if (b == null) return false;
+        if (!a.getClass().equals(b.getClass())) return false;
+        if (!Objects.equals(a.getMessage(), b.getMessage())) return false;
+        return true;
+    }
+
+    EventHandlerExceptionInfo(
+        boolean isFault,
+        boolean causesFailover,
+        Throwable internalException
+    ) {
+        this.isFault = isFault;
+        this.causesFailover = causesFailover;
+        this.internalException = internalException;
+        this.externalException = Optional.empty();
+    }
+
+    EventHandlerExceptionInfo(
+        boolean isFault,
+        boolean causesFailover,
+        Throwable internalException,
+        Throwable externalException
+    ) {
+        this.isFault = isFault;
+        this.causesFailover = causesFailover;
+        this.internalException = internalException;
+        this.externalException = Optional.of(externalException);
+    }
+
+    public boolean isFault() {
+        return isFault;
+    }
+
+    public boolean causesFailover() {
+        return causesFailover;
+    }
+
+    public Throwable effectiveExternalException() {
+        return externalException.orElse(internalException);
+    }
+
+    public String failureMessage(
+        int epoch,
+        OptionalLong deltaUs,
+        boolean isActiveController,
+        long lastCommittedOffset
+    ) {
+        StringBuilder bld = new StringBuilder();
+        if (deltaUs.isPresent()) {
+            bld.append("failed with ");
+        } else {
+            bld.append("unable to start processing because of ");
+        }
+        bld.append(internalException.getClass().getSimpleName());
+        if (externalException.isPresent()) {
+            bld.append(" (treated as ").
+                append(externalException.get().getClass().getSimpleName()).append(")");
+        }
+        if (causesFailover()) {
+            bld.append(" at epoch ").append(epoch);
+        }
+        if (deltaUs.isPresent()) {
+            bld.append(" in ").append(deltaUs.getAsLong()).append(" 
microseconds");
+        }
+        if (causesFailover()) {
+            if (isActiveController) {
+                bld.append(". Renouncing leadership and reverting to the last 
committed offset ");
+                bld.append(lastCommittedOffset);
+            } else {
+                bld.append(". The controller is already in standby mode");
+            }
+        }
+        bld.append(".");
+        return bld.toString();
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(isFault,
+                causesFailover,
+                internalException.getClass().getCanonicalName(),
+                internalException.getMessage(),
+                externalException.orElse(internalException).getClass().getCanonicalName(),
+                externalException.orElse(internalException).getMessage());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || !(o.getClass().equals(EventHandlerExceptionInfo.class))) return false;
+        EventHandlerExceptionInfo other = (EventHandlerExceptionInfo) o;
+        return isFault == other.isFault &&
+                causesFailover == other.causesFailover &&
+                exceptionClassesAndMessagesMatch(internalException, other.internalException) &&
+                exceptionClassesAndMessagesMatch(externalException.orElse(null),
+                        other.externalException.orElse(null));
+    }
+
+    @Override
+    public String toString() {
+        return "EventHandlerExceptionInfo" +
+            "(isFault=" + isFault +
+            ", causesFailover=" + causesFailover +
+            ", internalException.class=" + 
internalException.getClass().getCanonicalName() +
+            ", externalException.class=" + (externalException.isPresent() ?
+                externalException.get().getClass().getCanonicalName() : "(none)") +
+            ")";
+    }
+}
\ No newline at end of file
diff --git a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
index 768fcb2574b..c923742af9f 100644
--- a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
+++ b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
@@ -495,7 +495,7 @@ public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>
     }
 
     @Override
-    public void handleLeaderChange(LeaderAndEpoch leaderAndEpoch) {
+    public void handleLeaderChange(LeaderAndEpoch leaderAndEpoch, long endOffset) {
         eventQueue.append(() -> {
             currentLeaderAndEpoch = leaderAndEpoch;
             for (MetadataPublisher publisher : publishers.values()) {
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileWriter.java b/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileWriter.java
index 6f82d915d8d..0ccf39a85ac 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileWriter.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/util/BatchFileWriter.java
@@ -34,6 +34,7 @@ import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
 import java.util.Collections;
 import java.util.List;
+import java.util.OptionalLong;
 
 import static org.apache.kafka.raft.KafkaRaftClient.MAX_BATCH_SIZE_BYTES;
 
@@ -61,11 +62,11 @@ public class BatchFileWriter implements AutoCloseable {
     }
 
     public void append(ApiMessageAndVersion apiMessageAndVersion) {
-        batchAccumulator.append(0, Collections.singletonList(apiMessageAndVersion));
+        batchAccumulator.append(0, Collections.singletonList(apiMessageAndVersion), OptionalLong.empty(), false);
     }
 
     public void append(List<ApiMessageAndVersion> messageBatch) {
-        batchAccumulator.append(0, messageBatch);
+        batchAccumulator.append(0, messageBatch, OptionalLong.empty(), false);
     }
 
     public void close() throws IOException {
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java b/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java
index 19875741ec2..5db1e575e79 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/util/SnapshotFileReader.java
@@ -137,7 +137,7 @@ public final class SnapshotFileReader implements AutoCloseable {
                         listener.handleLeaderChange(new LeaderAndEpoch(
                             OptionalInt.of(message.leaderId()),
                             batch.partitionLeaderEpoch()
-                        ));
+                        ), batch.lastOffset() + 1);
                         break;
                     default:
                         log.error("Ignoring control record with type {} at 
offset {}",
diff --git a/metadata/src/test/java/org/apache/kafka/controller/errors/ControllerExceptionsTest.java b/metadata/src/test/java/org/apache/kafka/controller/errors/ControllerExceptionsTest.java
index 81c234491b5..2d1905b1a5c 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/errors/ControllerExceptionsTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/errors/ControllerExceptionsTest.java
@@ -20,20 +20,15 @@ package org.apache.kafka.controller.errors;
 import org.apache.kafka.common.errors.NotControllerException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicExistsException;
-import org.apache.kafka.common.errors.UnknownServerException;
-import org.apache.kafka.raft.errors.NotLeaderException;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 
 import java.util.OptionalInt;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.RejectedExecutionException;
 
-import static org.apache.kafka.controller.errors.ControllerExceptions.isExpected;
 import static org.apache.kafka.controller.errors.ControllerExceptions.isTimeoutException;
 import static org.apache.kafka.controller.errors.ControllerExceptions.newPreMigrationException;
 import static org.apache.kafka.controller.errors.ControllerExceptions.newWrongControllerException;
-import static org.apache.kafka.controller.errors.ControllerExceptions.toExternalException;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -99,32 +94,7 @@ public class ControllerExceptionsTest {
             newWrongControllerException(OptionalInt.of(1)));
     }
 
-    @Test
-    public void testApiExceptionIsExpected() {
-        assertTrue(isExpected(new TopicExistsException("")));
-    }
-
-    @Test
-    public void testNotLeaderExceptionIsExpected() {
-        assertTrue(isExpected(new NotLeaderException("")));
-    }
-
-    @Test
-    public void testRejectedExecutionExceptionIsExpected() {
-        assertTrue(isExpected(new RejectedExecutionException()));
-    }
-
-    @Test
-    public void testInterruptedExceptionIsNotExpected() {
-        assertFalse(isExpected(new InterruptedException()));
-    }
-
-    @Test
-    public void testRuntimeExceptionIsNotExpected() {
-        assertFalse(isExpected(new NullPointerException()));
-    }
-
-    private static void assertExceptionsMatch(Throwable a, Throwable b) {
+    static void assertExceptionsMatch(Throwable a, Throwable b) {
         assertEquals(a.getClass(), b.getClass());
         assertEquals(a.getMessage(), b.getMessage());
         if (a.getCause() != null) {
@@ -134,40 +104,4 @@ public class ControllerExceptionsTest {
             assertNull(b.getCause());
         }
     }
-
-    @Test
-    public void testApiExceptionToExternalException() {
-        assertExceptionsMatch(new TopicExistsException("Topic foo exists"),
-            toExternalException(new TopicExistsException("Topic foo exists"),
-                () -> OptionalInt.of(1)));
-    }
-
-    @Test
-    public void testNotLeaderExceptionToExternalException() {
-        assertExceptionsMatch(new NotControllerException("The active controller appears to be node 1."),
-            toExternalException(new NotLeaderException("Append failed because the given epoch 123 is stale."),
-                () -> OptionalInt.of(1)));
-    }
-
-    @Test
-    public void testRejectedExecutionExceptionToExternalException() {
-        assertExceptionsMatch(new TimeoutException("The controller is shutting down.",
-            new RejectedExecutionException("The event queue is shutting down")),
-                toExternalException(new RejectedExecutionException("The event queue is shutting down"),
-                    () -> OptionalInt.empty()));
-    }
-
-    @Test
-    public void testInterruptedExceptionToExternalException() {
-        assertExceptionsMatch(new UnknownServerException("The controller was interrupted."),
-            toExternalException(new InterruptedException(),
-                () -> OptionalInt.empty()));
-    }
-
-    @Test
-    public void testRuntimeExceptionToExternalException() {
-        assertExceptionsMatch(new UnknownServerException(new NullPointerException("Null pointer exception")),
-            toExternalException(new NullPointerException("Null pointer exception"),
-                () -> OptionalInt.empty()));
-    }
 }
diff --git a/metadata/src/test/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfoTest.java b/metadata/src/test/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfoTest.java
new file mode 100644
index 00000000000..ccffde2831e
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/controller/errors/EventHandlerExceptionInfoTest.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller.errors;
+
+import org.apache.kafka.common.errors.NotControllerException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.TopicExistsException;
+import org.apache.kafka.common.errors.UnknownServerException;
+import org.apache.kafka.raft.errors.NotLeaderException;
+import org.apache.kafka.raft.errors.UnexpectedEndOffsetException;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.concurrent.RejectedExecutionException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+
+@Timeout(value = 40)
+public class EventHandlerExceptionInfoTest {
+    private static final EventHandlerExceptionInfo TOPIC_EXISTS =
+        EventHandlerExceptionInfo.fromInternal(
+            new TopicExistsException("Topic exists."),
+            () -> OptionalInt.empty());
+
+    private static final EventHandlerExceptionInfo REJECTED_EXECUTION =
+        EventHandlerExceptionInfo.fromInternal(
+            new RejectedExecutionException(),
+            () -> OptionalInt.empty());
+
+    private static final EventHandlerExceptionInfo INTERRUPTED =
+        EventHandlerExceptionInfo.fromInternal(
+            new InterruptedException(),
+            () -> OptionalInt.of(1));
+
+    private static final EventHandlerExceptionInfo NULL_POINTER =
+        EventHandlerExceptionInfo.fromInternal(
+            new NullPointerException(),
+            () -> OptionalInt.of(1));
+
+    private static final EventHandlerExceptionInfo NOT_LEADER =
+        EventHandlerExceptionInfo.fromInternal(
+            new NotLeaderException("Append failed"),
+            () -> OptionalInt.of(2));
+
+    private static final EventHandlerExceptionInfo UNEXPECTED_END_OFFSET =
+        EventHandlerExceptionInfo.fromInternal(
+            new UnexpectedEndOffsetException("Wanted end offset 3, but next available was 4"),
+            () -> OptionalInt.of(1));
+
+    @Test
+    public void testTopicExistsExceptionInfo() {
+        assertEquals(new EventHandlerExceptionInfo(false, false,
+            new TopicExistsException("Topic exists.")),
+                TOPIC_EXISTS);
+    }
+
+    @Test
+    public void testTopicExistsExceptionFailureMessage() {
+        assertEquals("failed with TopicExistsException in 234 microseconds.",
+            TOPIC_EXISTS.failureMessage(123, OptionalLong.of(234L), true, 456L));
+    }
+
+    @Test
+    public void testRejectedExecutionExceptionInfo() {
+        assertEquals(new EventHandlerExceptionInfo(false, false,
+            new RejectedExecutionException(),
+            new TimeoutException("The controller is shutting down.", new RejectedExecutionException())),
+                REJECTED_EXECUTION);
+    }
+
+    @Test
+    public void testRejectedExecutionExceptionFailureMessage() {
+        assertEquals("unable to start processing because of 
RejectedExecutionException (treated " +
+            "as TimeoutException).",
+            REJECTED_EXECUTION.failureMessage(123, OptionalLong.empty(), true, 
456L));
+    }
+
+    @Test
+    public void testInterruptedExceptionInfo() {
+        assertEquals(new EventHandlerExceptionInfo(true, true,
+            new InterruptedException(),
+            new UnknownServerException("The controller was interrupted.")),
+                INTERRUPTED);
+    }
+
+    @Test
+    public void testInterruptedExceptionFailureMessageWhenActive() {
+        assertEquals("unable to start processing because of 
InterruptedException (treated as " +
+            "UnknownServerException) at epoch 123. Renouncing leadership and 
reverting to the " +
+            "last committed offset 456.",
+            INTERRUPTED.failureMessage(123, OptionalLong.empty(), true, 456L));
+    }
+
+    @Test
+    public void testInterruptedExceptionFailureMessageWhenInactive() {
+        assertEquals("unable to start processing because of 
InterruptedException (treated as " +
+            "UnknownServerException) at epoch 123. The controller is already 
in standby mode.",
+                INTERRUPTED.failureMessage(123, OptionalLong.empty(), false, 
456L));
+    }
+
+    @Test
+    public void testNullPointerExceptionInfo() {
+        assertEquals(new EventHandlerExceptionInfo(true, true,
+            new NullPointerException(),
+            new UnknownServerException(new NullPointerException())),
+                NULL_POINTER);
+    }
+
+    @Test
+    public void testNullPointerExceptionFailureMessageWhenActive() {
+        assertEquals("failed with NullPointerException (treated as 
UnknownServerException) " +
+            "at epoch 123 in 40 microseconds. Renouncing leadership and 
reverting to the last " +
+            "committed offset 456.",
+                NULL_POINTER.failureMessage(123, OptionalLong.of(40L), true, 
456L));
+    }
+
+    @Test
+    public void testNullPointerExceptionFailureMessageWhenInactive() {
+        assertEquals("failed with NullPointerException (treated as 
UnknownServerException) " +
+            "at epoch 123 in 40 microseconds. The controller is already in 
standby mode.",
+                NULL_POINTER.failureMessage(123, OptionalLong.of(40L), false, 
456L));
+    }
+
+    @Test
+    public void testNotLeaderExceptionInfo() {
+        assertEquals(new EventHandlerExceptionInfo(false, true,
+            new NotLeaderException("Append failed"),
+            new NotControllerException("The active controller appears to be node 2.")),
+                NOT_LEADER);
+    }
+
+    @Test
+    public void testNotLeaderExceptionFailureMessage() {
+        assertEquals("unable to start processing because of NotLeaderException 
(treated as " +
+            "NotControllerException) at epoch 123. Renouncing leadership and 
reverting to the " +
+            "last committed offset 456.",
+            NOT_LEADER.failureMessage(123, OptionalLong.empty(), true, 456L));
+    }
+
+    @Test
+    public void testUnexpectedEndOffsetExceptionInfo() {
+        assertEquals(new EventHandlerExceptionInfo(false, true,
+            new UnexpectedEndOffsetException("Wanted end offset 3, but next available was 4"),
+            new NotControllerException("Unexpected end offset. Controller not known.")),
+                UNEXPECTED_END_OFFSET);
+    }
+
+    @Test
+    public void testUnexpectedEndOffsetFailureMessage() {
+        assertEquals("failed with UnexpectedEndOffsetException (treated as " +
+            "NotControllerException) at epoch 123 in 90 microseconds. 
Renouncing leadership " +
+            "and reverting to the last committed offset 456.",
+                UNEXPECTED_END_OFFSET.failureMessage(123, 
OptionalLong.of(90L), true, 456L));
+    }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java
index ca72aa058ef..038de599009 100644
--- a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java
@@ -81,7 +81,11 @@ public class SnapshotEmitterTest {
         }
 
         @Override
-        public long scheduleAtomicAppend(int epoch, List<ApiMessageAndVersion> records) {
+        public long scheduleAtomicAppend(
+            int epoch,
+            OptionalLong requiredEndOffset,
+            List<ApiMessageAndVersion> records
+        ) {
             return 0;
         }
 
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
index 46f99db5d1d..1f1b5d72e47 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
@@ -32,6 +32,7 @@ import org.apache.kafka.raft.LeaderAndEpoch;
 import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.raft.RaftClient;
 import org.apache.kafka.raft.errors.NotLeaderException;
+import org.apache.kafka.raft.errors.UnexpectedEndOffsetException;
 import org.apache.kafka.raft.internals.MemoryBatchReader;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.snapshot.MockRawSnapshotReader;
@@ -221,13 +222,26 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
             }
         }
 
-        synchronized long tryAppend(int nodeId, int epoch, List<ApiMessageAndVersion> batch) {
+        synchronized long tryAppend(
+            int nodeId,
+            int epoch,
+            OptionalLong requiredEndOffset,
+            List<ApiMessageAndVersion> batch
+        ) {
            // No easy access to the concept of time. Use the base offset as the append timestamp
             long appendTimestamp = (prevOffset + 1) * 10;
-            return tryAppend(nodeId, epoch, new LocalRecordBatch(epoch, appendTimestamp, batch));
-        }
-
-        synchronized long tryAppend(int nodeId, int epoch, LocalBatch batch) {
+            return tryAppend(nodeId,
+                    epoch,
+                    requiredEndOffset,
+                    new LocalRecordBatch(epoch, appendTimestamp, batch));
+        }
+
+        synchronized long tryAppend(
+            int nodeId,
+            int epoch,
+            OptionalLong requiredEndOffset,
+            LocalBatch batch
+        ) {
             if (!leader.isLeader(nodeId)) {
                 log.debug("tryAppend(nodeId={}, epoch={}): the given node id 
does not " +
                         "match the current leader id of {}.", nodeId, epoch, 
leader.leaderId());
@@ -243,15 +257,24 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
             }
 
             log.trace("tryAppend(nodeId={}): appending {}.", nodeId, batch);
-            long offset = append(batch);
+            long offset = append(requiredEndOffset, batch);
             electLeaderIfNeeded();
             return offset;
         }
 
-        public synchronized long append(LocalBatch batch) {
-            prevOffset += batch.size();
-            log.debug("append(batch={}, prevOffset={})", batch, prevOffset);
-            batches.put(prevOffset, batch);
+        public synchronized long append(
+            OptionalLong requiredEndOffset,
+            LocalBatch batch
+        ) {
+            long nextEndOffset = prevOffset + batch.size();
+            requiredEndOffset.ifPresent(r -> {
+                if (r != nextEndOffset) {
+                    throw new UnexpectedEndOffsetException("Wanted end offset " + r +
+                            ", but next available was " + nextEndOffset);
+                }
+            });
+            log.debug("append(batch={}, nextEndOffset={})", batch, 
nextEndOffset);
+            batches.put(nextEndOffset, batch);
             if (batch instanceof LeaderChangeBatch) {
                LeaderChangeBatch leaderChangeBatch = (LeaderChangeBatch) batch;
                 leader = leaderChangeBatch.newLeader;
@@ -259,7 +282,8 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
             for (LocalLogManager logManager : logManagers.values()) {
                 logManager.scheduleLogCheck();
             }
-            return prevOffset;
+            prevOffset = nextEndOffset;
+            return nextEndOffset;
         }
 
         synchronized void electLeaderIfNeeded() {
@@ -274,7 +298,7 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
             }
            LeaderAndEpoch newLeader = new LeaderAndEpoch(OptionalInt.of(nextLeaderNode), leader.epoch() + 1);
             log.info("Elected new leader: {}.", newLeader);
-            append(new LeaderChangeBatch(newLeader));
+            append(OptionalLong.empty(), new LeaderChangeBatch(newLeader));
         }
 
         synchronized LeaderAndEpoch leaderAndEpoch() {
@@ -431,8 +455,8 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
 
         void handleLeaderChange(long offset, LeaderAndEpoch leader) {
            // Simulate KRaft implementation by first bumping the epoch before assigning a leader
-            listener.handleLeaderChange(new LeaderAndEpoch(OptionalInt.empty(), leader.epoch()));
-            listener.handleLeaderChange(leader);
+            listener.handleLeaderChange(new LeaderAndEpoch(OptionalInt.empty(), leader.epoch()), offset + 1);
+            listener.handleLeaderChange(leader, offset + 1);
 
             notifiedLeader = leader;
             this.offset = offset;
@@ -737,7 +761,9 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
 
         OptionalLong firstOffset = first
             .stream()
-            .mapToLong(record -> scheduleAtomicAppend(epoch, Collections.singletonList(record)))
+            .mapToLong(record -> scheduleAtomicAppend(epoch,
+                    OptionalLong.empty(),
+                    Collections.singletonList(record)))
             .max();
 
        if (firstOffset.isPresent() && resignAfterNonAtomicCommit.getAndSet(false)) {
@@ -749,15 +775,24 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
         } else {
             return second
                 .stream()
-                .mapToLong(record -> scheduleAtomicAppend(epoch, Collections.singletonList(record)))
+                .mapToLong(record -> scheduleAtomicAppend(epoch,
+                        OptionalLong.empty(),
+                        Collections.singletonList(record)))
                 .max()
                 .getAsLong();
         }
     }
 
     @Override
-    public long scheduleAtomicAppend(int epoch, List<ApiMessageAndVersion> batch) {
-        return shared.tryAppend(nodeId, leader.epoch(), batch);
+    public long scheduleAtomicAppend(
+        int epoch,
+        OptionalLong requiredEndOffset,
+        List<ApiMessageAndVersion> batch
+    ) {
+        if (batch.isEmpty()) {
+            throw new IllegalArgumentException("Batch cannot be empty");
+        }
+        return shared.tryAppend(nodeId, leader.epoch(), requiredEndOffset, batch);
     }
 
     @Override
@@ -784,7 +819,10 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
 
        LeaderAndEpoch nextLeader = new LeaderAndEpoch(OptionalInt.empty(), currentEpoch + 1);
         try {
-            shared.tryAppend(nodeId, currentEpoch, new LeaderChangeBatch(nextLeader));
+            shared.tryAppend(nodeId,
+                    currentEpoch,
+                    OptionalLong.empty(),
+                    new LeaderChangeBatch(nextLeader));
         } catch (NotLeaderException exp) {
             // the leader epoch has already advanced. resign is a no op.
             log.debug("Ignoring call to resign from epoch {}. Either we are 
not the leader or the provided epoch is " +
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
index d72b7557b48..c3a7283ab9e 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
@@ -36,6 +36,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.OptionalInt;
+import java.util.OptionalLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 
@@ -155,11 +156,12 @@ public class LocalLogManagerTestEnv implements AutoCloseable {
      */
     public void appendInitialRecords(List<ApiMessageAndVersion> records) {
         int initialLeaderEpoch = 1;
-        shared.append(new LeaderChangeBatch(
+        shared.append(OptionalLong.empty(), new LeaderChangeBatch(
             new LeaderAndEpoch(OptionalInt.empty(), initialLeaderEpoch + 1)));
-        shared.append(new LocalRecordBatch(initialLeaderEpoch + 1, 0, records));
-        shared.append(new LeaderChangeBatch(
-            new LeaderAndEpoch(OptionalInt.of(0), initialLeaderEpoch + 2)));
+        shared.append(OptionalLong.empty(),
+            new LocalRecordBatch(initialLeaderEpoch + 1, 0, records));
+        shared.append(OptionalLong.empty(),
+            new LeaderChangeBatch(new LeaderAndEpoch(OptionalInt.of(0), initialLeaderEpoch + 2)));
     }
 
     public String clusterId() {
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java b/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java
index 3d7267d94a5..026f0ac5301 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/MockMetaLogManagerListener.java
@@ -90,7 +90,7 @@ public class MockMetaLogManagerListener implements RaftClient.Listener<ApiMessag
     }
 
     @Override
-    public synchronized void handleLeaderChange(LeaderAndEpoch newLeaderAndEpoch) {
+    public synchronized void handleLeaderChange(LeaderAndEpoch newLeaderAndEpoch, long endOffset) {
         LeaderAndEpoch oldLeaderAndEpoch = this.leaderAndEpoch;
         this.leaderAndEpoch = newLeaderAndEpoch;
 
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 40c6a695908..764c0999558 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -2287,27 +2287,22 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
 
     @Override
     public long scheduleAppend(int epoch, List<T> records) {
-        return append(epoch, records, false);
+        return append(epoch, records, OptionalLong.empty(), false);
     }
 
     @Override
-    public long scheduleAtomicAppend(int epoch, List<T> records) {
-        return append(epoch, records, true);
+    public long scheduleAtomicAppend(int epoch, OptionalLong requiredEndOffset, List<T> records) {
+        return append(epoch, records, requiredEndOffset, true);
     }
 
-    private long append(int epoch, List<T> records, boolean isAtomic) {
+    private long append(int epoch, List<T> records, OptionalLong requiredEndOffset, boolean isAtomic) {
         LeaderState<T> leaderState = quorum.<T>maybeLeaderState().orElseThrow(
             () -> new NotLeaderException("Append failed because the 
replication is not the current leader")
         );
 
         BatchAccumulator<T> accumulator = leaderState.accumulator();
         boolean isFirstAppend = accumulator.isEmpty();
-        final long offset;
-        if (isAtomic) {
-            offset = accumulator.appendAtomic(epoch, records);
-        } else {
-            offset = accumulator.append(epoch, records);
-        }
+        final long offset = accumulator.append(epoch, records, requiredEndOffset, isAtomic);
 
         // Wakeup the network channel if either this is the first append
         // or the accumulator is ready to drain now. Checking for the first
@@ -2570,10 +2565,10 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
 
         /**
         * This API is used for committed records originating from {@link #scheduleAppend(int, List)}
-         * or {@link #scheduleAtomicAppend(int, List)} on this instance. In this case, we are able to
-         * save the original record objects, which saves the need to read them back from disk. This is
-         * a nice optimization for the leader which is typically doing more work than all of the
-         * followers.
+         * or {@link #scheduleAtomicAppend(int, OptionalLong, List)} on this instance. In this case,
+         * we are able to save the original record objects, which saves the need to read them back
+         * from disk. This is a nice optimization for the leader which is typically doing more work
+         * than all of the followers.
          */
         private void fireHandleCommit(
             long baseOffset,
@@ -2608,7 +2603,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
             if (shouldFireLeaderChange(leaderAndEpoch)) {
                 lastFiredLeaderChange = leaderAndEpoch;
                 logger.debug("Notifying listener {} of leader change {}", 
listenerName(), leaderAndEpoch);
-                listener.handleLeaderChange(leaderAndEpoch);
+                listener.handleLeaderChange(leaderAndEpoch, log.endOffset().offset);
             }
         }
 
@@ -2630,7 +2625,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
             // leader and begins writing to the log.
            if (shouldFireLeaderChange(leaderAndEpoch) && nextOffset() >= epochStartOffset) {
                 lastFiredLeaderChange = leaderAndEpoch;
-                listener.handleLeaderChange(leaderAndEpoch);
+                listener.handleLeaderChange(leaderAndEpoch, log.endOffset().offset);
             }
         }
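As a sketch of how a listener might consume the new parameter (OffsetTrackingListener below is illustrative, not the actual QuorumController code): when leadership is claimed, endOffset tells the listener exactly where its first write will land.

    import org.apache.kafka.raft.BatchReader;
    import org.apache.kafka.raft.LeaderAndEpoch;
    import org.apache.kafka.raft.RaftClient;
    import org.apache.kafka.snapshot.SnapshotReader;

    // Illustrative listener: remembers the log end offset reported at leadership change.
    class OffsetTrackingListener<T> implements RaftClient.Listener<T> {
        private volatile long nextWriteOffset = -1L;

        @Override
        public void handleCommit(BatchReader<T> reader) {
            reader.close(); // a real listener would replay the committed batches
        }

        @Override
        public void handleSnapshot(SnapshotReader<T> reader) {
            reader.close(); // a real listener would load the snapshot
        }

        @Override
        public void handleLeaderChange(LeaderAndEpoch leader, long endOffset) {
            // endOffset is exclusive, so it is also the offset the next record will get.
            nextWriteOffset = endOffset;
        }

        long nextWriteOffset() {
            return nextWriteOffset;
        }
    }
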
 
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
index 6422fb5f3a0..2b899923f91 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
@@ -18,6 +18,7 @@ package org.apache.kafka.raft;
 
 import org.apache.kafka.raft.errors.BufferAllocationException;
 import org.apache.kafka.raft.errors.NotLeaderException;
+import org.apache.kafka.raft.errors.UnexpectedEndOffsetException;
 import org.apache.kafka.snapshot.SnapshotReader;
 import org.apache.kafka.snapshot.SnapshotWriter;
 
@@ -36,10 +37,10 @@ public interface RaftClient<T> extends AutoCloseable {
          * after consuming the reader.
          *
         * Note that there is not a one-to-one correspondence between writes through
-         * {@link #scheduleAppend(int, List)} or {@link #scheduleAtomicAppend(int, List)}
+         * {@link #scheduleAppend(int, List)} or {@link #scheduleAtomicAppend(int, OptionalLong, List)}
          * and this callback. The Raft implementation is free to batch together the records
          * from multiple append calls provided that batch boundaries are respected. Records
-         * specified through {@link #scheduleAtomicAppend(int, List)} are guaranteed to be a
+         * specified through {@link #scheduleAtomicAppend(int, OptionalLong, List)} are guaranteed to be a
          * subset of a batch provided by the {@link BatchReader}. Records specified through
          * {@link #scheduleAppend(int, List)} are guaranteed to be in the same order but
          * they can map to any number of batches provided by the {@link BatchReader}.
@@ -80,8 +81,11 @@ public interface RaftClient<T> extends AutoCloseable {
          * epoch.
          *
          * @param leader the current leader and epoch
+         * @param endOffset the current log end offset (exclusive). This is useful for nodes that
+         *                  are claiming leadership, because it lets them know what log offset they
+         *                  should attempt to write to next.
          */
-        default void handleLeaderChange(LeaderAndEpoch leader) {}
+        default void handleLeaderChange(LeaderAndEpoch leader, long endOffset) {}
 
         default void beginShutdown() {}
     }
@@ -172,6 +176,7 @@ public interface RaftClient<T> extends AutoCloseable {
      * uncommitted entries after observing an epoch change.
      *
      * @param epoch the current leader epoch
+     * @param requiredEndOffset if present, the end offset that the last appended record must receive
      * @param records the list of records to append
      * @return the expected offset of the last record if the append succeeds
      * @throws org.apache.kafka.common.errors.RecordBatchTooLargeException if the size of the records is greater than the maximum
@@ -179,8 +184,9 @@ public interface RaftClient<T> extends AutoCloseable {
      *         committed
      * @throws NotLeaderException if we are not the current leader or the epoch doesn't match the leader epoch
      * @throws BufferAllocationException if we failed to allocate memory for the records
      * @throws UnexpectedEndOffsetException if the requested end offset could not be obtained
      */
-    long scheduleAtomicAppend(int epoch, List<T> records);
+    long scheduleAtomicAppend(int epoch, OptionalLong requiredEndOffset, List<T> records);
 
     /**
     * Attempt a graceful shutdown of the client. This allows the leader to proactively
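A hedged usage sketch of the new signature (PinnedAppendExample, tryPinnedAppend, and the record values are made up): the caller pins the batch to the end offset it expects, and treats a mismatch as a failover signal rather than a fault, per the commit message.

    import java.util.Arrays;
    import java.util.OptionalLong;
    import org.apache.kafka.raft.RaftClient;
    import org.apache.kafka.raft.errors.UnexpectedEndOffsetException;

    class PinnedAppendExample {
        // Appends two records whose batch must end exactly at expectedEndOffset
        // (the offset assigned to the last record). Returns empty if the log
        // moved underneath us.
        static OptionalLong tryPinnedAppend(RaftClient<String> client, int epoch, long expectedEndOffset) {
            try {
                long lastOffset = client.scheduleAtomicAppend(epoch,
                    OptionalLong.of(expectedEndOffset), Arrays.asList("record-1", "record-2"));
                return OptionalLong.of(lastOffset);
            } catch (UnexpectedEndOffsetException e) {
                // A concurrently appended control record changed the end offset;
                // the caller should resign or fail over rather than treat this as a fault.
                return OptionalLong.empty();
            }
        }
    }
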
diff --git a/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java b/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
index cceb65930ed..36fa99e12c9 100644
--- a/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
+++ b/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
@@ -165,7 +165,7 @@ public class ReplicatedCounter implements RaftClient.Listener<Integer> {
     }
 
     @Override
-    public synchronized void handleLeaderChange(LeaderAndEpoch newLeader) {
+    public synchronized void handleLeaderChange(LeaderAndEpoch newLeader, long endOffset) {
         if (newLeader.isLeader(nodeId)) {
             log.debug("Counter uncommitted value initialized to {} after 
claiming leadership in epoch {}",
                 committed, newLeader);
diff --git a/raft/src/main/java/org/apache/kafka/raft/errors/UnexpectedEndOffsetException.java b/raft/src/main/java/org/apache/kafka/raft/errors/UnexpectedEndOffsetException.java
new file mode 100644
index 00000000000..f470134c9a3
--- /dev/null
+++ b/raft/src/main/java/org/apache/kafka/raft/errors/UnexpectedEndOffsetException.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.errors;
+
+/**
+ * Indicates that an append operation cannot be completed because it would have resulted in an
+ * unexpected end offset.
+ */
+public class UnexpectedEndOffsetException extends RaftException {
+    private final static long serialVersionUID = 1L;
+
+    public UnexpectedEndOffsetException(String s) {
+        super(s);
+    }
+}
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
index b84a7d57b8a..87717d206d4 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.raft.errors.BufferAllocationException;
 import org.apache.kafka.raft.errors.NotLeaderException;
+import org.apache.kafka.raft.errors.UnexpectedEndOffsetException;
 import org.apache.kafka.server.common.serialization.RecordSerde;
 
 import org.apache.kafka.common.message.LeaderChangeMessage;
@@ -38,6 +39,7 @@ import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.OptionalInt;
+import java.util.OptionalLong;
 import java.util.function.Function;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicLong;
@@ -89,52 +91,12 @@ public class BatchAccumulator<T> implements Closeable {
         this.appendLock = new ReentrantLock();
     }
 
-    /**
-     * Append a list of records into as many batches as necessary.
-     *
-     * The order of the elements in the records argument will match the order in the batches.
-     * This method will use as many batches as necessary to serialize all of the records. Since
-     * this method can split the records into multiple batches it is possible that some of the
-     * records will get committed while other will not when the leader fails.
-     *
-     * @param epoch the expected leader epoch. If this does not match, then {@link NotLeaderException}
-     *              will be thrown
-     * @param records the list of records to include in the batches
-     * @return the expected offset of the last record
-     * @throws RecordBatchTooLargeException if the size of one record T is greater than the maximum
-     *         batch size; if this exception is throw some of the elements in records may have
-     *         been committed
-     * @throws NotLeaderException if the epoch is less than the leader epoch
-     * @throws IllegalArgumentException if the epoch is invalid (greater than the leader epoch)
-     * @throws BufferAllocationException if we failed to allocate memory for the records
-     * @throws IllegalStateException if we tried to append new records after the batch has been built
-     */
-    public long append(int epoch, List<T> records) {
-        return append(epoch, records, false);
-    }
-
-    /**
-     * Append a list of records into an atomic batch. We guarantee all records are included in the
-     * same underlying record batch so that either all of the records become committed or none of
-     * them do.
-     *
-     * @param epoch the expected leader epoch. If this does not match, then {@link NotLeaderException}
-     *              will be thrown
-     * @param records the list of records to include in a batch
-     * @return the expected offset of the last record
-     * @throws RecordBatchTooLargeException if the size of the records is greater than the maximum
-     *         batch size; if this exception is throw none of the elements in records were
-     *         committed
-     * @throws NotLeaderException if the epoch is less than the leader epoch
-     * @throws IllegalArgumentException if the epoch is invalid (greater than the leader epoch)
-     * @throws BufferAllocationException if we failed to allocate memory for the records
-     * @throws IllegalStateException if we tried to append new records after the batch has been built
-     */
-    public long appendAtomic(int epoch, List<T> records) {
-        return append(epoch, records, true);
-    }
-     */
-    public long appendAtomic(int epoch, List<T> records) {
-        return append(epoch, records, true);
-    }
-
-    private long append(int epoch, List<T> records, boolean isAtomic) {
+    public long append(
+        int epoch,
+        List<T> records,
+        OptionalLong requiredEndOffset,
+        boolean isAtomic
+    ) {
         if (epoch < this.epoch) {
             throw new NotLeaderException("Append failed because the given 
epoch " + epoch + " is stale. " +
                     "Current leader epoch = " + this.epoch());
@@ -147,6 +109,13 @@ public class BatchAccumulator<T> implements Closeable {
 
         appendLock.lock();
         try {
+            long endOffset = nextOffset + records.size() - 1;
+            requiredEndOffset.ifPresent(r -> {
+                if (r != endOffset) {
+                    throw new UnexpectedEndOffsetException("Wanted end offset 
" + r +
+                            ", but next available was " + endOffset);
+                }
+            });
             maybeCompleteDrain();
 
             BatchBuilder<T> batch = null;
@@ -164,12 +133,12 @@ public class BatchAccumulator<T> implements Closeable {
                 }
 
                 batch.appendRecord(record, serializationCache);
-                nextOffset += 1;
             }
 
             maybeResetLinger();
 
-            return nextOffset - 1;
+            nextOffset = endOffset + 1;
+            return endOffset;
         } finally {
             appendLock.unlock();
         }
@@ -408,7 +377,9 @@ public class BatchAccumulator<T> implements Closeable {
     * This call will not block, but the drain may require multiple attempts before
     * it can be completed if the thread responsible for appending is holding the
     * append lock. In the worst case, the append will be completed on the next
-     * call to {@link #append(int, List)} following the initial call to this method.
+     * call to {@link #append(int, List, OptionalLong, boolean)} following the
+     * initial call to this method.
+     *
      * The caller should respect the time to the next flush as indicated by
      * {@link #timeUntilDrain(long)}.
      *
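To trace the new bookkeeping with concrete numbers (the offsets below are made up): with nextOffset = 100 and three records, the batch must end at offset 102, and nextOffset advances to 103 in a single assignment rather than via per-record increments. A self-contained sketch of that arithmetic and the optimistic-concurrency guard, using IllegalStateException in place of UnexpectedEndOffsetException:

    import java.util.OptionalLong;

    public class OffsetMathDemo {
        public static void main(String[] args) {
            long nextOffset = 100;   // next unassigned log offset
            int numRecords = 3;      // size of the append

            // Offset of the last record in the batch, as computed in append().
            long endOffset = nextOffset + numRecords - 1;       // 102

            // The guard added by this patch: a mismatch means someone else wrote first.
            OptionalLong requiredEndOffset = OptionalLong.of(102);
            if (requiredEndOffset.isPresent() && requiredEndOffset.getAsLong() != endOffset) {
                throw new IllegalStateException("Wanted end offset " +
                    requiredEndOffset.getAsLong() + ", but next available was " + endOffset);
            }

            nextOffset = endOffset + 1;                         // 103
            System.out.println("last record at " + endOffset + ", next append at " + nextOffset);
        }
    }
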
diff --git a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java
index eeacf608a9f..a3f726be63e 100644
--- a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java
+++ b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.record.ControlRecordUtils;
 
 import java.util.Optional;
 import java.util.List;
+import java.util.OptionalLong;
 import java.util.function.Supplier;
 
 final public class RecordsSnapshotWriter<T> implements SnapshotWriter<T> {
@@ -184,7 +185,7 @@ final public class RecordsSnapshotWriter<T> implements SnapshotWriter<T> {
             throw new IllegalStateException(message);
         }
 
-        accumulator.append(snapshot.snapshotId().epoch(), records);
+        accumulator.append(snapshot.snapshotId().epoch(), records, OptionalLong.empty(), false);
 
         if (accumulator.needsDrain(time.milliseconds())) {
             appendBatches(accumulator.drain());
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
index 7cbeb11f4f7..44704e901f6 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientTest.java
@@ -355,7 +355,8 @@ public class KafkaRaftClientTest {
         for (int i = 0; i < size; i++)
             batchToLarge.add("a");
 
-        assertThrows(RecordBatchTooLargeException.class, () -> context.client.scheduleAtomicAppend(epoch, batchToLarge));
+        assertThrows(RecordBatchTooLargeException.class,
+                () -> context.client.scheduleAtomicAppend(epoch, OptionalLong.empty(), batchToLarge));
     }
 
     @Test
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index 06a300f666c..6f8e3e422e3 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -1197,7 +1197,7 @@ public final class RaftClientTestContext {
         }
 
         @Override
-        public void handleLeaderChange(LeaderAndEpoch leaderAndEpoch) {
+        public void handleLeaderChange(LeaderAndEpoch leaderAndEpoch, long endOffset) {
             // We record the next expected offset as the claimed epoch's start
             // offset. This is useful to verify that the `handleLeaderChange` callback
             // was not received early.
diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java
index 11499771790..ed7f6c94122 100644
--- a/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java
@@ -33,6 +33,7 @@ import org.mockito.Mockito;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.List;
+import java.util.OptionalLong;
 import java.util.concurrent.CountDownLatch;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -232,7 +233,7 @@ class BatchAccumulatorTest {
         );
 
         time.sleep(15);
-        assertEquals(baseOffset, acc.append(leaderEpoch, singletonList("a")));
+        assertEquals(baseOffset, acc.append(leaderEpoch, singletonList("a"), 
OptionalLong.empty(), false));
         assertEquals(lingerMs, acc.timeUntilDrain(time.milliseconds()));
         assertFalse(acc.isEmpty());
 
@@ -264,7 +265,7 @@ class BatchAccumulatorTest {
             maxBatchSize
         );
 
-        assertEquals(baseOffset, acc.append(leaderEpoch, singletonList("a")));
+        assertEquals(baseOffset, acc.append(leaderEpoch, singletonList("a"), 
OptionalLong.empty(), false));
         time.sleep(lingerMs);
 
         List<BatchAccumulator.CompletedBatch<String>> batches = acc.drain();
@@ -293,7 +294,7 @@ class BatchAccumulatorTest {
             maxBatchSize
         );
 
-        assertEquals(baseOffset, acc.append(leaderEpoch, singletonList("a")));
+        assertEquals(baseOffset, acc.append(leaderEpoch, singletonList("a"), 
OptionalLong.empty(), false));
         acc.close();
         Mockito.verify(memoryPool).release(buffer);
     }
@@ -396,7 +397,7 @@ class BatchAccumulatorTest {
             .generate(() -> record)
             .limit(numberOfRecords)
             .collect(Collectors.toList());
-        assertEquals(baseOffset + numberOfRecords - 1, acc.append(leaderEpoch, records));
+        assertEquals(baseOffset + numberOfRecords - 1, acc.append(leaderEpoch, records, OptionalLong.empty(), false));
 
         time.sleep(lingerMs);
         assertTrue(acc.needsDrain(time.milliseconds()));
@@ -451,7 +452,7 @@ class BatchAccumulatorTest {
         // Do the first append outside the thread to start the linger timer
         Mockito.when(memoryPool.tryAllocate(maxBatchSize))
             .thenReturn(ByteBuffer.allocate(maxBatchSize));
-        acc.append(leaderEpoch, singletonList("a"));
+        acc.append(leaderEpoch, singletonList("a"), OptionalLong.empty(), 
false);
 
         // Let the serde block to simulate a slow append
         Mockito.doAnswer(invocation -> {
@@ -466,7 +467,7 @@ class BatchAccumulatorTest {
             Mockito.any(Writable.class)
         );
 
-        Thread appendThread = new Thread(() -> acc.append(leaderEpoch, singletonList("b")));
+        Thread appendThread = new Thread(() -> acc.append(leaderEpoch, singletonList("b"), OptionalLong.empty(), false));
         appendThread.start();
 
         // Attempt to drain while the append thread is holding the lock
@@ -509,14 +510,14 @@ class BatchAccumulatorTest {
     static final Appender APPEND_ATOMIC = new Appender() {
         @Override
        public Long call(BatchAccumulator<String> acc, int epoch, List<String> records) {
-            return acc.appendAtomic(epoch, records);
+            return acc.append(epoch, records, OptionalLong.empty(), true);
         }
     };
 
     static final Appender APPEND = new Appender() {
         @Override
        public Long call(BatchAccumulator<String> acc, int epoch, List<String> records) {
-            return acc.append(epoch, records);
+            return acc.append(epoch, records, OptionalLong.empty(), false);
         }
     };
 }
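For reference, a hedged sketch of calling the unified append() in both modes (AppendModes, the generic demo method, and the `expected` value are made up for illustration; `expected` must line up with the accumulator's next offset for the pinned call to succeed):

    import java.util.List;
    import java.util.OptionalLong;
    import org.apache.kafka.raft.internals.BatchAccumulator;

    class AppendModes {
        // Demonstrates the two modes of the unified append(), which replaces appendAtomic().
        static <T> void demo(BatchAccumulator<T> acc, int epoch, List<T> records, long expected) {
            // Non-atomic: records may be split across multiple batches.
            acc.append(epoch, records, OptionalLong.empty(), false);
            // Atomic, pinned: all records in one batch, whose last record must land
            // exactly at offset 'expected' or UnexpectedEndOffsetException is thrown.
            acc.append(epoch, records, OptionalLong.of(expected), true);
        }
    }
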
diff --git a/server-common/src/test/java/org/apache/kafka/deferred/DeferredEventQueueTest.java b/server-common/src/test/java/org/apache/kafka/deferred/DeferredEventQueueTest.java
index 7c4f0e62a95..e9f9487570a 100644
--- a/server-common/src/test/java/org/apache/kafka/deferred/DeferredEventQueueTest.java
+++ b/server-common/src/test/java/org/apache/kafka/deferred/DeferredEventQueueTest.java
@@ -24,6 +24,8 @@ import java.util.concurrent.ExecutionException;
 import org.apache.kafka.common.utils.LogContext;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -101,4 +103,28 @@ public class DeferredEventQueueTest {
        assertEquals(RuntimeException.class, assertThrows(ExecutionException.class,
             () -> event3.future.get()).getCause().getClass());
     }
+
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testReEntrantCompletion(boolean completeExceptionally) {
+        final DeferredEventQueue deferredEventQueue = new DeferredEventQueue(new LogContext());
+        SampleDeferredEvent event1 = new SampleDeferredEvent();
+        SampleDeferredEvent event2 = new SampleDeferredEvent();
+        event1.future().thenAccept(__ -> {
+            deferredEventQueue.completeUpTo(3);
+        });
+        deferredEventQueue.add(1, event1);
+        deferredEventQueue.add(3, event2);
+        deferredEventQueue.completeUpTo(0);
+        assertFalse(event1.future().isDone());
+        assertFalse(event2.future().isDone());
+        if (completeExceptionally) {
+            deferredEventQueue.failAll(new RuntimeException("oops"));
+        } else {
+            deferredEventQueue.completeUpTo(1);
+        }
+        assertTrue(event1.future().isDone());
+        assertEquals(completeExceptionally, event1.future().isCompletedExceptionally());
+        assertTrue(event2.future().isDone());
+    }
 }
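The new test relies on the queue tolerating re-entrant calls: completing event1 triggers completeUpTo(3) from inside its own callback. A minimal sketch of a completion loop with that property (SimpleDeferredQueue is a made-up stand-in, not the real DeferredEventQueue): it re-reads the head of the map on every iteration and removes an entry before completing it, so nested completions see a consistent queue instead of a broken iterator.

    import java.util.Map;
    import java.util.TreeMap;
    import java.util.concurrent.CompletableFuture;

    // Hypothetical re-entrancy-tolerant deferred queue keyed by offset.
    class SimpleDeferredQueue {
        private final TreeMap<Long, CompletableFuture<Long>> pending = new TreeMap<>();

        void add(long offset, CompletableFuture<Long> future) {
            pending.put(offset, future);
        }

        void completeUpTo(long offset) {
            Map.Entry<Long, CompletableFuture<Long>> entry;
            while ((entry = pending.firstEntry()) != null && entry.getKey() <= offset) {
                pending.remove(entry.getKey());
                entry.getValue().complete(entry.getKey()); // may re-enter completeUpTo
            }
        }
    }
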
