This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6aa702fb248 MINOR: Improve CRC failure handling for share groups (#21569)
6aa702fb248 is described below
commit 6aa702fb248b40834d6c95f7a192d8f3c3130f5e
Author: Andrew Schofield <[email protected]>
AuthorDate: Mon Mar 2 22:52:16 2026 +0000
MINOR: Improve CRC failure handling for share groups (#21569)
The handling of CRC failures in CompletedShareFetch was incorrect,
particularly when multiple CRC failures occurred. This PR adds a bunch
of tests and corrects the logic for keeping track of the position when a
batch is rejected.
Reviewers: Apoorv Mittal <[email protected]>
---
.../consumer/internals/ShareCompletedFetch.java | 17 +-
.../internals/ShareCompletedFetchTest.java | 362 +++++++++++++++++++--
2 files changed, 351 insertions(+), 28 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
index 024e5bece00..7fc39c5d5a1 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java
@@ -280,27 +280,32 @@ public class ShareCompletedFetch {
}
private <K, V> Set<Long> rejectRecordBatch(final ShareInFlightBatch<K, V>
inFlightBatch,
- final RecordBatch currentBatch) {
+ final RecordBatch currentBatch)
{
// Rewind the acquiredRecordIterator to the start, so we are in a
known state
acquiredRecordIterator = acquiredRecordList.listIterator();
- OffsetAndDeliveryCount nextAcquired = nextAcquiredRecord();
+ OffsetAndDeliveryCount acquired = nextAcquiredRecord();
Set<Long> offsets = new HashSet<>();
for (long offset = currentBatch.baseOffset(); offset <=
currentBatch.lastOffset(); offset++) {
- if (nextAcquired == null) {
+ while (acquired != null && acquired.offset < offset) {
+ acquired = nextAcquiredRecord();
+ }
+
+ if (acquired == null) {
// No more acquired records, so we are done
break;
- } else if (offset == nextAcquired.offset) {
+ } else if (offset == acquired.offset) {
// It's acquired, so we reject it
inFlightBatch.addAcknowledgement(offset,
AcknowledgeType.REJECT);
offsets.add(offset);
- } else if (offset < nextAcquired.offset) {
+ } else {
// It's not acquired, so we skip it
continue;
}
- nextAcquired = nextAcquiredRecord();
+ acquired = nextAcquiredRecord();
}
+ this.nextAcquired = acquired;
return offsets;
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
index 824f2ecf030..8e97f0c5eee 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
@@ -49,7 +50,6 @@ import org.junit.jupiter.api.Test;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
@@ -225,8 +225,8 @@ public class ShareCompletedFetchTest {
}
@Test
- public void testCorruptedMessage() {
- // Create one good record and then two "corrupted" records and then
another good record.
+ public void testRecordDeserializationException() {
+ // Create one good record and then two records which do not
deserialize and then another good record.
try (final MemoryRecordsBuilder builder =
MemoryRecords.builder(ByteBuffer.allocate(1024),
Compression.NONE,
TimestampType.CREATE_TIME,
@@ -346,8 +346,8 @@ public class ShareCompletedFetchTest {
// Acquire all records including the control record (offset 10 is the
commit marker)
ShareFetchResponseData.PartitionData partitionData = new
ShareFetchResponseData.PartitionData()
- .setRecords(rawRecords)
- .setAcquiredRecords(acquiredRecords(0L, numRecords + 1));
+ .setRecords(rawRecords)
+ .setAcquiredRecords(acquiredRecords(0L, numRecords + 1));
ShareCompletedFetch completedFetch =
newShareCompletedFetch(partitionData);
try (final Deserializers<String, String> deserializers =
newStringDeserializers()) {
@@ -374,8 +374,8 @@ public class ShareCompletedFetchTest {
acquiredRecords.add(acquiredRecords(10L, 5).get(0));
ShareFetchResponseData.PartitionData partitionData = new
ShareFetchResponseData.PartitionData()
- .setRecords(newRecords(startingOffset, 10))
- .setAcquiredRecords(acquiredRecords); // Acquire only records
0-4 and 10-14
+ .setRecords(newRecords(startingOffset, 10))
+ .setAcquiredRecords(acquiredRecords); // Acquire only records 0-4
and 10-14
Deserializers<String, String> deserializers = newStringDeserializers();
@@ -407,8 +407,8 @@ public class ShareCompletedFetchTest {
// Acquire only non-existent records 15-19 (all should be gaps)
ShareFetchResponseData.PartitionData partitionData = new
ShareFetchResponseData.PartitionData()
- .setRecords(newRecords(startingOffset, numRecords)) //
Records 0-9
- .setAcquiredRecords(acquiredRecords(15L, 5)); // Acquire
15-19 (don't exist)
+ .setRecords(newRecords(startingOffset, numRecords)) // Records 0-9
+ .setAcquiredRecords(acquiredRecords(15L, 5)); // Acquire
15-19 (don't exist)
Deserializers<String, String> deserializers = newStringDeserializers();
@@ -464,8 +464,8 @@ public class ShareCompletedFetchTest {
// Acquire all offsets 0-6 (includes both control records and data
records)
ShareFetchResponseData.PartitionData partitionData = new
ShareFetchResponseData.PartitionData()
- .setRecords(records)
- .setAcquiredRecords(acquiredRecords(0L, 7));
+ .setRecords(records)
+ .setAcquiredRecords(acquiredRecords(0L, 7));
ShareCompletedFetch completedFetch =
newShareCompletedFetch(partitionData);
try (final Deserializers<String, String> deserializers =
newStringDeserializers()) {
@@ -575,6 +575,222 @@ public class ShareCompletedFetchTest {
}
}
+ @Test
+ public void testCrcFailureOnFirstBatchThenSubsequentFetch() {
+ // Create 3 batches: corrupted batch (offsets 0-9), good batch
(10-19), good batch (20-29)
+ Records records = newRecordsWithCorruptedFirstBatch();
+
+ // Acquire all 30 records (0-29)
+ List<ShareFetchResponseData.AcquiredRecords> acquiredRecords = new
ArrayList<>();
+ acquiredRecords.add(acquiredRecords(0L, 10).get(0));
+ acquiredRecords.add(acquiredRecords(10L, 10).get(0));
+ acquiredRecords.add(acquiredRecords(20L, 10).get(0));
+
+ ShareFetchResponseData.PartitionData partitionData = new
ShareFetchResponseData.PartitionData()
+ .setRecords(records)
+ .setAcquiredRecords(acquiredRecords);
+
+ ShareCompletedFetch completedFetch =
newShareCompletedFetch(partitionData);
+ Deserializers<String, String> deserializers = newStringDeserializers();
+
+ // First fetch should fail with CRC error and reject the entire first
batch
+ ShareInFlightBatch<String, String> batch1 =
completedFetch.fetchRecords(deserializers, 10, true);
+ assertNotNull(batch1.getException(), "Should have exception for
corrupted batch");
+ assertEquals(CorruptRecordException.class,
batch1.getException().cause().getClass());
+
+ // Verify all 10 records from first batch are rejected
+ Acknowledgements acks1 = batch1.getAcknowledgements();
+ assertEquals(10, acks1.size(), "All records in corrupted batch should
be rejected");
+ for (long offset = 0; offset < 10; offset++) {
+ assertEquals(AcknowledgeType.REJECT, acks1.get(offset),
+ "Record at offset " + offset + " should be REJECT");
+ }
+
+ // No records should be returned
+ assertEquals(0, batch1.getInFlightRecords().size());
+
+ // Should successfully fetch records from the second batch (offsets
10-19)
+ ShareInFlightBatch<String, String> batch2 =
completedFetch.fetchRecords(deserializers, 10, true);
+ assertNotNull(batch2, "Should return a batch");
+ List<ConsumerRecord<String, String>> records2 =
batch2.getInFlightRecords();
+ assertEquals(10, records2.size(), "Should get 10 records from second
batch");
+ assertEquals(10L, records2.get(0).offset(), "First record should be
offset 10");
+ assertEquals(19L, records2.get(9).offset(), "Last record should be
offset 19");
+
+ // Third fetch should process the third batch correctly
+ ShareInFlightBatch<String, String> batch3 =
completedFetch.fetchRecords(deserializers, 10, true);
+ List<ConsumerRecord<String, String>> records3 =
batch3.getInFlightRecords();
+ assertEquals(10, records3.size(), "Should get 10 records from third
batch");
+ assertEquals(20L, records3.get(0).offset(), "First record should be
offset 20");
+ assertEquals(29L, records3.get(9).offset(), "Last record should be
offset 29");
+ }
+
+ @Test
+ public void testCrcFailureWithGapsInBatch() {
+ // Create corrupted batch with offsets 0-9
+ Records records = newSingleCorruptedBatch(0L, 10);
+
+ // Only acquire odd offsets: 1, 3, 5, 7, 9
+ List<ShareFetchResponseData.AcquiredRecords> acquiredRecords = new
ArrayList<>();
+ for (long offset = 1; offset < 10; offset += 2) {
+ acquiredRecords.add(acquiredRecords(offset, 1).get(0));
+ }
+
+ ShareFetchResponseData.PartitionData partitionData = new
ShareFetchResponseData.PartitionData()
+ .setRecords(records)
+ .setAcquiredRecords(acquiredRecords);
+
+ ShareCompletedFetch completedFetch =
newShareCompletedFetch(partitionData);
+ Deserializers<String, String> deserializers = newStringDeserializers();
+
+ // Fetch should fail with CRC error
+ ShareInFlightBatch<String, String> batch =
completedFetch.fetchRecords(deserializers, 10, true);
+ assertNotNull(batch.getException());
+ assertEquals(CorruptRecordException.class,
batch.getException().cause().getClass());
+
+ // Only the acquired records (odd offsets) should be rejected
+ Acknowledgements acks = batch.getAcknowledgements();
+ assertEquals(5, acks.size(), "Only 5 acquired records should be
rejected");
+ assertEquals(AcknowledgeType.REJECT, acks.get(1L));
+ assertEquals(AcknowledgeType.REJECT, acks.get(3L));
+ assertEquals(AcknowledgeType.REJECT, acks.get(5L));
+ assertEquals(AcknowledgeType.REJECT, acks.get(7L));
+ assertEquals(AcknowledgeType.REJECT, acks.get(9L));
+ }
+
+ @Test
+ public void testCrcFailureWithUnalignedAcquiredRecords() {
+ // Create corrupted batch with offsets 0-19
+ Records records = newSingleCorruptedBatch(0L, 20);
+
+ // Only acquire records 10-19 (second half of the corrupted batch)
+ List<ShareFetchResponseData.AcquiredRecords> acquiredRecords = new
ArrayList<>();
+ acquiredRecords.add(acquiredRecords(10L, 10).get(0));
+
+ ShareFetchResponseData.PartitionData partitionData = new
ShareFetchResponseData.PartitionData()
+ .setRecords(records)
+ .setAcquiredRecords(acquiredRecords);
+
+ ShareCompletedFetch completedFetch =
newShareCompletedFetch(partitionData);
+ Deserializers<String, String> deserializers = newStringDeserializers();
+
+ // Fetch should fail with CRC error
+ ShareInFlightBatch<String, String> batch =
completedFetch.fetchRecords(deserializers, 20, true);
+ assertNotNull(batch.getException());
+
+ // Only the acquired records (10-19) should be rejected
+ Acknowledgements acks = batch.getAcknowledgements();
+ assertEquals(10, acks.size());
+ for (long offset = 10; offset < 20; offset++) {
+ assertEquals(AcknowledgeType.REJECT, acks.get(offset));
+ }
+ }
+
+ @Test
+ public void testCorruptedRecordsWithCrcCheckDisabled() {
+ // Create corrupted batch
+ Records records = newSingleCorruptedBatch(0L, 10);
+
+ List<ShareFetchResponseData.AcquiredRecords> acquiredRecords = new
ArrayList<>();
+ acquiredRecords.add(acquiredRecords(0L, 10).get(0));
+
+ ShareFetchResponseData.PartitionData partitionData = new
ShareFetchResponseData.PartitionData()
+ .setRecords(records)
+ .setAcquiredRecords(acquiredRecords);
+
+ ShareCompletedFetch completedFetch =
newShareCompletedFetch(partitionData);
+ Deserializers<String, String> deserializers = newStringDeserializers();
+
+ // Fetch with CRC check disabled - should process records despite
corruption
+ ShareInFlightBatch<String, String> batch =
completedFetch.fetchRecords(deserializers, 10, false);
+
+ // Should get records (they're corrupted, but we're not checking)
+ List<ConsumerRecord<String, String>> fetchedRecords =
batch.getInFlightRecords();
+ assertEquals(10, fetchedRecords.size(), "Should get all records when
CRC check is disabled");
+ }
+
+ @Test
+ public void testCrcFailureAfterPartialFetch() {
+ // Create 2 batches: good batch (offsets 0-4), corrupted batch (5-14)
+ // Using smaller first batch so fetching with maxRecords = 10 will
need to load the second batch
+ Records records = newRecordsWithCorruptedSecondBatchSmallFirst();
+
+ // Acquire all 15 records
+ List<ShareFetchResponseData.AcquiredRecords> acquiredRecords = new
ArrayList<>();
+ acquiredRecords.add(acquiredRecords(0L, 15).get(0));
+
+ ShareFetchResponseData.PartitionData partitionData = new
ShareFetchResponseData.PartitionData()
+ .setRecords(records)
+ .setAcquiredRecords(acquiredRecords);
+
+ ShareCompletedFetch completedFetch =
newShareCompletedFetch(partitionData);
+ Deserializers<String, String> deserializers = newStringDeserializers();
+
+ // First fetch with maxRecords=10 should return 5 records from first
batch,
+ // then attempt to load second batch which is corrupted
+ ShareInFlightBatch<String, String> batch1 =
completedFetch.fetchRecords(deserializers, 10, true);
+ List<ConsumerRecord<String, String>> records1 =
batch1.getInFlightRecords();
+ assertEquals(5, records1.size(), "Should get 5 records from first
batch before hitting CRC error");
+ assertEquals(0L, records1.get(0).offset());
+ assertEquals(4L, records1.get(4).offset());
+ assertTrue(batch1.hasCachedException(), "Should indicate there's a
cached exception");
+
+ // Second fetch should return the cached CRC exception for the second
batch
+ ShareInFlightBatch<String, String> batch2 =
completedFetch.fetchRecords(deserializers, 10, true);
+ assertNotNull(batch2.getException(), "Should have cached exception");
+ assertEquals(CorruptRecordException.class,
batch2.getException().cause().getClass());
+
+ // Verify all 10 records from second batch are rejected
+ Acknowledgements acks2 = batch2.getAcknowledgements();
+ assertEquals(10, acks2.size());
+ for (long offset = 5; offset < 15; offset++) {
+ assertEquals(AcknowledgeType.REJECT, acks2.get(offset));
+ }
+
+ // No more records
+ ShareInFlightBatch<String, String> batch3 =
completedFetch.fetchRecords(deserializers, 10, true);
+ assertEquals(0, batch3.getInFlightRecords().size());
+ }
+
+ @Test
+ public void testMultipleConsecutiveCrcFailures() {
+ // Create 4 batches: corrupted, corrupted, good, good
+ Records records = newRecordsWithMultipleCorruptedBatches();
+
+ // Acquire all records (0-39)
+ List<ShareFetchResponseData.AcquiredRecords> acquiredRecords = new
ArrayList<>();
+ acquiredRecords.add(acquiredRecords(0L, 40).get(0));
+
+ ShareFetchResponseData.PartitionData partitionData = new
ShareFetchResponseData.PartitionData()
+ .setRecords(records)
+ .setAcquiredRecords(acquiredRecords);
+
+ ShareCompletedFetch completedFetch =
newShareCompletedFetch(partitionData);
+ Deserializers<String, String> deserializers = newStringDeserializers();
+
+ // First corrupted batch (0-9)
+ ShareInFlightBatch<String, String> batch1 =
completedFetch.fetchRecords(deserializers, 10, true);
+ assertNotNull(batch1.getException(), "First batch should have CRC
exception");
+ Acknowledgements acks1 = batch1.getAcknowledgements();
+ assertEquals(10, acks1.size(), "First corrupted batch should reject 10
records");
+
+ // Second corrupted batch (10-19)
+ ShareInFlightBatch<String, String> batch2 =
completedFetch.fetchRecords(deserializers, 10, true);
+ assertNotNull(batch2.getException(), "Second batch should have CRC
exception");
+ Acknowledgements acks2 = batch2.getAcknowledgements();
+ assertEquals(10, acks2.size(), "Second corrupted batch should reject
10 records");
+
+ // Third batch should be good (20-29)
+ ShareInFlightBatch<String, String> batch3 =
completedFetch.fetchRecords(deserializers, 10, true);
+ assertEquals(10, batch3.getInFlightRecords().size(), "Third batch
should return 10 good records");
+ assertEquals(20L, batch3.getInFlightRecords().get(0).offset());
+
+ // Fourth batch should be good (30-39)
+ ShareInFlightBatch<String, String> batch4 =
completedFetch.fetchRecords(deserializers, 10, true);
+ assertEquals(10, batch4.getInFlightRecords().size(), "Fourth batch
should return 10 good records");
+ assertEquals(30L, batch4.getInFlightRecords().get(0).offset());
+ }
+
private ShareCompletedFetch
newShareCompletedFetch(ShareFetchResponseData.PartitionData partitionData) {
LogContext logContext = new LogContext();
ShareFetchMetricsRegistry shareFetchMetricsRegistry = new
ShareFetchMetricsRegistry();
@@ -619,16 +835,16 @@ public class ShareCompletedFetchTest {
for (long b = 0; b < batchCount; b++) {
try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer,
- RecordBatch.CURRENT_MAGIC_VALUE,
- Compression.NONE,
- TimestampType.CREATE_TIME,
- baseOffset + b * numRecordsPerBatch,
- time.milliseconds(),
- PRODUCER_ID,
- PRODUCER_EPOCH,
- 0,
- true,
- RecordBatch.NO_PARTITION_LEADER_EPOCH)) {
+ RecordBatch.CURRENT_MAGIC_VALUE,
+ Compression.NONE,
+ TimestampType.CREATE_TIME,
+ baseOffset + b * numRecordsPerBatch,
+ time.milliseconds(),
+ PRODUCER_ID,
+ PRODUCER_EPOCH,
+ 0,
+ true,
+ RecordBatch.NO_PARTITION_LEADER_EPOCH)) {
for (int i = 0; i < numRecordsPerBatch; i++)
builder.append(new SimpleRecord(time.milliseconds(),
"key".getBytes(), "value".getBytes()));
@@ -646,7 +862,7 @@ public class ShareCompletedFetchTest {
.setFirstOffset(firstOffset)
.setLastOffset(firstOffset + count - 1)
.setDeliveryCount((short) 1);
- return Collections.singletonList(acquiredRecords);
+ return List.of(acquiredRecords);
}
private Records newTransactionalRecords(int numRecords) {
@@ -687,4 +903,106 @@ public class ShareCompletedFetchTest {
PRODUCER_EPOCH,
new EndTransactionMarker(ControlRecordType.COMMIT, 0));
}
+
+ private void createBatch(ByteBuffer buffer, long baseOffset, int
numRecords, Time time) {
+ try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer,
+ RecordBatch.CURRENT_MAGIC_VALUE,
+ Compression.NONE,
+ TimestampType.CREATE_TIME,
+ baseOffset,
+ time.milliseconds(),
+ PRODUCER_ID,
+ PRODUCER_EPOCH,
+ 0,
+ false,
+ RecordBatch.NO_PARTITION_LEADER_EPOCH)) {
+ for (int i = 0; i < numRecords; i++) {
+ builder.append(new SimpleRecord(time.milliseconds(),
"key".getBytes(), ("value-" + (baseOffset + i)).getBytes()));
+ }
+ builder.build();
+ }
+ }
+
+ private void corruptBatchCrc(ByteBuffer buffer, int batchStartPosition) {
+ int currentPosition = buffer.position();
+
+ // CRC is at offset 17 in the record batch (after base offset, batch
length, partition leader epoch, and magic)
+ // For v2 records: [baseOffset(8) | batchLength(4) |
partitionLeaderEpoch(4) | magic(1) | crc(4) | ...]
+ int crcPosition = batchStartPosition + 17;
+
+ buffer.putInt(crcPosition, 0xDEADBEEF);
+
+ buffer.position(currentPosition);
+ }
+
+ private Records newRecordsWithCorruptedFirstBatch() {
+ Time time = new MockTime();
+ ByteBuffer buffer = ByteBuffer.allocate(4096);
+
+ // First batch (corrupted) - offsets 0-9
+ int pos1 = buffer.position();
+ createBatch(buffer, 0L, 10, time);
+ corruptBatchCrc(buffer, pos1);
+
+ // Second batch (good) - offsets 10-19
+ createBatch(buffer, 10L, 10, time);
+
+ // Third batch (good) - offsets 20-29
+ createBatch(buffer, 20L, 10, time);
+
+ buffer.flip();
+ return MemoryRecords.readableRecords(buffer);
+ }
+
+ private Records newRecordsWithCorruptedSecondBatchSmallFirst() {
+ Time time = new MockTime();
+ ByteBuffer buffer = ByteBuffer.allocate(4096);
+
+ // First batch (good) - offsets 0-4
+ createBatch(buffer, 0L, 5, time);
+
+ // Second batch (corrupted) - offsets 5-14
+ int pos = buffer.position();
+ createBatch(buffer, 5L, 10, time);
+ corruptBatchCrc(buffer, pos);
+
+ buffer.flip();
+ return MemoryRecords.readableRecords(buffer);
+ }
+
+ private Records newSingleCorruptedBatch(long baseOffset, int numRecords) {
+ Time time = new MockTime();
+ ByteBuffer buffer = ByteBuffer.allocate(2048);
+
+ int pos = buffer.position();
+ createBatch(buffer, baseOffset, numRecords, time);
+ corruptBatchCrc(buffer, pos);
+
+ buffer.flip();
+ return MemoryRecords.readableRecords(buffer);
+ }
+
+ private Records newRecordsWithMultipleCorruptedBatches() {
+ Time time = new MockTime();
+ ByteBuffer buffer = ByteBuffer.allocate(8192);
+
+ // First batch (corrupted) - offsets 0-9
+ int pos1 = buffer.position();
+ createBatch(buffer, 0L, 10, time);
+ corruptBatchCrc(buffer, pos1);
+
+ // Second batch (corrupted) - offsets 10-19
+ int pos2 = buffer.position();
+ createBatch(buffer, 10L, 10, time);
+ corruptBatchCrc(buffer, pos2);
+
+ // Third batch (good) - offsets 20-29
+ createBatch(buffer, 20L, 10, time);
+
+ // Fourth batch (good) - offsets 30-39
+ createBatch(buffer, 30L, 10, time);
+
+ buffer.flip();
+ return MemoryRecords.readableRecords(buffer);
+ }
}