ShivsundarR commented on code in PR #20752:
URL: https://github.com/apache/kafka/pull/20752#discussion_r2454044102
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java:
##########
@@ -356,6 +357,63 @@ record = records.get(1);
assertEquals(0, records.size());
}
+    @Test
+    public void testOverlappingAcquiredRecordsLogsErrorAndRetainsFirstOccurrence() {
+        int startingOffset = 0;
+        int numRecords = 20; // Records for 0-19
+
+        // Create overlapping acquired records: [0-9] and [5-14]
+        // Offsets 5-9 will be duplicates
+        List<ShareFetchResponseData.AcquiredRecords> acquiredRecords = new ArrayList<>();
+        acquiredRecords.add(new ShareFetchResponseData.AcquiredRecords()
+            .setFirstOffset(0L)
+            .setLastOffset(9L)
+            .setDeliveryCount((short) 1));
+        acquiredRecords.add(new ShareFetchResponseData.AcquiredRecords()
+            .setFirstOffset(5L)
+            .setLastOffset(14L)
+            .setDeliveryCount((short) 2));
+
+        ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData()
+            .setRecords(newRecords(startingOffset, numRecords))
+            .setAcquiredRecords(acquiredRecords);
+
+        ShareCompletedFetch completedFetch = newShareCompletedFetch(partitionData);
+
+        Deserializers<String, String> deserializers = newStringDeserializers();
+
+        // Fetch records and verify that only 15 unique records are returned (0-14)
+        ShareInFlightBatch<String, String> batch = completedFetch.fetchRecords(deserializers, 20, true);
+        List<ConsumerRecord<String, String>> records = batch.getInFlightRecords();
+
+        // Should get 15 unique records: 0-9 from first range (with deliveryCount=1)
+        // and 10-14 from second range (with deliveryCount=2)
+        assertEquals(15, records.size());
+
+        // Verify first occurrence (offset 5 should have deliveryCount=1 from first range)
+        ConsumerRecord<String, String> record5 = records.stream()
+            .filter(r -> r.offset() == 5L)
+            .findFirst()
+            .orElse(null);
+        assertNotNull(record5);
+        assertEquals(Optional.of((short) 1), record5.deliveryCount());
+
+        // Verify offset 10 has deliveryCount=2 from second range
+        ConsumerRecord<String, String> record10 = records.stream()
+            .filter(r -> r.offset() == 10L)
+            .findFirst()
+            .orElse(null);
+        assertNotNull(record10);
+        assertEquals(Optional.of((short) 2), record10.deliveryCount());
+
+        // Verify all offsets are unique
+        Set<Long> offsetSet = new HashSet<>();
+        for (ConsumerRecord<String, String> record : records) {
Review Comment:
Yes, the logic around `inFlightRecords` ensures we do not send duplicate offsets to the application side, but the client does respond with a GAP acknowledgement to the broker for any duplicate offset.
https://github.com/apache/kafka/blob/2c444485123a5790170856284f613384b4b033c2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java#L219
Without deduplication, when an offset is encountered a second time, `lastRecord.offset > nextAcquired.offset` will be true (since `nextAcquired` will be an older offset), so the client acknowledges those offsets as GAPs, which ends up hiding the real issue.
As the broker is already in a bad state (duplication should never happen), we thought it best to log an error and ignore any duplicates on the client.
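
To make the idea concrete, here is a minimal, self-contained sketch of the deduplication approach described above. This is not the actual `ShareCompletedFetch` code; `AcquiredRange`, `DedupSketch`, and the loop structure are hypothetical stand-ins, with the range bounds mirroring the test ([0-9] with deliveryCount=1, [5-14] with deliveryCount=2):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DedupSketch {
    // Hypothetical stand-in for ShareFetchResponseData.AcquiredRecords
    record AcquiredRange(long firstOffset, long lastOffset, short deliveryCount) {}

    public static void main(String[] args) {
        AcquiredRange[] acquired = {
            new AcquiredRange(0L, 9L, (short) 1),
            new AcquiredRange(5L, 14L, (short) 2)
        };

        // offset -> deliveryCount of the first occurrence
        Map<Long, Short> inFlight = new LinkedHashMap<>();
        for (AcquiredRange range : acquired) {
            for (long offset = range.firstOffset(); offset <= range.lastOffset(); offset++) {
                if (inFlight.putIfAbsent(offset, range.deliveryCount()) != null) {
                    // The broker should never acquire the same offset twice; surface it
                    // loudly instead of silently acknowledging the duplicate as a GAP.
                    System.err.println("Duplicate acquired offset from broker: " + offset);
                }
            }
        }

        // Prints 15 unique offsets (0-14); offset 5 keeps deliveryCount=1
        // from the first range, offset 10 gets deliveryCount=2 from the second.
        System.out.println(inFlight.size() + " unique offsets: " + inFlight);
    }
}
```

The first-occurrence-wins behavior falls out of `putIfAbsent`, which matches what the test asserts for offset 5.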