This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6aa702fb248 MINOR: Improve CRC failure handling for share groups (#21569)
6aa702fb248 is described below

commit 6aa702fb248b40834d6c95f7a192d8f3c3130f5e
Author: Andrew Schofield <[email protected]>
AuthorDate: Mon Mar 2 22:52:16 2026 +0000

    MINOR: Improve CRC failure handling for share groups (#21569)
    
    The handling of CRC failures in ShareCompletedFetch was incorrect,
    particularly when multiple CRC failures occurred. This PR adds a bunch
    of tests and corrects the logic for keeping track of the position when a
    batch is rejected.
    
    Reviewers: Apoorv Mittal <[email protected]>
---
 .../consumer/internals/ShareCompletedFetch.java    |  17 +-
 .../internals/ShareCompletedFetchTest.java         | 362 +++++++++++++++++++--
 2 files changed, 351 insertions(+), 28 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
index 024e5bece00..7fc39c5d5a1 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
@@ -280,27 +280,32 @@ public class ShareCompletedFetch {
     }
 
     private <K, V> Set<Long> rejectRecordBatch(final ShareInFlightBatch<K, V> 
inFlightBatch,
-                                          final RecordBatch currentBatch) {
+                                               final RecordBatch currentBatch) 
{
         // Rewind the acquiredRecordIterator to the start, so we are in a 
known state
         acquiredRecordIterator = acquiredRecordList.listIterator();
 
-        OffsetAndDeliveryCount nextAcquired = nextAcquiredRecord();
+        OffsetAndDeliveryCount acquired = nextAcquiredRecord();
         Set<Long> offsets = new HashSet<>();
         for (long offset = currentBatch.baseOffset(); offset <= 
currentBatch.lastOffset(); offset++) {
-            if (nextAcquired == null) {
+            while (acquired != null && acquired.offset < offset) {
+                acquired = nextAcquiredRecord();
+            }
+
+            if (acquired == null) {
                 // No more acquired records, so we are done
                 break;
-            } else if (offset == nextAcquired.offset) {
+            } else if (offset == acquired.offset) {
                 // It's acquired, so we reject it
                 inFlightBatch.addAcknowledgement(offset, 
AcknowledgeType.REJECT);
                 offsets.add(offset);
-            } else if (offset < nextAcquired.offset) {
+            } else {
                 // It's not acquired, so we skip it
                 continue;
             }
 
-            nextAcquired = nextAcquiredRecord();
+            acquired = nextAcquiredRecord();
         }
+        this.nextAcquired = acquired;
         return offsets;
     }
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
index 824f2ecf030..8e97f0c5eee 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.errors.CorruptRecordException;
 import org.apache.kafka.common.errors.RecordDeserializationException;
 import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeaders;
@@ -49,7 +50,6 @@ import org.junit.jupiter.api.Test;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Optional;
@@ -225,8 +225,8 @@ public class ShareCompletedFetchTest {
     }
 
     @Test
-    public void testCorruptedMessage() {
-        // Create one good record and then two "corrupted" records and then 
another good record.
+    public void testRecordDeserializationException() {
+        // Create one good record and then two records which do not 
deserialize and then another good record.
         try (final MemoryRecordsBuilder builder = 
MemoryRecords.builder(ByteBuffer.allocate(1024),
                 Compression.NONE,
                 TimestampType.CREATE_TIME,
@@ -346,8 +346,8 @@ public class ShareCompletedFetchTest {
 
         // Acquire all records including the control record (offset 10 is the 
commit marker)
         ShareFetchResponseData.PartitionData partitionData = new 
ShareFetchResponseData.PartitionData()
-                .setRecords(rawRecords)
-                .setAcquiredRecords(acquiredRecords(0L, numRecords + 1));
+            .setRecords(rawRecords)
+            .setAcquiredRecords(acquiredRecords(0L, numRecords + 1));
 
         ShareCompletedFetch completedFetch = 
newShareCompletedFetch(partitionData);
         try (final Deserializers<String, String> deserializers = 
newStringDeserializers()) {
@@ -374,8 +374,8 @@ public class ShareCompletedFetchTest {
         acquiredRecords.add(acquiredRecords(10L, 5).get(0));
 
         ShareFetchResponseData.PartitionData partitionData = new 
ShareFetchResponseData.PartitionData()
-                .setRecords(newRecords(startingOffset,  10))
-                .setAcquiredRecords(acquiredRecords); // Acquire only records 
0-4 and 10-14
+            .setRecords(newRecords(startingOffset,  10))
+            .setAcquiredRecords(acquiredRecords); // Acquire only records 0-4 
and 10-14
 
         Deserializers<String, String> deserializers = newStringDeserializers();
 
@@ -407,8 +407,8 @@ public class ShareCompletedFetchTest {
 
         // Acquire only non-existent records 15-19 (all should be gaps)
         ShareFetchResponseData.PartitionData partitionData = new 
ShareFetchResponseData.PartitionData()
-                .setRecords(newRecords(startingOffset, numRecords))  // 
Records 0-9
-                .setAcquiredRecords(acquiredRecords(15L, 5));       // Acquire 
15-19 (don't exist)
+            .setRecords(newRecords(startingOffset, numRecords))  // Records 0-9
+            .setAcquiredRecords(acquiredRecords(15L, 5));       // Acquire 
15-19 (don't exist)
 
         Deserializers<String, String> deserializers = newStringDeserializers();
 
@@ -464,8 +464,8 @@ public class ShareCompletedFetchTest {
 
         // Acquire all offsets 0-6 (includes both control records and data 
records)
         ShareFetchResponseData.PartitionData partitionData = new 
ShareFetchResponseData.PartitionData()
-                .setRecords(records)
-                .setAcquiredRecords(acquiredRecords(0L, 7));
+            .setRecords(records)
+            .setAcquiredRecords(acquiredRecords(0L, 7));
 
         ShareCompletedFetch completedFetch = 
newShareCompletedFetch(partitionData);
         try (final Deserializers<String, String> deserializers = 
newStringDeserializers()) {
@@ -575,6 +575,222 @@ public class ShareCompletedFetchTest {
         }
     }
 
+    @Test
+    public void testCrcFailureOnFirstBatchThenSubsequentFetch() {
+        // Create 3 batches: corrupted batch (offsets 0-9), good batch 
(10-19), good batch (20-29)
+        Records records = newRecordsWithCorruptedFirstBatch();
+
+        // Acquire all 30 records (0-29)
+        List<ShareFetchResponseData.AcquiredRecords> acquiredRecords = new 
ArrayList<>();
+        acquiredRecords.add(acquiredRecords(0L, 10).get(0));
+        acquiredRecords.add(acquiredRecords(10L, 10).get(0));
+        acquiredRecords.add(acquiredRecords(20L, 10).get(0));
+
+        ShareFetchResponseData.PartitionData partitionData = new 
ShareFetchResponseData.PartitionData()
+            .setRecords(records)
+            .setAcquiredRecords(acquiredRecords);
+
+        ShareCompletedFetch completedFetch = 
newShareCompletedFetch(partitionData);
+        Deserializers<String, String> deserializers = newStringDeserializers();
+
+        // First fetch should fail with CRC error and reject the entire first 
batch
+        ShareInFlightBatch<String, String> batch1 = 
completedFetch.fetchRecords(deserializers, 10, true);
+        assertNotNull(batch1.getException(), "Should have exception for 
corrupted batch");
+        assertEquals(CorruptRecordException.class, 
batch1.getException().cause().getClass());
+
+        // Verify all 10 records from first batch are rejected
+        Acknowledgements acks1 = batch1.getAcknowledgements();
+        assertEquals(10, acks1.size(), "All records in corrupted batch should 
be rejected");
+        for (long offset = 0; offset < 10; offset++) {
+            assertEquals(AcknowledgeType.REJECT, acks1.get(offset),
+                "Record at offset " + offset + " should be REJECT");
+        }
+
+        // No records should be returned
+        assertEquals(0, batch1.getInFlightRecords().size());
+
+        // Should successfully fetch records from the second batch (offsets 
10-19)
+        ShareInFlightBatch<String, String> batch2 = 
completedFetch.fetchRecords(deserializers, 10, true);
+        assertNotNull(batch2, "Should return a batch");
+        List<ConsumerRecord<String, String>> records2 = 
batch2.getInFlightRecords();
+        assertEquals(10, records2.size(), "Should get 10 records from second 
batch");
+        assertEquals(10L, records2.get(0).offset(), "First record should be 
offset 10");
+        assertEquals(19L, records2.get(9).offset(), "Last record should be 
offset 19");
+
+        // Third fetch should process the third batch correctly
+        ShareInFlightBatch<String, String> batch3 = 
completedFetch.fetchRecords(deserializers, 10, true);
+        List<ConsumerRecord<String, String>> records3 = 
batch3.getInFlightRecords();
+        assertEquals(10, records3.size(), "Should get 10 records from third 
batch");
+        assertEquals(20L, records3.get(0).offset(), "First record should be 
offset 20");
+        assertEquals(29L, records3.get(9).offset(), "Last record should be 
offset 29");
+    }
+
+    @Test
+    public void testCrcFailureWithGapsInBatch() {
+        // Create corrupted batch with offsets 0-9
+        Records records = newSingleCorruptedBatch(0L, 10);
+
+        // Only acquire odd offsets: 1, 3, 5, 7, 9
+        List<ShareFetchResponseData.AcquiredRecords> acquiredRecords = new 
ArrayList<>();
+        for (long offset = 1; offset < 10; offset += 2) {
+            acquiredRecords.add(acquiredRecords(offset, 1).get(0));
+        }
+
+        ShareFetchResponseData.PartitionData partitionData = new 
ShareFetchResponseData.PartitionData()
+            .setRecords(records)
+            .setAcquiredRecords(acquiredRecords);
+
+        ShareCompletedFetch completedFetch = 
newShareCompletedFetch(partitionData);
+        Deserializers<String, String> deserializers = newStringDeserializers();
+
+        // Fetch should fail with CRC error
+        ShareInFlightBatch<String, String> batch = 
completedFetch.fetchRecords(deserializers, 10, true);
+        assertNotNull(batch.getException());
+        assertEquals(CorruptRecordException.class, 
batch.getException().cause().getClass());
+
+        // Only the acquired records (odd offsets) should be rejected
+        Acknowledgements acks = batch.getAcknowledgements();
+        assertEquals(5, acks.size(), "Only 5 acquired records should be 
rejected");
+        assertEquals(AcknowledgeType.REJECT, acks.get(1L));
+        assertEquals(AcknowledgeType.REJECT, acks.get(3L));
+        assertEquals(AcknowledgeType.REJECT, acks.get(5L));
+        assertEquals(AcknowledgeType.REJECT, acks.get(7L));
+        assertEquals(AcknowledgeType.REJECT, acks.get(9L));
+    }
+
+    @Test
+    public void testCrcFailureWithUnalignedAcquiredRecords() {
+        // Create corrupted batch with offsets 0-19
+        Records records = newSingleCorruptedBatch(0L, 20);
+
+        // Only acquire records 10-19 (second half of the corrupted batch)
+        List<ShareFetchResponseData.AcquiredRecords> acquiredRecords = new 
ArrayList<>();
+        acquiredRecords.add(acquiredRecords(10L, 10).get(0));
+
+        ShareFetchResponseData.PartitionData partitionData = new 
ShareFetchResponseData.PartitionData()
+            .setRecords(records)
+            .setAcquiredRecords(acquiredRecords);
+
+        ShareCompletedFetch completedFetch = 
newShareCompletedFetch(partitionData);
+        Deserializers<String, String> deserializers = newStringDeserializers();
+
+        // Fetch should fail with CRC error
+        ShareInFlightBatch<String, String> batch = 
completedFetch.fetchRecords(deserializers, 20, true);
+        assertNotNull(batch.getException());
+
+        // Only the acquired records (10-19) should be rejected
+        Acknowledgements acks = batch.getAcknowledgements();
+        assertEquals(10, acks.size());
+        for (long offset = 10; offset < 20; offset++) {
+            assertEquals(AcknowledgeType.REJECT, acks.get(offset));
+        }
+    }
+
+    @Test
+    public void testCorruptedRecordsWithCrcCheckDisabled() {
+        // Create corrupted batch
+        Records records = newSingleCorruptedBatch(0L, 10);
+
+        List<ShareFetchResponseData.AcquiredRecords> acquiredRecords = new 
ArrayList<>();
+        acquiredRecords.add(acquiredRecords(0L, 10).get(0));
+
+        ShareFetchResponseData.PartitionData partitionData = new 
ShareFetchResponseData.PartitionData()
+            .setRecords(records)
+            .setAcquiredRecords(acquiredRecords);
+
+        ShareCompletedFetch completedFetch = 
newShareCompletedFetch(partitionData);
+        Deserializers<String, String> deserializers = newStringDeserializers();
+
+        // Fetch with CRC check disabled - should process records despite 
corruption
+        ShareInFlightBatch<String, String> batch = 
completedFetch.fetchRecords(deserializers, 10, false);
+
+        // Should get records (they're corrupted, but we're not checking)
+        List<ConsumerRecord<String, String>> fetchedRecords = 
batch.getInFlightRecords();
+        assertEquals(10, fetchedRecords.size(), "Should get all records when 
CRC check is disabled");
+    }
+
+    @Test
+    public void testCrcFailureAfterPartialFetch() {
+        // Create 2 batches: good batch (offsets 0-4), corrupted batch (5-14)
+        // Using smaller first batch so fetching with maxRecords = 10 will 
need to load the second batch
+        Records records = newRecordsWithCorruptedSecondBatchSmallFirst();
+
+        // Acquire all 15 records
+        List<ShareFetchResponseData.AcquiredRecords> acquiredRecords = new 
ArrayList<>();
+        acquiredRecords.add(acquiredRecords(0L, 15).get(0));
+
+        ShareFetchResponseData.PartitionData partitionData = new 
ShareFetchResponseData.PartitionData()
+            .setRecords(records)
+            .setAcquiredRecords(acquiredRecords);
+
+        ShareCompletedFetch completedFetch = 
newShareCompletedFetch(partitionData);
+        Deserializers<String, String> deserializers = newStringDeserializers();
+
+        // First fetch with maxRecords=10 should return 5 records from first 
batch,
+        // then attempt to load second batch which is corrupted
+        ShareInFlightBatch<String, String> batch1 = 
completedFetch.fetchRecords(deserializers, 10, true);
+        List<ConsumerRecord<String, String>> records1 = 
batch1.getInFlightRecords();
+        assertEquals(5, records1.size(), "Should get 5 records from first 
batch before hitting CRC error");
+        assertEquals(0L, records1.get(0).offset());
+        assertEquals(4L, records1.get(4).offset());
+        assertTrue(batch1.hasCachedException(), "Should indicate there's a 
cached exception");
+
+        // Second fetch should return the cached CRC exception for the second 
batch
+        ShareInFlightBatch<String, String> batch2 = 
completedFetch.fetchRecords(deserializers, 10, true);
+        assertNotNull(batch2.getException(), "Should have cached exception");
+        assertEquals(CorruptRecordException.class, 
batch2.getException().cause().getClass());
+
+        // Verify all 10 records from second batch are rejected
+        Acknowledgements acks2 = batch2.getAcknowledgements();
+        assertEquals(10, acks2.size());
+        for (long offset = 5; offset < 15; offset++) {
+            assertEquals(AcknowledgeType.REJECT, acks2.get(offset));
+        }
+
+        // No more records
+        ShareInFlightBatch<String, String> batch3 = 
completedFetch.fetchRecords(deserializers, 10, true);
+        assertEquals(0, batch3.getInFlightRecords().size());
+    }
+
+    @Test
+    public void testMultipleConsecutiveCrcFailures() {
+        // Create 4 batches: corrupted, corrupted, good, good
+        Records records = newRecordsWithMultipleCorruptedBatches();
+
+        // Acquire all records (0-39)
+        List<ShareFetchResponseData.AcquiredRecords> acquiredRecords = new 
ArrayList<>();
+        acquiredRecords.add(acquiredRecords(0L, 40).get(0));
+
+        ShareFetchResponseData.PartitionData partitionData = new 
ShareFetchResponseData.PartitionData()
+            .setRecords(records)
+            .setAcquiredRecords(acquiredRecords);
+
+        ShareCompletedFetch completedFetch = 
newShareCompletedFetch(partitionData);
+        Deserializers<String, String> deserializers = newStringDeserializers();
+
+        // First corrupted batch (0-9)
+        ShareInFlightBatch<String, String> batch1 = 
completedFetch.fetchRecords(deserializers, 10, true);
+        assertNotNull(batch1.getException(), "First batch should have CRC 
exception");
+        Acknowledgements acks1 = batch1.getAcknowledgements();
+        assertEquals(10, acks1.size(), "First corrupted batch should reject 10 
records");
+
+        // Second corrupted batch (10-19)
+        ShareInFlightBatch<String, String> batch2 = 
completedFetch.fetchRecords(deserializers, 10, true);
+        assertNotNull(batch2.getException(), "Second batch should have CRC 
exception");
+        Acknowledgements acks2 = batch2.getAcknowledgements();
+        assertEquals(10, acks2.size(), "Second corrupted batch should reject 
10 records");
+
+        // Third batch should be good (20-29)
+        ShareInFlightBatch<String, String> batch3 = 
completedFetch.fetchRecords(deserializers, 10, true);
+        assertEquals(10, batch3.getInFlightRecords().size(), "Third batch 
should return 10 good records");
+        assertEquals(20L, batch3.getInFlightRecords().get(0).offset());
+
+        // Fourth batch should be good (30-39)
+        ShareInFlightBatch<String, String> batch4 = 
completedFetch.fetchRecords(deserializers, 10, true);
+        assertEquals(10, batch4.getInFlightRecords().size(), "Fourth batch 
should return 10 good records");
+        assertEquals(30L, batch4.getInFlightRecords().get(0).offset());
+    }
+
     private ShareCompletedFetch 
newShareCompletedFetch(ShareFetchResponseData.PartitionData partitionData) {
         LogContext logContext = new LogContext();
         ShareFetchMetricsRegistry shareFetchMetricsRegistry = new 
ShareFetchMetricsRegistry();
@@ -619,16 +835,16 @@ public class ShareCompletedFetchTest {
 
         for (long b = 0; b < batchCount; b++) {
             try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer,
-                RecordBatch.CURRENT_MAGIC_VALUE,
-                Compression.NONE,
-                TimestampType.CREATE_TIME,
-                baseOffset + b * numRecordsPerBatch,
-                time.milliseconds(),
-                PRODUCER_ID,
-                PRODUCER_EPOCH,
-                0,
-                true,
-                RecordBatch.NO_PARTITION_LEADER_EPOCH)) {
+                    RecordBatch.CURRENT_MAGIC_VALUE,
+                    Compression.NONE,
+                    TimestampType.CREATE_TIME,
+                    baseOffset + b * numRecordsPerBatch,
+                    time.milliseconds(),
+                    PRODUCER_ID,
+                    PRODUCER_EPOCH,
+                    0,
+                    true,
+                    RecordBatch.NO_PARTITION_LEADER_EPOCH)) {
                 for (int i = 0; i < numRecordsPerBatch; i++)
                     builder.append(new SimpleRecord(time.milliseconds(), 
"key".getBytes(), "value".getBytes()));
 
@@ -646,7 +862,7 @@ public class ShareCompletedFetchTest {
             .setFirstOffset(firstOffset)
             .setLastOffset(firstOffset + count - 1)
             .setDeliveryCount((short) 1);
-        return Collections.singletonList(acquiredRecords);
+        return List.of(acquiredRecords);
     }
 
     private Records newTransactionalRecords(int numRecords) {
@@ -687,4 +903,106 @@ public class ShareCompletedFetchTest {
             PRODUCER_EPOCH,
             new EndTransactionMarker(ControlRecordType.COMMIT, 0));
     }
+
+    private void createBatch(ByteBuffer buffer, long baseOffset, int 
numRecords, Time time) {
+        try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer,
+                RecordBatch.CURRENT_MAGIC_VALUE,
+                Compression.NONE,
+                TimestampType.CREATE_TIME,
+                baseOffset,
+                time.milliseconds(),
+                PRODUCER_ID,
+                PRODUCER_EPOCH,
+                0,
+                false,
+                RecordBatch.NO_PARTITION_LEADER_EPOCH)) {
+            for (int i = 0; i < numRecords; i++) {
+                builder.append(new SimpleRecord(time.milliseconds(), 
"key".getBytes(), ("value-" + (baseOffset + i)).getBytes()));
+            }
+            builder.build();
+        }
+    }
+
+    private void corruptBatchCrc(ByteBuffer buffer, int batchStartPosition) {
+        int currentPosition = buffer.position();
+
+        // CRC is at offset 17 in the record batch (after base offset, batch 
length, partition leader epoch, and magic)
+        // For v2 records: [baseOffset(8) | batchLength(4) | 
partitionLeaderEpoch(4) | magic(1) | crc(4) | ...]
+        int crcPosition = batchStartPosition + 17;
+
+        buffer.putInt(crcPosition, 0xDEADBEEF);
+
+        buffer.position(currentPosition);
+    }
+
+    private Records newRecordsWithCorruptedFirstBatch() {
+        Time time = new MockTime();
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+
+        // First batch (corrupted) - offsets 0-9
+        int pos1 = buffer.position();
+        createBatch(buffer, 0L, 10, time);
+        corruptBatchCrc(buffer, pos1);
+
+        // Second batch (good) - offsets 10-19
+        createBatch(buffer, 10L, 10, time);
+
+        // Third batch (good) - offsets 20-29
+        createBatch(buffer, 20L, 10, time);
+
+        buffer.flip();
+        return MemoryRecords.readableRecords(buffer);
+    }
+
+    private Records newRecordsWithCorruptedSecondBatchSmallFirst() {
+        Time time = new MockTime();
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+
+        // First batch (good) - offsets 0-4
+        createBatch(buffer, 0L, 5, time);
+
+        // Second batch (corrupted) - offsets 5-14
+        int pos = buffer.position();
+        createBatch(buffer, 5L, 10, time);
+        corruptBatchCrc(buffer, pos);
+
+        buffer.flip();
+        return MemoryRecords.readableRecords(buffer);
+    }
+
+    private Records newSingleCorruptedBatch(long baseOffset, int numRecords) {
+        Time time = new MockTime();
+        ByteBuffer buffer = ByteBuffer.allocate(2048);
+
+        int pos = buffer.position();
+        createBatch(buffer, baseOffset, numRecords, time);
+        corruptBatchCrc(buffer, pos);
+
+        buffer.flip();
+        return MemoryRecords.readableRecords(buffer);
+    }
+
+    private Records newRecordsWithMultipleCorruptedBatches() {
+        Time time = new MockTime();
+        ByteBuffer buffer = ByteBuffer.allocate(8192);
+
+        // First batch (corrupted) - offsets 0-9
+        int pos1 = buffer.position();
+        createBatch(buffer, 0L, 10, time);
+        corruptBatchCrc(buffer, pos1);
+
+        // Second batch (corrupted) - offsets 10-19
+        int pos2 = buffer.position();
+        createBatch(buffer, 10L, 10, time);
+        corruptBatchCrc(buffer, pos2);
+
+        // Third batch (good) - offsets 20-29
+        createBatch(buffer, 20L, 10, time);
+
+        // Fourth batch (good) - offsets 30-39
+        createBatch(buffer, 30L, 10, time);
+
+        buffer.flip();
+        return MemoryRecords.readableRecords(buffer);
+    }
 }

Reply via email to