This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 440f5f6c097 MINOR; Validate at least one control record (#15912)
440f5f6c097 is described below

commit 440f5f6c09720bb9414524781342bbf35973c281
Author: José Armando García Sancio <jsan...@users.noreply.github.com>
AuthorDate: Tue May 14 10:02:29 2024 -0400

    MINOR; Validate at least one control record (#15912)
    
    Validate that a control batch in the batch accumulator has at least one control record.
    
    Reviewers: Jun Rao <jun...@apache.org>, Chia-Ping Tsai <chia7...@apache.org>
---
 .../apache/kafka/common/record/MemoryRecords.java  | 12 ++++-----
 .../org/apache/kafka/raft/KafkaRaftClient.java     |  8 +++---
 .../java/org/apache/kafka/raft/ReplicatedLog.java  |  2 +-
 .../kafka/raft/internals/BatchAccumulator.java     | 12 +++++----
 .../internals/KRaftControlRecordStateMachine.java  |  6 ++---
 .../org/apache/kafka/raft/internals/VoterSet.java  |  6 ++---
 .../kafka/raft/internals/VoterSetHistory.java      | 10 ++++----
 .../kafka/snapshot/RecordsSnapshotReader.java      |  4 +--
 .../kafka/raft/internals/BatchAccumulatorTest.java | 30 ++++++++++++++++++++++
 9 files changed, 61 insertions(+), 29 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 4eab7f7b658..5d6f9227875 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -730,7 +730,7 @@ public class MemoryRecords extends AbstractRecords {
         ByteBuffer buffer,
         LeaderChangeMessage leaderChangeMessage
     ) {
-        try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
+        try (MemoryRecordsBuilder builder = createKraftControlRecordBuilder(
                 initialOffset,
                 timestamp,
                 leaderEpoch,
@@ -749,7 +749,7 @@ public class MemoryRecords extends AbstractRecords {
         ByteBuffer buffer,
         SnapshotHeaderRecord snapshotHeaderRecord
     ) {
-        try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
+        try (MemoryRecordsBuilder builder = createKraftControlRecordBuilder(
                 initialOffset,
                 timestamp,
                 leaderEpoch,
@@ -768,7 +768,7 @@ public class MemoryRecords extends AbstractRecords {
         ByteBuffer buffer,
         SnapshotFooterRecord snapshotFooterRecord
     ) {
-        try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
+        try (MemoryRecordsBuilder builder = createKraftControlRecordBuilder(
                 initialOffset,
                 timestamp,
                 leaderEpoch,
@@ -787,7 +787,7 @@ public class MemoryRecords extends AbstractRecords {
         ByteBuffer buffer,
         KRaftVersionRecord kraftVersionRecord
     ) {
-        try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
+        try (MemoryRecordsBuilder builder = createKraftControlRecordBuilder(
                 initialOffset,
                 timestamp,
                 leaderEpoch,
@@ -806,7 +806,7 @@ public class MemoryRecords extends AbstractRecords {
         ByteBuffer buffer,
         VotersRecord votersRecord
     ) {
-        try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
+        try (MemoryRecordsBuilder builder = createKraftControlRecordBuilder(
                 initialOffset,
                 timestamp,
                 leaderEpoch,
@@ -818,7 +818,7 @@ public class MemoryRecords extends AbstractRecords {
         }
     }
 
-    private static MemoryRecordsBuilder createKraftControlReccordBuilder(
+    private static MemoryRecordsBuilder createKraftControlRecordBuilder(
         long initialOffset,
         long timestamp,
         int leaderEpoch,
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 0662af6875c..70408c73bea 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -386,8 +386,9 @@ final public class KafkaRaftClient<T> implements 
RaftClient<T> {
         logger.info("Reading KRaft snapshot and log as part of the 
initialization");
         partitionState.updateState();
 
+        VoterSet lastVoterSet = partitionState.lastVoterSet();
         requestManager = new RequestManager(
-            partitionState.lastVoterSet().voterIds(),
+            lastVoterSet.voterIds(),
             quorumConfig.retryBackoffMs(),
             quorumConfig.requestTimeoutMs(),
             random
@@ -395,7 +396,7 @@ final public class KafkaRaftClient<T> implements 
RaftClient<T> {
 
         quorum = new QuorumState(
             nodeId,
-            partitionState.lastVoterSet().voterIds(),
+            lastVoterSet.voterIds(),
             quorumConfig.electionTimeoutMs(),
             quorumConfig.fetchTimeoutMs(),
             quorumStateStore,
@@ -409,7 +410,6 @@ final public class KafkaRaftClient<T> implements 
RaftClient<T> {
         // so there are no unknown voter connections. Report this metric as 0.
         kafkaRaftMetrics.updateNumUnknownVoterConnections(0);
 
-        VoterSet lastVoterSet = partitionState.lastVoterSet();
         for (Integer voterId : lastVoterSet.voterIds()) {
             channel.updateEndpoint(voterId, lastVoterSet.voterAddress(voterId, 
listenerName).get());
         }
@@ -1524,7 +1524,7 @@ final public class KafkaRaftClient<T> implements 
RaftClient<T> {
                     quorum.leaderIdOrSentinel()
                 );
 
-                // This will aways reload the snapshot because the internal 
next offset
+                // This will always reload the snapshot because the internal 
next offset
                 // is always less than the snapshot id just downloaded.
                 partitionState.updateState();
 
diff --git a/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java b/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
index a8bd7f13d9a..fdc5c5fb0df 100644
--- a/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
+++ b/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
@@ -236,7 +236,7 @@ public interface ReplicatedLog extends AutoCloseable {
      * {@link Optional#empty()}.
      *
      * The snapshot id will be validated against the existing snapshots and 
the log. The snapshot id
-     * must not alread exist, it must be greater than the log start offset, it 
must be less than
+     * must not already exist, it must be greater than the log start offset, 
it must be less than
      * the high-watermark and it must exist in the log.
      *
      * @param snapshotId the end offset and epoch that identifies the snapshot
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
index 1163d68b47f..f16ec2f2d0e 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
@@ -230,7 +230,7 @@ public class BatchAccumulator<T> implements Closeable {
                     forceDrain();
                     MemoryRecords memoryRecords = 
valueCreator.create(nextOffset, epoch, buffer);
 
-                    int numberOfRecords = 
validateMemoryRecordAndReturnCount(memoryRecords);
+                    int numberOfRecords = 
validateMemoryRecordsAndReturnCount(memoryRecords);
 
                     completed.add(
                         new CompletedBatch<>(
@@ -255,9 +255,9 @@ public class BatchAccumulator<T> implements Closeable {
         }
     }
 
-    private int validateMemoryRecordAndReturnCount(MemoryRecords memoryRecord) 
{
-        // Confirm that it is at most one batch and it is a control record
-        Iterator<MutableRecordBatch> batches = 
memoryRecord.batches().iterator();
+    private int validateMemoryRecordsAndReturnCount(MemoryRecords 
memoryRecords) {
+        // Confirm that it is exactly one control batch and that it has at least one control record
+        Iterator<MutableRecordBatch> batches = 
memoryRecords.batches().iterator();
         if (!batches.hasNext()) {
             throw new IllegalArgumentException("valueCreator didn't create a 
batch");
         }
@@ -265,7 +265,7 @@ public class BatchAccumulator<T> implements Closeable {
         MutableRecordBatch batch = batches.next();
         Integer numberOfRecords = batch.countOrNull();
         if (!batch.isControlBatch()) {
-            throw new IllegalArgumentException("valueCreator didn't creatte a 
control batch");
+            throw new IllegalArgumentException("valueCreator didn't create a 
control batch");
         } else if (batch.baseOffset() != nextOffset) {
             throw new IllegalArgumentException(
                 String.format(
@@ -284,6 +284,8 @@ public class BatchAccumulator<T> implements Closeable {
             );
         } else if (numberOfRecords == null) {
             throw new IllegalArgumentException("valueCreator didn't create a 
batch with the count");
+        } else if (numberOfRecords < 1) {
+            throw new IllegalArgumentException("valueCreator didn't create at 
least one control record");
         } else if (batches.hasNext()) {
             throw new IllegalArgumentException("valueCreator created more than 
one batch");
         }
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
index 4dab03bf798..dd6e6a0cd39 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
@@ -39,8 +39,8 @@ import org.slf4j.Logger;
  * This type keeps track of changes to the finalized kraft.version and the 
sets of voters between
  * the latest snapshot and the log end offset.
  *
- * The are two actors/threads for this type. One is the KRaft driver which 
indirectly call a lot of
- * the public methods. The other are the callers of {@code 
RaftClient::createSnapshot} which
+ * There are two types of actors/threads accessing this type. One is the KRaft driver, which indirectly calls a lot of
+ * the public methods. The other actors/threads are the callers of {@code RaftClient.createSnapshot} which
  * indirectly call {@code voterSetAtOffset} and {@code kraftVersionAtOffset} 
when freezing a snapshot.
  */
 final public class KRaftControlRecordStateMachine {
@@ -115,7 +115,7 @@ final public class KRaftControlRecordStateMachine {
     /**
      * Remove the tail of the log until the given offset.
      *
-     * @param @startOffset the start offset (inclusive)
+     * @param startOffset the start offset (inclusive)
      */
     public void truncateOldEntries(long startOffset) {
         synchronized (voterSetHistory) {
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java
index ae0ec4d4b66..9ca38368c03 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java
@@ -38,7 +38,7 @@ import org.apache.kafka.common.utils.Utils;
  *
  * It encapsulates static information like a voter's endpoint and their 
supported kraft.version.
  *
- * It providees functionality for converting to and from {@code VotersRecord} 
and for converting
+ * It provides functionality for converting to and from {@code VotersRecord} 
and for converting
  * from the static configuration.
  */
 final public class VoterSet {
@@ -161,8 +161,8 @@ final public class VoterSet {
      * An overlapping majority means that for all majorities in {@code this} 
set of voters and for
      * all majorities in {@code that} set of voters, they have at least one voter in common.
      *
-     * If this function returns true is means that one of the voter set 
commits an offset, it means
-     * that the other voter set cannot commit a conflicting offset.
+     * If this function returns true, it means that if one of the set of 
voters commits an offset,
+     * the other set of voters cannot commit a conflicting offset.
      *
      * @param that the other voter set to compare
      * @return true if they have an overlapping majority, false otherwise
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java
index 3a15d62c5a9..fa44660af67 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java
@@ -21,9 +21,9 @@ import java.util.Optional;
 /**
  * A type for storing the historical value of the set of voters.
  *
- * This type can be use to keep track in-memory the sets for voters stored in 
the latest snapshot
- * and log. This is useful when generating a new snapshot at a given offset or 
when evaulating
- * the latest set of voters.
+ * This type can be used to keep track, in-memory, of the sets for voters 
stored in the latest snapshot
+ * and the log segments. This is useful when generating a new snapshot at a 
given offset or when
+ * evaluating the latest set of voters.
  */
 final public class VoterSetHistory {
     private final Optional<VoterSet> staticVoterSet;
@@ -40,7 +40,7 @@ final public class VoterSetHistory {
      * offset of all previous calls to this method.
      *
      * @param offset the offset
-     * @param value the value to store
+     * @param voters the voters to store
      * @throws IllegalArgumentException if the offset is not greater than all 
previous offsets
      */
     public void addAt(long offset, VoterSet voters) {
@@ -69,7 +69,7 @@ final public class VoterSetHistory {
      * Computes the value of the voter set at a given offset.
      *
      * This function will only return values provided through {@code addAt} 
and it would never
-     * include the {@code staticVoterSet} provided through the constructoer.
+     * include the {@code staticVoterSet} provided through the constructor.
      *
      * @param offset the offset (inclusive)
      * @return the voter set if one exists, otherwise {@code Optional.empty()}
diff --git a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java
index 3e6a9a5732d..20b6c0eaab8 100644
--- a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java
+++ b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java
@@ -120,10 +120,10 @@ public final class RecordsSnapshotReader<T> implements 
SnapshotReader<T> {
     }
 
     /**
-     * Returns the next non-control Batch
+     * Returns the next batch
      */
     private Optional<Batch<T>> nextBatch() {
-        while (iterator.hasNext()) {
+        if (iterator.hasNext()) {
             Batch<T> batch = iterator.next();
 
             if (!lastContainedLogTimestamp.isPresent()) {
diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java
index cf63f4384f9..30f5691e2e2 100644
--- a/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java
@@ -683,6 +683,36 @@ class BatchAccumulatorTest {
         }
     }
 
+    @Test
+    public void testEmptyControlBatch() {
+        int leaderEpoch = 17;
+        long baseOffset = 157;
+        int lingerMs = 50;
+        int maxBatchSize = 512;
+
+        ByteBuffer buffer = ByteBuffer.allocate(maxBatchSize);
+        Mockito.when(memoryPool.tryAllocate(maxBatchSize))
+                .thenReturn(buffer);
+
+        BatchAccumulator.MemoryRecordsCreator creator = (offset, epoch, buf) 
-> {
+            long now = 1234;
+            try (MemoryRecordsBuilder builder = controlRecordsBuilder(offset, 
epoch, now, buf)) {
+                // Create a control batch without any records
+                return builder.build();
+            }
+        };
+
+        try (BatchAccumulator<String> acc = buildAccumulator(
+                leaderEpoch,
+                baseOffset,
+                lingerMs,
+                maxBatchSize
+            )
+        ) {
+            assertThrows(IllegalArgumentException.class, () -> 
acc.appendControlMessages(creator));
+        }
+    }
+
     private static MemoryRecordsBuilder controlRecordsBuilder(
         long baseOffset,
         int epoch,

Reply via email to