This is an automated email from the ASF dual-hosted git repository. jgus pushed a commit to branch 3.3 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.3 by this push: new 2dafe45f15e KAFKA-14104; Add CRC validation when iterating over Metadata Log Records (#12457) 2dafe45f15e is described below commit 2dafe45f15eb730a7a6bad258defd298ec173a08 Author: Niket <niket-g...@users.noreply.github.com> AuthorDate: Mon Aug 8 15:03:04 2022 -0700 KAFKA-14104; Add CRC validation when iterating over Metadata Log Records (#12457) This commit adds a check to ensure the RecordBatch CRC is valid when iterating over a batch of records using the RecordsIterator. The RecordsIterator is used by both snapshot reads and log record reads in KRaft. The check can be disabled through a new doCrcValidation parameter; all existing callers enable it. Reviewers: José Armando García Sancio <jsan...@users.noreply.github.com> --- .../kafka/common/record/DefaultRecordBatch.java | 2 +- .../kafka/server/RaftClusterSnapshotTest.scala | 3 +- .../kafka/controller/QuorumControllerTest.java | 3 +- .../org/apache/kafka/metalog/LocalLogManager.java | 3 +- .../org/apache/kafka/raft/KafkaRaftClient.java | 10 +++- .../kafka/raft/internals/RecordsBatchReader.java | 5 +- .../kafka/raft/internals/RecordsIterator.java | 13 ++++- .../kafka/snapshot/RecordsSnapshotReader.java | 5 +- .../apache/kafka/raft/RaftEventSimulationTest.java | 2 +- .../raft/internals/RecordsBatchReaderTest.java | 3 +- .../kafka/raft/internals/RecordsIteratorTest.java | 61 +++++++++++++++++++--- .../kafka/snapshot/SnapshotWriterReaderTest.java | 5 +- 12 files changed, 92 insertions(+), 23 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java index bd80981d84b..bc8f32491c0 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java +++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java @@ -107,7 +107,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe 
static final int PARTITION_LEADER_EPOCH_LENGTH = 4; static final int MAGIC_OFFSET = PARTITION_LEADER_EPOCH_OFFSET + PARTITION_LEADER_EPOCH_LENGTH; static final int MAGIC_LENGTH = 1; - static final int CRC_OFFSET = MAGIC_OFFSET + MAGIC_LENGTH; + public static final int CRC_OFFSET = MAGIC_OFFSET + MAGIC_LENGTH; static final int CRC_LENGTH = 4; static final int ATTRIBUTES_OFFSET = CRC_OFFSET + CRC_LENGTH; static final int ATTRIBUTE_LENGTH = 2; diff --git a/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala b/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala index 503ce7d2bee..f8dccd17d0d 100644 --- a/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala +++ b/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala @@ -78,7 +78,8 @@ class RaftClusterSnapshotTest { raftManager.replicatedLog.latestSnapshot.get(), new MetadataRecordSerde(), BufferSupplier.create(), - 1 + 1, + true ) ) { snapshot => // Check that the snapshot is non-empty diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java index a62b1f682f0..2cdec699da2 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java @@ -738,7 +738,8 @@ public class QuorumControllerTest { reader, new MetadataRecordSerde(), BufferSupplier.create(), - Integer.MAX_VALUE + Integer.MAX_VALUE, + true ); } diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java index c8e39ae3289..e24d86bd873 100644 --- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java +++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java @@ -496,7 +496,8 @@ public final class LocalLogManager implements 
RaftClient<ApiMessageAndVersion>, snapshot.get(), new MetadataRecordSerde(), BufferSupplier.create(), - Integer.MAX_VALUE + Integer.MAX_VALUE, + true ) ); } diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java index 53372728aab..cac7a8a3cb9 100644 --- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java +++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java @@ -333,7 +333,12 @@ public class KafkaRaftClient<T> implements RaftClient<T> { private Optional<SnapshotReader<T>> latestSnapshot() { return log.latestSnapshot().map(reader -> - RecordsSnapshotReader.of(reader, serde, BufferSupplier.create(), MAX_BATCH_SIZE_BYTES) + RecordsSnapshotReader.of(reader, + serde, + BufferSupplier.create(), + MAX_BATCH_SIZE_BYTES, + true /* Validate batch CRC*/ + ) ); } @@ -2519,7 +2524,8 @@ public class KafkaRaftClient<T> implements RaftClient<T> { serde, BufferSupplier.create(), MAX_BATCH_SIZE_BYTES, - this + this, + true /* Validate batch CRC*/ ) ); } diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java index e95206100a3..61819a9dcca 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java @@ -100,11 +100,12 @@ public final class RecordsBatchReader<T> implements BatchReader<T> { RecordSerde<T> serde, BufferSupplier bufferSupplier, int maxBatchSize, - CloseListener<BatchReader<T>> closeListener + CloseListener<BatchReader<T>> closeListener, + boolean doCrcValidation ) { return new RecordsBatchReader<>( baseOffset, - new RecordsIterator<>(records, serde, bufferSupplier, maxBatchSize), + new RecordsIterator<>(records, serde, bufferSupplier, maxBatchSize, doCrcValidation), closeListener ); } diff --git 
a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java index 866f541fb24..ff415aa72ad 100644 --- a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java +++ b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java @@ -41,6 +41,9 @@ public final class RecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseab private final RecordSerde<T> serde; private final BufferSupplier bufferSupplier; private final int batchSize; + // Setting to true will make the RecordsIterator perform a CRC Validation + // on the batch header when iterating over them + private final boolean doCrcValidation; private Iterator<MutableRecordBatch> nextBatches = Collections.emptyIterator(); private Optional<Batch<T>> nextBatch = Optional.empty(); @@ -54,12 +57,14 @@ public final class RecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseab Records records, RecordSerde<T> serde, BufferSupplier bufferSupplier, - int batchSize + int batchSize, + boolean doCrcValidation ) { this.records = records; this.serde = serde; this.bufferSupplier = bufferSupplier; this.batchSize = Math.max(batchSize, Records.HEADER_SIZE_UP_TO_MAGIC); + this.doCrcValidation = doCrcValidation; } @Override @@ -163,7 +168,6 @@ public final class RecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseab if (nextBatches.hasNext()) { MutableRecordBatch nextBatch = nextBatches.next(); - // Update the buffer position to reflect the read batch allocatedBuffer.ifPresent(buffer -> buffer.position(buffer.position() + nextBatch.sizeInBytes())); @@ -180,6 +184,11 @@ public final class RecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseab } private Batch<T> readBatch(DefaultRecordBatch batch) { + if (doCrcValidation) { + // Perform a CRC validity check on this batch + batch.ensureValid(); + } + final Batch<T> result; if (batch.isControlBatch()) { result = Batch.control( diff --git 
a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java index 89ad2632229..92b695146c3 100644 --- a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java +++ b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java @@ -104,11 +104,12 @@ public final class RecordsSnapshotReader<T> implements SnapshotReader<T> { RawSnapshotReader snapshot, RecordSerde<T> serde, BufferSupplier bufferSupplier, - int maxBatchSize + int maxBatchSize, + boolean doCrcValidation ) { return new RecordsSnapshotReader<>( snapshot.snapshotId(), - new RecordsIterator<>(snapshot.records(), serde, bufferSupplier, maxBatchSize) + new RecordsIterator<>(snapshot.records(), serde, bufferSupplier, maxBatchSize, doCrcValidation) ); } diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java index 4f79dc18cc6..a6117a33ca0 100644 --- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java @@ -1112,7 +1112,7 @@ public class RaftEventSimulationTest { startOffset.set(snapshotId.offset); try (SnapshotReader<Integer> snapshot = - RecordsSnapshotReader.of(log.readSnapshot(snapshotId).get(), node.intSerde, BufferSupplier.create(), Integer.MAX_VALUE)) { + RecordsSnapshotReader.of(log.readSnapshot(snapshotId).get(), node.intSerde, BufferSupplier.create(), Integer.MAX_VALUE, true)) { // Expect only one batch with only one record assertTrue(snapshot.hasNext()); Batch<Integer> batch = snapshot.next(); diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java index 6fe540711c2..ae8b1dfb8e2 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java +++ 
b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java @@ -100,7 +100,8 @@ class RecordsBatchReaderTest { serde, bufferSupplier, MAX_BATCH_BYTES, - closeListener + closeListener, + true ); for (TestBatch<String> batch : expectedBatches) { diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java index 7d984893120..9dfbfd62fbf 100644 --- a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java +++ b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java @@ -30,7 +30,9 @@ import java.util.stream.Collectors; import java.util.stream.Stream; import net.jqwik.api.ForAll; import net.jqwik.api.Property; +import org.apache.kafka.common.errors.CorruptRecordException; import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.DefaultRecordBatch; import org.apache.kafka.common.record.FileRecords; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.Records; @@ -42,6 +44,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; import org.mockito.Mockito; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -60,7 +63,7 @@ public final class RecordsIteratorTest { @ParameterizedTest @MethodSource("emptyRecords") void testEmptyRecords(Records records) { - testIterator(Collections.emptyList(), records); + testIterator(Collections.emptyList(), records, true); } @Property @@ -71,7 +74,7 @@ public final class RecordsIteratorTest { List<TestBatch<String>> batches = createBatches(seed); MemoryRecords memRecords = buildRecords(compressionType, 
batches); - testIterator(batches, memRecords); + testIterator(batches, memRecords, true); } @Property @@ -85,18 +88,58 @@ public final class RecordsIteratorTest { FileRecords fileRecords = FileRecords.open(TestUtils.tempFile()); fileRecords.append(memRecords); - testIterator(batches, fileRecords); + testIterator(batches, fileRecords, true); + fileRecords.close(); + } + + @Property + public void testCrcValidation( + @ForAll CompressionType compressionType, + @ForAll long seed + ) throws IOException { + List<TestBatch<String>> batches = createBatches(seed); + MemoryRecords memRecords = buildRecords(compressionType, batches); + // Read the Batch CRC for the first batch from the buffer + ByteBuffer readBuf = memRecords.buffer(); + readBuf.position(DefaultRecordBatch.CRC_OFFSET); + int actualCrc = readBuf.getInt(); + // Corrupt the CRC on the first batch + memRecords.buffer().putInt(DefaultRecordBatch.CRC_OFFSET, actualCrc + 1); + + assertThrows(CorruptRecordException.class, () -> testIterator(batches, memRecords, true)); + + FileRecords fileRecords = FileRecords.open(TestUtils.tempFile()); + fileRecords.append(memRecords); + assertThrows(CorruptRecordException.class, () -> testIterator(batches, fileRecords, true)); + + // Verify check does not trigger when doCrcValidation is false + assertDoesNotThrow(() -> testIterator(batches, memRecords, false)); + assertDoesNotThrow(() -> testIterator(batches, fileRecords, false)); + + // Fix the corruption + memRecords.buffer().putInt(DefaultRecordBatch.CRC_OFFSET, actualCrc); + + // Verify check does not trigger when the corruption is fixed + assertDoesNotThrow(() -> testIterator(batches, memRecords, true)); + FileRecords moreFileRecords = FileRecords.open(TestUtils.tempFile()); + moreFileRecords.append(memRecords); + assertDoesNotThrow(() -> testIterator(batches, moreFileRecords, true)); + + fileRecords.close(); + moreFileRecords.close(); } private void testIterator( List<TestBatch<String>> expectedBatches, - Records records + 
Records records, + boolean validateCrc ) { Set<ByteBuffer> allocatedBuffers = Collections.newSetFromMap(new IdentityHashMap<>()); RecordsIterator<String> iterator = createIterator( records, - mockBufferSupplier(allocatedBuffers) + mockBufferSupplier(allocatedBuffers), + validateCrc ); for (TestBatch<String> batch : expectedBatches) { @@ -111,8 +154,12 @@ public final class RecordsIteratorTest { assertEquals(Collections.emptySet(), allocatedBuffers); } - static RecordsIterator<String> createIterator(Records records, BufferSupplier bufferSupplier) { - return new RecordsIterator<>(records, STRING_SERDE, bufferSupplier, Records.HEADER_SIZE_UP_TO_MAGIC); + static RecordsIterator<String> createIterator( + Records records, + BufferSupplier bufferSupplier, + boolean validateCrc + ) { + return new RecordsIterator<>(records, STRING_SERDE, bufferSupplier, Records.HEADER_SIZE_UP_TO_MAGIC, validateCrc); } static BufferSupplier mockBufferSupplier(Set<ByteBuffer> buffers) { diff --git a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java index 05d1929f271..cd86c709ff9 100644 --- a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java +++ b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java @@ -192,7 +192,8 @@ final public class SnapshotWriterReaderTest { context.log.readSnapshot(snapshotId).get(), context.serde, BufferSupplier.create(), - maxBatchSize + maxBatchSize, + true ); } @@ -246,7 +247,7 @@ final public class SnapshotWriterReaderTest { public static void assertSnapshot(List<List<String>> batches, RawSnapshotReader reader) { assertSnapshot( batches, - RecordsSnapshotReader.of(reader, new StringSerde(), BufferSupplier.create(), Integer.MAX_VALUE) + RecordsSnapshotReader.of(reader, new StringSerde(), BufferSupplier.create(), Integer.MAX_VALUE, true) ); }