This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 2dafe45f15e KAFKA-14104; Add CRC validation when iterating over Metadata Log Records (#12457)
2dafe45f15e is described below

commit 2dafe45f15eb730a7a6bad258defd298ec173a08
Author: Niket <niket-g...@users.noreply.github.com>
AuthorDate: Mon Aug 8 15:03:04 2022 -0700

    KAFKA-14104; Add CRC validation when iterating over Metadata Log Records (#12457)
    
    This commit adds a check that the RecordBatch CRC is valid when
    iterating over a batch of records using the RecordsIterator. The
    RecordsIterator is used by both snapshot reads and log record reads in
    KRaft. The check can be disabled via a constructor parameter and is
    enabled by default.
    
    Reviewers: José Armando García Sancio <jsan...@users.noreply.github.com>
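    
    For context, a minimal sketch (not part of the patch) of how a caller
    opts into the validation after this change; RecordsSnapshotReader.of and
    its parameters are real, the helper method around them is illustrative:
    
        // Assumes imports of RawSnapshotReader, RecordsSnapshotReader,
        // SnapshotReader, RecordSerde, BufferSupplier, ApiMessageAndVersion.
        static void readSnapshot(RawSnapshotReader reader,
                                 RecordSerde<ApiMessageAndVersion> serde) {
            try (SnapshotReader<ApiMessageAndVersion> snapshot =
                    RecordsSnapshotReader.of(
                        reader,
                        serde,
                        BufferSupplier.create(),
                        Integer.MAX_VALUE,  // max batch size in bytes
                        true                // doCrcValidation: verify batch header CRCs
                    )) {
                while (snapshot.hasNext()) {
                    // next() surfaces CorruptRecordException on a CRC mismatch
                    snapshot.next();
                }
            }
        }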
---
 .../kafka/common/record/DefaultRecordBatch.java    |  2 +-
 .../kafka/server/RaftClusterSnapshotTest.scala     |  3 +-
 .../kafka/controller/QuorumControllerTest.java     |  3 +-
 .../org/apache/kafka/metalog/LocalLogManager.java  |  3 +-
 .../org/apache/kafka/raft/KafkaRaftClient.java     | 10 +++-
 .../kafka/raft/internals/RecordsBatchReader.java   |  5 +-
 .../kafka/raft/internals/RecordsIterator.java      | 13 ++++-
 .../kafka/snapshot/RecordsSnapshotReader.java      |  5 +-
 .../apache/kafka/raft/RaftEventSimulationTest.java |  2 +-
 .../raft/internals/RecordsBatchReaderTest.java     |  3 +-
 .../kafka/raft/internals/RecordsIteratorTest.java  | 61 +++++++++++++++++++---
 .../kafka/snapshot/SnapshotWriterReaderTest.java   |  5 +-
 12 files changed, 92 insertions(+), 23 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index bd80981d84b..bc8f32491c0 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -107,7 +107,7 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
     static final int PARTITION_LEADER_EPOCH_LENGTH = 4;
     static final int MAGIC_OFFSET = PARTITION_LEADER_EPOCH_OFFSET + PARTITION_LEADER_EPOCH_LENGTH;
     static final int MAGIC_LENGTH = 1;
-    static final int CRC_OFFSET = MAGIC_OFFSET + MAGIC_LENGTH;
+    public static final int CRC_OFFSET = MAGIC_OFFSET + MAGIC_LENGTH;
     static final int CRC_LENGTH = 4;
     static final int ATTRIBUTES_OFFSET = CRC_OFFSET + CRC_LENGTH;
     static final int ATTRIBUTE_LENGTH = 2;
diff --git a/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala b/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
index 503ce7d2bee..f8dccd17d0d 100644
--- a/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
+++ b/core/src/test/scala/integration/kafka/server/RaftClusterSnapshotTest.scala
@@ -78,7 +78,8 @@ class RaftClusterSnapshotTest {
             raftManager.replicatedLog.latestSnapshot.get(),
             new MetadataRecordSerde(),
             BufferSupplier.create(),
-            1
+            1,
+            true
           )
         ) { snapshot =>
           // Check that the snapshot is non-empty
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index a62b1f682f0..2cdec699da2 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -738,7 +738,8 @@ public class QuorumControllerTest {
             reader,
             new MetadataRecordSerde(),
             BufferSupplier.create(),
-            Integer.MAX_VALUE
+            Integer.MAX_VALUE,
+            true
         );
     }
 
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
index c8e39ae3289..e24d86bd873 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
@@ -496,7 +496,8 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
                                         snapshot.get(),
                                         new  MetadataRecordSerde(),
                                         BufferSupplier.create(),
-                                        Integer.MAX_VALUE
+                                        Integer.MAX_VALUE,
+                                        true
                                     )
                                 );
                             }
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 53372728aab..cac7a8a3cb9 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -333,7 +333,12 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
 
     private Optional<SnapshotReader<T>> latestSnapshot() {
         return log.latestSnapshot().map(reader ->
-            RecordsSnapshotReader.of(reader, serde, BufferSupplier.create(), MAX_BATCH_SIZE_BYTES)
+            RecordsSnapshotReader.of(reader,
+                serde,
+                BufferSupplier.create(),
+                MAX_BATCH_SIZE_BYTES,
+                true /* Validate batch CRC */
+            )
         );
     }
 
@@ -2519,7 +2524,8 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
                     serde,
                     BufferSupplier.create(),
                     MAX_BATCH_SIZE_BYTES,
-                    this
+                    this,
+                    true /* Validate batch CRC */
                 )
             );
         }
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java
index e95206100a3..61819a9dcca 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsBatchReader.java
@@ -100,11 +100,12 @@ public final class RecordsBatchReader<T> implements BatchReader<T> {
         RecordSerde<T> serde,
         BufferSupplier bufferSupplier,
         int maxBatchSize,
-        CloseListener<BatchReader<T>> closeListener
+        CloseListener<BatchReader<T>> closeListener,
+        boolean doCrcValidation
     ) {
         return new RecordsBatchReader<>(
             baseOffset,
-            new RecordsIterator<>(records, serde, bufferSupplier, maxBatchSize),
+            new RecordsIterator<>(records, serde, bufferSupplier, maxBatchSize, doCrcValidation),
             closeListener
         );
     }
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java
index 866f541fb24..ff415aa72ad 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java
@@ -41,6 +41,9 @@ public final class RecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseab
     private final RecordSerde<T> serde;
     private final BufferSupplier bufferSupplier;
     private final int batchSize;
+    // Setting to true will make the RecordsIterator perform a CRC Validation
+    // on the batch header when iterating over them
+    private final boolean doCrcValidation;
 
     private Iterator<MutableRecordBatch> nextBatches = Collections.emptyIterator();
     private Optional<Batch<T>> nextBatch = Optional.empty();
@@ -54,12 +57,14 @@ public final class RecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseab
         Records records,
         RecordSerde<T> serde,
         BufferSupplier bufferSupplier,
-        int batchSize
+        int batchSize,
+        boolean doCrcValidation
     ) {
         this.records = records;
         this.serde = serde;
         this.bufferSupplier = bufferSupplier;
         this.batchSize = Math.max(batchSize, Records.HEADER_SIZE_UP_TO_MAGIC);
+        this.doCrcValidation = doCrcValidation;
     }
 
     @Override
@@ -163,7 +168,6 @@ public final class RecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseab
 
         if (nextBatches.hasNext()) {
             MutableRecordBatch nextBatch = nextBatches.next();
-
             // Update the buffer position to reflect the read batch
             allocatedBuffer.ifPresent(buffer -> buffer.position(buffer.position() + nextBatch.sizeInBytes()));
 
@@ -180,6 +184,11 @@ public final class RecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseab
     }
 
     private Batch<T> readBatch(DefaultRecordBatch batch) {
+        if (doCrcValidation) {
+            // Perform a CRC validity check on this batch
+            batch.ensureValid();
+        }
+
         final Batch<T> result;
         if (batch.isControlBatch()) {
             result = Batch.control(
diff --git a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java
index 89ad2632229..92b695146c3 100644
--- a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java
+++ b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java
@@ -104,11 +104,12 @@ public final class RecordsSnapshotReader<T> implements SnapshotReader<T> {
         RawSnapshotReader snapshot,
         RecordSerde<T> serde,
         BufferSupplier bufferSupplier,
-        int maxBatchSize
+        int maxBatchSize,
+        boolean doCrcValidation
     ) {
         return new RecordsSnapshotReader<>(
             snapshot.snapshotId(),
-            new RecordsIterator<>(snapshot.records(), serde, bufferSupplier, maxBatchSize)
+            new RecordsIterator<>(snapshot.records(), serde, bufferSupplier, maxBatchSize, doCrcValidation)
         );
     }
 
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
index 4f79dc18cc6..a6117a33ca0 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftEventSimulationTest.java
@@ -1112,7 +1112,7 @@ public class RaftEventSimulationTest {
                 startOffset.set(snapshotId.offset);
 
                 try (SnapshotReader<Integer> snapshot =
-                        RecordsSnapshotReader.of(log.readSnapshot(snapshotId).get(), node.intSerde, BufferSupplier.create(), Integer.MAX_VALUE)) {
+                        RecordsSnapshotReader.of(log.readSnapshot(snapshotId).get(), node.intSerde, BufferSupplier.create(), Integer.MAX_VALUE, true)) {
                     // Expect only one batch with only one record
                     assertTrue(snapshot.hasNext());
                     Batch<Integer> batch = snapshot.next();
diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java
index 6fe540711c2..ae8b1dfb8e2 100644
--- a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsBatchReaderTest.java
@@ -100,7 +100,8 @@ class RecordsBatchReaderTest {
             serde,
             bufferSupplier,
             MAX_BATCH_BYTES,
-            closeListener
+            closeListener,
+            true
         );
 
         for (TestBatch<String> batch : expectedBatches) {
diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java
index 7d984893120..9dfbfd62fbf 100644
--- a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java
@@ -30,7 +30,9 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import net.jqwik.api.ForAll;
 import net.jqwik.api.Property;
+import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.DefaultRecordBatch;
 import org.apache.kafka.common.record.FileRecords;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.Records;
@@ -42,6 +44,7 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.Mockito;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -60,7 +63,7 @@ public final class RecordsIteratorTest {
     @ParameterizedTest
     @MethodSource("emptyRecords")
     void testEmptyRecords(Records records) {
-        testIterator(Collections.emptyList(), records);
+        testIterator(Collections.emptyList(), records, true);
     }
 
     @Property
@@ -71,7 +74,7 @@ public final class RecordsIteratorTest {
         List<TestBatch<String>> batches = createBatches(seed);
 
         MemoryRecords memRecords = buildRecords(compressionType, batches);
-        testIterator(batches, memRecords);
+        testIterator(batches, memRecords, true);
     }
 
     @Property
@@ -85,18 +88,58 @@ public final class RecordsIteratorTest {
         FileRecords fileRecords = FileRecords.open(TestUtils.tempFile());
         fileRecords.append(memRecords);
 
-        testIterator(batches, fileRecords);
+        testIterator(batches, fileRecords, true);
+        fileRecords.close();
+    }
+
+    @Property
+    public void testCrcValidation(
+            @ForAll CompressionType compressionType,
+            @ForAll long seed
+    ) throws IOException {
+        List<TestBatch<String>> batches = createBatches(seed);
+        MemoryRecords memRecords = buildRecords(compressionType, batches);
+        // Read the Batch CRC for the first batch from the buffer
+        ByteBuffer readBuf = memRecords.buffer();
+        readBuf.position(DefaultRecordBatch.CRC_OFFSET);
+        int actualCrc = readBuf.getInt();
+        // Corrupt the CRC on the first batch
+        memRecords.buffer().putInt(DefaultRecordBatch.CRC_OFFSET, actualCrc + 1);
+
+        assertThrows(CorruptRecordException.class, () -> testIterator(batches, memRecords, true));
+
+        FileRecords fileRecords = FileRecords.open(TestUtils.tempFile());
+        fileRecords.append(memRecords);
+        assertThrows(CorruptRecordException.class, () -> testIterator(batches, fileRecords, true));
+
+        // Verify check does not trigger when doCrcValidation is false
+        assertDoesNotThrow(() -> testIterator(batches, memRecords, false));
+        assertDoesNotThrow(() -> testIterator(batches, fileRecords, false));
+
+        // Fix the corruption
+        memRecords.buffer().putInt(DefaultRecordBatch.CRC_OFFSET, actualCrc);
+
+        // Verify check does not trigger when the corruption is fixed
+        assertDoesNotThrow(() -> testIterator(batches, memRecords, true));
+        FileRecords moreFileRecords = FileRecords.open(TestUtils.tempFile());
+        moreFileRecords.append(memRecords);
+        assertDoesNotThrow(() -> testIterator(batches, moreFileRecords, true));
+
+        fileRecords.close();
+        moreFileRecords.close();
     }
 
     private void testIterator(
         List<TestBatch<String>> expectedBatches,
-        Records records
+        Records records,
+        boolean validateCrc
     ) {
         Set<ByteBuffer> allocatedBuffers = Collections.newSetFromMap(new IdentityHashMap<>());
 
         RecordsIterator<String> iterator = createIterator(
             records,
-            mockBufferSupplier(allocatedBuffers)
+            mockBufferSupplier(allocatedBuffers),
+            validateCrc
         );
 
         for (TestBatch<String> batch : expectedBatches) {
@@ -111,8 +154,12 @@ public final class RecordsIteratorTest {
         assertEquals(Collections.emptySet(), allocatedBuffers);
     }
 
-    static RecordsIterator<String> createIterator(Records records, BufferSupplier bufferSupplier) {
-        return new RecordsIterator<>(records, STRING_SERDE, bufferSupplier, Records.HEADER_SIZE_UP_TO_MAGIC);
+    static RecordsIterator<String> createIterator(
+        Records records,
+        BufferSupplier bufferSupplier,
+        boolean validateCrc
+    ) {
+        return new RecordsIterator<>(records, STRING_SERDE, bufferSupplier, Records.HEADER_SIZE_UP_TO_MAGIC, validateCrc);
     }
 
     static BufferSupplier mockBufferSupplier(Set<ByteBuffer> buffers) {
diff --git a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
index 05d1929f271..cd86c709ff9 100644
--- a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
+++ b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
@@ -192,7 +192,8 @@ final public class SnapshotWriterReaderTest {
             context.log.readSnapshot(snapshotId).get(),
             context.serde,
             BufferSupplier.create(),
-            maxBatchSize
+            maxBatchSize,
+            true
         );
     }
 
@@ -246,7 +247,7 @@ final public class SnapshotWriterReaderTest {
     public static void assertSnapshot(List<List<String>> batches, RawSnapshotReader reader) {
         assertSnapshot(
             batches,
-            RecordsSnapshotReader.of(reader, new StringSerde(), BufferSupplier.create(), Integer.MAX_VALUE)
+            RecordsSnapshotReader.of(reader, new StringSerde(), BufferSupplier.create(), Integer.MAX_VALUE, true)
         );
     }
 

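For readers skimming the patch: the new check in readBatch delegates to the
existing DefaultRecordBatch.ensureValid(), which throws CorruptRecordException
when the stored CRC does not match the batch payload. A rough standalone
sketch of what that validation amounts to, with the layout offsets hard-coded
for illustration (the authoritative logic lives in DefaultRecordBatch):

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.errors.CorruptRecordException;
    import org.apache.kafka.common.utils.Crc32C;

    final class CrcCheckSketch {
        // CRC32C covers the batch from the attributes field to the end of
        // the buffer; the stored CRC sits just before it in the header.
        static void ensureValidSketch(ByteBuffer batch) {
            final int crcOffset = 17;        // DefaultRecordBatch.CRC_OFFSET
            final int attributesOffset = 21; // CRC_OFFSET + CRC_LENGTH
            int storedCrc = batch.getInt(crcOffset);
            long computedCrc = Crc32C.compute(
                batch, attributesOffset, batch.limit() - attributesOffset);
            if ((int) computedCrc != storedCrc) {
                throw new CorruptRecordException("Record batch CRC mismatch");
            }
        }
    }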