This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 440f5f6c097 MINOR; Validate at least one control record (#15912)
440f5f6c097 is described below

commit 440f5f6c09720bb9414524781342bbf35973c281
Author: José Armando García Sancio <jsan...@users.noreply.github.com>
AuthorDate: Tue May 14 10:02:29 2024 -0400

    MINOR; Validate at least one control record (#15912)

    Validate that a control batch in the batch accumulator has at least one control record.

    Reviewers: Jun Rao <jun...@apache.org>, Chia-Ping Tsai <chia7...@apache.org>
---
 .../apache/kafka/common/record/MemoryRecords.java  | 12 ++++-----
 .../org/apache/kafka/raft/KafkaRaftClient.java     |  8 +++---
 .../java/org/apache/kafka/raft/ReplicatedLog.java  |  2 +-
 .../kafka/raft/internals/BatchAccumulator.java     | 12 +++++----
 .../internals/KRaftControlRecordStateMachine.java  |  6 ++---
 .../org/apache/kafka/raft/internals/VoterSet.java  |  6 ++---
 .../kafka/raft/internals/VoterSetHistory.java      | 10 ++++----
 .../kafka/snapshot/RecordsSnapshotReader.java      |  4 +--
 .../kafka/raft/internals/BatchAccumulatorTest.java | 30 ++++++++++++++++++++++
 9 files changed, 61 insertions(+), 29 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 4eab7f7b658..5d6f9227875 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -730,7 +730,7 @@ public class MemoryRecords extends AbstractRecords {
         ByteBuffer buffer,
         LeaderChangeMessage leaderChangeMessage
     ) {
-        try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
+        try (MemoryRecordsBuilder builder = createKraftControlRecordBuilder(
             initialOffset,
             timestamp,
             leaderEpoch,
@@ -749,7 +749,7 @@ public class MemoryRecords extends AbstractRecords {
         ByteBuffer buffer,
         SnapshotHeaderRecord snapshotHeaderRecord
     ) {
-        try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
+        try (MemoryRecordsBuilder builder = createKraftControlRecordBuilder(
             initialOffset,
             timestamp,
             leaderEpoch,
@@ -768,7 +768,7 @@ public class MemoryRecords extends AbstractRecords {
         ByteBuffer buffer,
         SnapshotFooterRecord snapshotFooterRecord
     ) {
-        try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
+        try (MemoryRecordsBuilder builder = createKraftControlRecordBuilder(
             initialOffset,
             timestamp,
             leaderEpoch,
@@ -787,7 +787,7 @@ public class MemoryRecords extends AbstractRecords {
         ByteBuffer buffer,
         KRaftVersionRecord kraftVersionRecord
     ) {
-        try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
+        try (MemoryRecordsBuilder builder = createKraftControlRecordBuilder(
             initialOffset,
             timestamp,
             leaderEpoch,
@@ -806,7 +806,7 @@ public class MemoryRecords extends AbstractRecords {
         ByteBuffer buffer,
         VotersRecord votersRecord
     ) {
-        try (MemoryRecordsBuilder builder = createKraftControlReccordBuilder(
+        try (MemoryRecordsBuilder builder = createKraftControlRecordBuilder(
             initialOffset,
             timestamp,
             leaderEpoch,
@@ -818,7 +818,7 @@ public class MemoryRecords extends AbstractRecords {
         }
     }
 
-    private static MemoryRecordsBuilder createKraftControlReccordBuilder(
+    private static MemoryRecordsBuilder createKraftControlRecordBuilder(
         long initialOffset,
         long timestamp,
         int leaderEpoch,
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 0662af6875c..70408c73bea 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -386,8 +386,9 @@ final public class KafkaRaftClient<T> implements RaftClient<T> {
         logger.info("Reading KRaft snapshot and log as part of the initialization");
         partitionState.updateState();
 
+        VoterSet lastVoterSet = partitionState.lastVoterSet();
         requestManager = new RequestManager(
-            partitionState.lastVoterSet().voterIds(),
+            lastVoterSet.voterIds(),
             quorumConfig.retryBackoffMs(),
             quorumConfig.requestTimeoutMs(),
             random
@@ -395,7 +396,7 @@ final public class KafkaRaftClient<T> implements RaftClient<T> {
 
         quorum = new QuorumState(
             nodeId,
-            partitionState.lastVoterSet().voterIds(),
+            lastVoterSet.voterIds(),
             quorumConfig.electionTimeoutMs(),
             quorumConfig.fetchTimeoutMs(),
             quorumStateStore,
@@ -409,7 +410,6 @@ final public class KafkaRaftClient<T> implements RaftClient<T> {
         // so there are no unknown voter connections. Report this metric as 0.
         kafkaRaftMetrics.updateNumUnknownVoterConnections(0);
 
-        VoterSet lastVoterSet = partitionState.lastVoterSet();
         for (Integer voterId : lastVoterSet.voterIds()) {
             channel.updateEndpoint(voterId, lastVoterSet.voterAddress(voterId, listenerName).get());
         }
@@ -1524,7 +1524,7 @@ final public class KafkaRaftClient<T> implements RaftClient<T> {
             quorum.leaderIdOrSentinel()
         );
 
-        // This will aways reload the snapshot because the internal next offset
+        // This will always reload the snapshot because the internal next offset
         // is always less than the snapshot id just downloaded.
         partitionState.updateState();
 
diff --git a/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java b/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
index a8bd7f13d9a..fdc5c5fb0df 100644
--- a/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
+++ b/raft/src/main/java/org/apache/kafka/raft/ReplicatedLog.java
@@ -236,7 +236,7 @@ public interface ReplicatedLog extends AutoCloseable {
      * {@link Optional#empty()}.
      *
      * The snapshot id will be validated against the existing snapshots and the log. The snapshot id
-     * must not alread exist, it must be greater than the log start offset, it must be less than
+     * must not already exist, it must be greater than the log start offset, it must be less than
      * the high-watermark and it must exist in the log.
      *
      * @param snapshotId the end offset and epoch that identifies the snapshot
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
index 1163d68b47f..f16ec2f2d0e 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/BatchAccumulator.java
@@ -230,7 +230,7 @@ public class BatchAccumulator<T> implements Closeable {
             forceDrain();
             MemoryRecords memoryRecords = valueCreator.create(nextOffset, epoch, buffer);
 
-            int numberOfRecords = validateMemoryRecordAndReturnCount(memoryRecords);
+            int numberOfRecords = validateMemoryRecordsAndReturnCount(memoryRecords);
 
             completed.add(
                 new CompletedBatch<>(
@@ -255,9 +255,9 @@ public class BatchAccumulator<T> implements Closeable {
         }
     }
 
-    private int validateMemoryRecordAndReturnCount(MemoryRecords memoryRecord) {
-        // Confirm that it is at most one batch and it is a control record
-        Iterator<MutableRecordBatch> batches = memoryRecord.batches().iterator();
+    private int validateMemoryRecordsAndReturnCount(MemoryRecords memoryRecords) {
+        // Confirm that it is one control batch and it is at least one control record
+        Iterator<MutableRecordBatch> batches = memoryRecords.batches().iterator();
         if (!batches.hasNext()) {
             throw new IllegalArgumentException("valueCreator didn't create a batch");
         }
@@ -265,7 +265,7 @@ public class BatchAccumulator<T> implements Closeable {
         MutableRecordBatch batch = batches.next();
         Integer numberOfRecords = batch.countOrNull();
         if (!batch.isControlBatch()) {
-            throw new IllegalArgumentException("valueCreator didn't creatte a control batch");
+            throw new IllegalArgumentException("valueCreator didn't create a control batch");
         } else if (batch.baseOffset() != nextOffset) {
             throw new IllegalArgumentException(
                 String.format(
@@ -284,6 +284,8 @@ public class BatchAccumulator<T> implements Closeable {
             );
         } else if (numberOfRecords == null) {
             throw new IllegalArgumentException("valueCreator didn't create a batch with the count");
+        } else if (numberOfRecords < 1) {
+            throw new IllegalArgumentException("valueCreator didn't create at least one control record");
         } else if (batches.hasNext()) {
             throw new IllegalArgumentException("valueCreator created more than one batch");
         }
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
index 4dab03bf798..dd6e6a0cd39 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/KRaftControlRecordStateMachine.java
@@ -39,8 +39,8 @@ import org.slf4j.Logger;
  * This type keeps track of changes to the finalized kraft.version and the sets of voters between
  * the latest snasphot and the log end offset.
  *
- * The are two actors/threads for this type. One is the KRaft driver which indirectly call a lot of
- * the public methods. The other are the callers of {@code RaftClient::createSnapshot} which
+ * There are two type of actors/threads accessing this type. One is the KRaft driver which indirectly call a lot of
+ * the public methods. The other actors/threads are the callers of {@code RaftClient.createSnapshot} which
  * indirectly call {@code voterSetAtOffset} and {@code kraftVersionAtOffset} when freezing a snapshot.
  */
 final public class KRaftControlRecordStateMachine {
@@ -115,7 +115,7 @@ final public class KRaftControlRecordStateMachine {
     /**
      * Remove the tail of the log until the given offset.
      *
-     * @param @startOffset the start offset (inclusive)
+     * @param startOffset the start offset (inclusive)
      */
     public void truncateOldEntries(long startOffset) {
         synchronized (voterSetHistory) {
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java
index ae0ec4d4b66..9ca38368c03 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSet.java
@@ -38,7 +38,7 @@ import org.apache.kafka.common.utils.Utils;
  *
  * It encapsulates static information like a voter's endpoint and their supported kraft.version.
  *
- * It providees functionality for converting to and from {@code VotersRecord} and for converting
+ * It provides functionality for converting to and from {@code VotersRecord} and for converting
  * from the static configuration.
  */
 final public class VoterSet {
@@ -161,8 +161,8 @@ final public class VoterSet {
      * An overlapping majority means that for all majorities in {@code this} set of voters and for
      * all majority in {@code that} set of voters, they have at least one voter in common.
      *
-     * If this function returns true is means that one of the voter set commits an offset, it means
-     * that the other voter set cannot commit a conflicting offset.
+     * If this function returns true, it means that if one of the set of voters commits an offset,
+     * the other set of voters cannot commit a conflicting offset.
      *
      * @param that the other voter set to compare
      * @return true if they have an overlapping majority, false otherwise
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java
index 3a15d62c5a9..fa44660af67 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/VoterSetHistory.java
@@ -21,9 +21,9 @@ import java.util.Optional;
 /**
  * A type for storing the historical value of the set of voters.
  *
- * This type can be use to keep track in-memory the sets for voters stored in the latest snapshot
- * and log. This is useful when generating a new snapshot at a given offset or when evaulating
- * the latest set of voters.
+ * This type can be used to keep track, in-memory, of the sets for voters stored in the latest snapshot
+ * and the log segments. This is useful when generating a new snapshot at a given offset or when
+ * evaluating the latest set of voters.
  */
 final public class VoterSetHistory {
     private final Optional<VoterSet> staticVoterSet;
@@ -40,7 +40,7 @@ final public class VoterSetHistory {
      * offset of all previous calls to this method.
      *
      * @param offset the offset
-     * @param value the value to store
+     * @param voters the voters to store
      * @throws IllegalArgumentException if the offset is not greater than all previous offsets
      */
     public void addAt(long offset, VoterSet voters) {
@@ -69,7 +69,7 @@ final public class VoterSetHistory {
      * Computes the value of the voter set at a given offset.
      *
      * This function will only return values provided through {@code addAt} and it would never
-     * include the {@code staticVoterSet} provided through the constructoer.
+     * include the {@code staticVoterSet} provided through the constructor.
      *
      * @param offset the offset (inclusive)
      * @return the voter set if one exist, otherwise {@code Optional.empty()}
diff --git a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java
index 3e6a9a5732d..20b6c0eaab8 100644
--- a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java
+++ b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java
@@ -120,10 +120,10 @@ public final class RecordsSnapshotReader<T> implements SnapshotReader<T> {
     }
 
     /**
-     * Returns the next non-control Batch
+     * Returns the next batch
      */
     private Optional<Batch<T>> nextBatch() {
-        while (iterator.hasNext()) {
+        if (iterator.hasNext()) {
             Batch<T> batch = iterator.next();
 
             if (!lastContainedLogTimestamp.isPresent()) {
diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java
index cf63f4384f9..30f5691e2e2 100644
--- a/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/internals/BatchAccumulatorTest.java
@@ -683,6 +683,36 @@ class BatchAccumulatorTest {
         }
     }
 
+    @Test
+    public void testEmptyControlBatch() {
+        int leaderEpoch = 17;
+        long baseOffset = 157;
+        int lingerMs = 50;
+        int maxBatchSize = 512;
+
+        ByteBuffer buffer = ByteBuffer.allocate(maxBatchSize);
+        Mockito.when(memoryPool.tryAllocate(maxBatchSize))
+            .thenReturn(buffer);
+
+        BatchAccumulator.MemoryRecordsCreator creator = (offset, epoch, buf) -> {
+            long now = 1234;
+            try (MemoryRecordsBuilder builder = controlRecordsBuilder(offset, epoch, now, buf)) {
+                // Create a control batch without any records
+                return builder.build();
+            }
+        };
+
+        try (BatchAccumulator<String> acc = buildAccumulator(
+                leaderEpoch,
+                baseOffset,
+                lingerMs,
+                maxBatchSize
+            )
+        ) {
+            assertThrows(IllegalArgumentException.class, () -> acc.appendControlMessages(creator));
+        }
+    }
+
     private static MemoryRecordsBuilder controlRecordsBuilder(
         long baseOffset,
         int epoch,