chia7712 commented on code in PR #20752:
URL: https://github.com/apache/kafka/pull/20752#discussion_r2453054436


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetch.java:
##########
@@ -100,9 +100,16 @@ public class ShareCompletedFetch {
 
     private List<OffsetAndDeliveryCount> buildAcquiredRecordList(List<ShareFetchResponseData.AcquiredRecords> partitionAcquiredRecords) {
         List<OffsetAndDeliveryCount> acquiredRecordList = new LinkedList<>();
+        // Set to find duplicates in case of overlapping acquired records
+        Set<Long> offsets = new HashSet<>();
         partitionAcquiredRecords.forEach(acquiredRecords -> {
             for (long offset = acquiredRecords.firstOffset(); offset <= acquiredRecords.lastOffset(); offset++) {
-                acquiredRecordList.add(new OffsetAndDeliveryCount(offset, acquiredRecords.deliveryCount()));
+                if (!offsets.add(offset)) {
+                    log.error("Duplicate acquired record offset {} found in share fetch response for partition {}. " +
+                            "This indicates a broker processing issue.", offset, partition.topicPartition());

Review Comment:
   Just curious, are there any known issues that lead to duplicate offsets?
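
   To illustrate the scenario behind this check, here is a minimal standalone sketch (using a simplified stand-in for `ShareFetchResponseData.AcquiredRecords`, not the actual client classes) of how overlapping acquired-record ranges in a share fetch response would yield duplicate offsets, and how a `Set`-based guard like the one added here catches them while keeping the first occurrence:

   ```java
   import java.util.HashSet;
   import java.util.LinkedHashMap;
   import java.util.Map;
   import java.util.Set;

   public class OverlappingAcquiredRecordsSketch {

       // Simplified stand-in for AcquiredRecords: first/last offset plus delivery count.
       record Range(long firstOffset, long lastOffset, short deliveryCount) { }

       public static void main(String[] args) {
           // Two overlapping ranges, e.g. [0-9] and [5-14]: offsets 5-9 appear twice.
           Range[] acquired = { new Range(0, 9, (short) 1), new Range(5, 14, (short) 2) };

           Set<Long> seen = new HashSet<>();
           Map<Long, Short> acquiredOffsets = new LinkedHashMap<>();

           for (Range range : acquired) {
               for (long offset = range.firstOffset(); offset <= range.lastOffset(); offset++) {
                   if (!seen.add(offset)) {
                       // The change logs an error here; per the test below, the
                       // first occurrence is retained and the duplicate is skipped.
                       System.out.printf("Duplicate acquired record offset %d%n", offset);
                       continue;
                   }
                   acquiredOffsets.put(offset, range.deliveryCount());
               }
           }

           // 15 unique offsets (0-14); offsets 5-9 keep deliveryCount=1 from the first range.
           System.out.println("Unique offsets: " + acquiredOffsets.size());
       }
   }
   ```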



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareCompletedFetchTest.java:
##########
@@ -356,6 +357,63 @@ record = records.get(1);
         assertEquals(0, records.size());
     }
 
+    @Test
+    public void testOverlappingAcquiredRecordsLogsErrorAndRetainsFirstOccurrence() {
+        int startingOffset = 0;
+        int numRecords = 20;        // Records for 0-19
+
+        // Create overlapping acquired records: [0-9] and [5-14]
+        // Offsets 5-9 will be duplicates
+        List<ShareFetchResponseData.AcquiredRecords> acquiredRecords = new ArrayList<>();
+        acquiredRecords.add(new ShareFetchResponseData.AcquiredRecords()
+                .setFirstOffset(0L)
+                .setLastOffset(9L)
+                .setDeliveryCount((short) 1));
+        acquiredRecords.add(new ShareFetchResponseData.AcquiredRecords()
+                .setFirstOffset(5L)
+                .setLastOffset(14L)
+                .setDeliveryCount((short) 2));
+
+        ShareFetchResponseData.PartitionData partitionData = new ShareFetchResponseData.PartitionData()
+                .setRecords(newRecords(startingOffset, numRecords))
+                .setAcquiredRecords(acquiredRecords);
+
+        ShareCompletedFetch completedFetch = newShareCompletedFetch(partitionData);
+
+        Deserializers<String, String> deserializers = newStringDeserializers();
+
+        // Fetch records and verify that only 15 unique records are returned (0-14)
+        ShareInFlightBatch<String, String> batch = completedFetch.fetchRecords(deserializers, 20, true);
+        List<ConsumerRecord<String, String>> records = batch.getInFlightRecords();
+
+        // Should get 15 unique records: 0-9 from first range (with deliveryCount=1)
+        // and 10-14 from second range (with deliveryCount=2)
+        assertEquals(15, records.size());
+        
+        // Verify first occurrence (offset 5 should have deliveryCount=1 from first range)
+        ConsumerRecord<String, String> record5 = records.stream()
+                .filter(r -> r.offset() == 5L)
+                .findFirst()
+                .orElse(null);
+        assertNotNull(record5);
+        assertEquals(Optional.of((short) 1), record5.deliveryCount());
+        
+        // Verify offset 10 has deliveryCount=2 from second range
+        ConsumerRecord<String, String> record10 = records.stream()
+                .findFirst()
+                .orElse(null);
+        assertNotNull(record10);
+        assertEquals(Optional.of((short) 2), record10.deliveryCount());
+        
+        // Verify all offsets are unique
+        Set<Long> offsetSet = new HashSet<>();
+        for (ConsumerRecord<String, String> record : records) {

Review Comment:
   I'm not sure if this covers the new behavior, since `inFlightRecords` 
already handles offset deduplication.
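
   For illustration of that point, a minimal sketch (a hypothetical simplification that keys records by offset; not the actual `ShareInFlightBatch` internals): with an offset-keyed structure, the size and uniqueness assertions come out the same whether or not the duplicates were filtered earlier, so on their own they may not exercise the new check.

   ```java
   import java.util.LinkedHashMap;
   import java.util.Map;

   public class OffsetKeyedDedupSketch {

       public static void main(String[] args) {
           // Hypothetical simplification: in-flight records kept in a map keyed by offset,
           // first occurrence wins via putIfAbsent (an assumption for this sketch).
           Map<Long, Short> inFlight = new LinkedHashMap<>();

           // Duplicate offsets 5-9 arriving from overlapping ranges [0-9] and [5-14].
           for (long offset = 0; offset <= 9; offset++) {
               inFlight.putIfAbsent(offset, (short) 1);
           }
           for (long offset = 5; offset <= 14; offset++) {
               inFlight.putIfAbsent(offset, (short) 2);
           }

           // Uniqueness and size are the same whether or not duplicates were
           // rejected upstream, so these checks alone don't isolate the new guard.
           System.out.println("Size: " + inFlight.size());                      // 15
           System.out.println("Offset 5 delivery count: " + inFlight.get(5L));  // 1
       }
   }
   ```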


