ShivsundarR commented on code in PR #20752:
URL: https://github.com/apache/kafka/pull/20752#discussion_r2454044102


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java:
##########
@@ -356,6 +357,63 @@ record = records.get(1);
         assertEquals(0, records.size());
     }
 
+    @Test
+    public void testOverlappingAcquiredRecordsLogsErrorAndRetainsFirstOccurrence() {
+        int startingOffset = 0;
+        int numRecords = 20;        // Records for 0-19
+
+        // Create overlapping acquired records: [0-9] and [5-14]
+        // Offsets 5-9 will be duplicates
+        List<ShareFetchResponseData.AcquiredRecords> acquiredRecords = new ArrayList<>();
+        acquiredRecords.add(new ShareFetchResponseData.AcquiredRecords()
+                .setFirstOffset(0L)
+                .setLastOffset(9L)
+                .setDeliveryCount((short) 1));
+        acquiredRecords.add(new ShareFetchResponseData.AcquiredRecords()
+                .setFirstOffset(5L)
+                .setLastOffset(14L)
+                .setDeliveryCount((short) 2));
+
+        ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData()
+                .setRecords(newRecords(startingOffset, numRecords))
+                .setAcquiredRecords(acquiredRecords);
+
+        ShareCompletedFetch completedFetch = newShareCompletedFetch(partitionData);
+
+        Deserializers<String, String> deserializers = newStringDeserializers();
+
+        // Fetch records and verify that only 15 unique records are returned (0-14)
+        ShareInFlightBatch<String, String> batch = completedFetch.fetchRecords(deserializers, 20, true);
+        List<ConsumerRecord<String, String>> records = batch.getInFlightRecords();
+        
+        // Should get 15 unique records: 0-9 from first range (with deliveryCount=1)
+        // and 10-14 from second range (with deliveryCount=2)
+        assertEquals(15, records.size());
+        
+        // Verify first occurrence (offset 5 should have deliveryCount=1 from first range)
+        ConsumerRecord<String, String> record5 = records.stream()
+                .filter(r -> r.offset() == 5L)
+                .findFirst()
+                .orElse(null);
+        assertNotNull(record5);
+        assertEquals(Optional.of((short) 1), record5.deliveryCount());
+        
+        // Verify offset 10 has deliveryCount=2 from second range
+        ConsumerRecord<String, String> record10 = records.stream()
+                .filter(r -> r.offset() == 10L)
+                .findFirst()
+                .orElse(null);
+        assertNotNull(record10);
+        assertEquals(Optional.of((short) 2), record10.deliveryCount());
+        
+        // Verify all offsets are unique
+        Set<Long> offsetSet = new HashSet<>();
+        for (ConsumerRecord<String, String> record : records) {

Review Comment:
   Yes, the logic around `inFlightRecords` ensures we do not send duplicate offsets to the application side, but the client does respond with a GAP acknowledgement to the broker for any duplicate offset.
   
https://github.com/apache/kafka/blob/2c444485123a5790170856284f613384b4b033c2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java#L219
   
   Without deduplication, when an offset is encountered a second time, `lastRecord.offset > nextAcquired.offset` will be true (as `nextAcquired` will point at an older offset), so the client acknowledges these offsets as GAPs, which ends up hiding the main issue.
   Since the broker is already in a bad state (duplication should never happen), we thought it better to log an error and ignore any duplicates on the client.
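   
   For illustration, here is a minimal, self-contained sketch of the retain-first-occurrence behaviour the test exercises. This is not the actual `ShareCompletedFetch` code; the class and helper names below are hypothetical. The first range to acquire an offset wins, overlaps are logged as errors, and no duplicate is ever emitted:
   
```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class AcquiredOffsetDeduper {

    /** Simplified stand-in for ShareFetchResponseData.AcquiredRecords. */
    record AcquiredRange(long firstOffset, long lastOffset, short deliveryCount) { }

    /** An offset paired with the delivery count of the range that first acquired it. */
    record AcquiredOffset(long offset, short deliveryCount) { }

    static List<AcquiredOffset> dedupe(List<AcquiredRange> ranges) {
        Set<Long> seen = new HashSet<>();
        List<AcquiredOffset> result = new ArrayList<>();
        for (AcquiredRange range : ranges) {
            for (long offset = range.firstOffset(); offset <= range.lastOffset(); offset++) {
                // Set.add returns false if this offset was already acquired by an earlier range.
                if (!seen.add(offset)) {
                    // The broker should never send overlapping acquired records, so this is
                    // a broker-side bug: log loudly and retain the first occurrence.
                    System.err.println("Duplicate acquired offset " + offset
                            + " in range [" + range.firstOffset() + "-" + range.lastOffset() + "]");
                    continue;
                }
                result.add(new AcquiredOffset(offset, range.deliveryCount()));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Mirrors the test above: ranges [0-9] (deliveryCount=1) and [5-14]
        // (deliveryCount=2) overlap on offsets 5-9.
        List<AcquiredOffset> offsets = dedupe(List.of(
                new AcquiredRange(0L, 9L, (short) 1),
                new AcquiredRange(5L, 14L, (short) 2)));
        System.out.println(offsets.size()); // 15 unique offsets (0-14); 5-9 keep deliveryCount=1
    }
}
```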
   


