This is an automated email from the ASF dual-hosted git repository.

mittal pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.2 by this push:
     new 450101c19a5 KAFKA-19957: Accumulate offsets response in same batch 
(#21061) (#21064)
450101c19a5 is described below

commit 450101c19a530415266fb7eb8f2e39979f14735c
Author: Apoorv Mittal <[email protected]>
AuthorDate: Wed Dec 3 15:20:25 2025 +0000

    KAFKA-19957: Accumulate offsets response in same batch (#21061) (#21064)
    
    Merges contiguous AcquiredRecords with the same delivery count into
    single records.
    
    The PR accumulates multiple offsets into a single batch only if the
    offsets are part of the same in-flight batch, i.e. if the cache has
    0-4 and 5-9, and the records acquired are 3, 4, 5, 6 with the same
    delivery count, then the output shall be [3-4], [5-6]. This is done
    to make sure that offset accumulation does not go past the
    `batch size` in the request. The in-flight batches originally formed
    on the broker are created as per the `batch size` from the client,
    hence accumulating within a single batch is safe.
    
    However, if multiple clients in the same share group request with
    different batch sizes, then there would be a discrepancy. Having said
    that, a similar problem might occur with complete batch acquisition
    in this case, hence that should be fixed separately, if needed.
    
    Reviewers: Andrew Schofield <[email protected]>
---
 .../java/kafka/server/share/ShareFetchUtils.java   |  43 ++++++++
 .../java/kafka/server/share/SharePartition.java    |   9 +-
 .../kafka/server/share/ShareFetchUtilsTest.java    |  99 +++++++++++++++++
 .../kafka/server/share/SharePartitionTest.java     | 122 ++++++---------------
 4 files changed, 181 insertions(+), 92 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/ShareFetchUtils.java 
b/core/src/main/java/kafka/server/share/ShareFetchUtils.java
index ab7bee1d990..88f369b7615 100644
--- a/core/src/main/java/kafka/server/share/ShareFetchUtils.java
+++ b/core/src/main/java/kafka/server/share/ShareFetchUtils.java
@@ -274,4 +274,47 @@ public class ShareFetchUtils {
         }
         return defaultValue;
     }
+
+    /**
+     * Merges contiguous AcquiredRecords with the same delivery count into 
single records.
+     * <p>
+     * This method takes a list of AcquiredRecords where firstOffset and 
lastOffset typically are the
+     * same (representing single offsets), but not necessarily required, and 
merges contiguous offsets
+     * that have the same delivery count into ranges.
+     *
+     * @param result the list to accumulate merged AcquiredRecords into
+     * @param acquiredRecords the sorted list of AcquiredRecords to merge
+     */
+    static void accumulateAcquiredRecords(List<AcquiredRecords> result, 
List<AcquiredRecords> acquiredRecords) {
+        if (acquiredRecords.isEmpty()) {
+            return;
+        }
+
+        long firstOffset = acquiredRecords.get(0).firstOffset();
+        long lastOffset = acquiredRecords.get(0).lastOffset();
+        short deliveryCount = acquiredRecords.get(0).deliveryCount();
+
+        for (int i = 1; i < acquiredRecords.size(); i++) {
+            AcquiredRecords current = acquiredRecords.get(i);
+            if (current.firstOffset() == lastOffset + 1 && deliveryCount == 
current.deliveryCount()) {
+                // Extend the last offset.
+                lastOffset = current.lastOffset();
+            } else {
+                // Append the current accumulated batch and start a new batch.
+                result.add(new AcquiredRecords()
+                    .setFirstOffset(firstOffset)
+                    .setLastOffset(lastOffset)
+                    .setDeliveryCount(deliveryCount));
+                // Reset the accumulation variables to the current acquired 
records.
+                firstOffset = current.firstOffset();
+                lastOffset = current.lastOffset();
+                deliveryCount = current.deliveryCount();
+            }
+        }
+        // Add the last accumulated batch.
+        result.add(new AcquiredRecords()
+            .setFirstOffset(firstOffset)
+            .setLastOffset(lastOffset)
+            .setDeliveryCount(deliveryCount));
+    }
 }
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java 
b/core/src/main/java/kafka/server/share/SharePartition.java
index 92e75069a79..1043adb71fc 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -1921,10 +1921,11 @@ public class SharePartition {
         InFlightBatch inFlightBatch,
         List<AcquiredRecords> result
     ) {
-        lock.writeLock().lock();
         int acquiredCount = 0;
         long maxFetchRecordsWhileThrottledRecords = -1;
         boolean hasThrottledRecord = false;
+        List<AcquiredRecords> offsetAcquiredRecords = new ArrayList<>();
+        lock.writeLock().lock();
         try {
             for (Map.Entry<Long, InFlightState> offsetState : 
inFlightBatch.offsetState().entrySet()) {
                 // For the first batch which might have offsets prior to the 
request base
@@ -1986,8 +1987,7 @@ public class SharePartition {
                 // Update acquisition lock timeout task for the offset.
                 
offsetState.getValue().updateAcquisitionLockTimeoutTask(acquisitionLockTimeoutTask);
 
-                // TODO: Maybe we can club the continuous offsets here.
-                result.add(new AcquiredRecords()
+                offsetAcquiredRecords.add(new AcquiredRecords()
                     .setFirstOffset(offsetState.getKey())
                     .setLastOffset(offsetState.getKey())
                     .setDeliveryCount((short) 
offsetState.getValue().deliveryCount()));
@@ -2008,6 +2008,9 @@ public class SharePartition {
         } finally {
             lock.writeLock().unlock();
         }
+
+        // Accumulate the acquired records for the offset acquired records in 
the result.
+        ShareFetchUtils.accumulateAcquiredRecords(result, 
offsetAcquiredRecords);
         return acquiredCount;
     }
 
diff --git a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java 
b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
index a0257ac65dc..144f505a149 100644
--- a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
+++ b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
@@ -55,6 +55,7 @@ import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -68,6 +69,7 @@ import java.util.stream.Stream;
 import static 
org.apache.kafka.server.share.fetch.ShareFetchTestUtils.createFileRecords;
 import static 
org.apache.kafka.server.share.fetch.ShareFetchTestUtils.createShareAcquiredRecords;
 import static 
org.apache.kafka.server.share.fetch.ShareFetchTestUtils.memoryRecordsBuilder;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
@@ -628,6 +630,103 @@ public class ShareFetchUtilsTest {
         assertEquals(records, slicedRecords);
     }
 
+    @Test
+    void testAccumulateAcquiredRecords() {
+        List<AcquiredRecords> input = List.of(
+            new 
AcquiredRecords().setFirstOffset(0).setLastOffset(0).setDeliveryCount((short) 
1),
+            new 
AcquiredRecords().setFirstOffset(1).setLastOffset(1).setDeliveryCount((short) 
1),
+            new 
AcquiredRecords().setFirstOffset(2).setLastOffset(2).setDeliveryCount((short) 
2),
+            new 
AcquiredRecords().setFirstOffset(4).setLastOffset(4).setDeliveryCount((short) 
2),
+            new 
AcquiredRecords().setFirstOffset(5).setLastOffset(5).setDeliveryCount((short) 2)
+        );
+
+        List<AcquiredRecords> result = new ArrayList<>();
+        ShareFetchUtils.accumulateAcquiredRecords(result, input);
+        List<AcquiredRecords> expected = List.of(
+            new 
AcquiredRecords().setFirstOffset(0).setLastOffset(1).setDeliveryCount((short) 
1),
+            new 
AcquiredRecords().setFirstOffset(2).setLastOffset(2).setDeliveryCount((short) 
2),
+            new 
AcquiredRecords().setFirstOffset(4).setLastOffset(5).setDeliveryCount((short) 2)
+        );
+        assertArrayEquals(expected.toArray(), result.toArray());
+    }
+
+    @Test
+    void testAccumulateAcquiredRecordsAllBatches() {
+        List<AcquiredRecords> input = List.of(
+            new 
AcquiredRecords().setFirstOffset(0).setLastOffset(0).setDeliveryCount((short) 
1),
+            new 
AcquiredRecords().setFirstOffset(1).setLastOffset(1).setDeliveryCount((short) 
1),
+            new 
AcquiredRecords().setFirstOffset(2).setLastOffset(2).setDeliveryCount((short) 
1),
+            new 
AcquiredRecords().setFirstOffset(3).setLastOffset(3).setDeliveryCount((short) 
1),
+            new 
AcquiredRecords().setFirstOffset(4).setLastOffset(4).setDeliveryCount((short) 1)
+        );
+
+        List<AcquiredRecords> result = new ArrayList<>();
+        ShareFetchUtils.accumulateAcquiredRecords(result, input);
+        List<AcquiredRecords> expected = List.of(
+            new 
AcquiredRecords().setFirstOffset(0).setLastOffset(4).setDeliveryCount((short) 1)
+        );
+        assertArrayEquals(expected.toArray(), result.toArray());
+    }
+
+    @Test
+    void testAccumulateAcquiredRecordsWithRanges() {
+        List<AcquiredRecords> input = List.of(
+            new 
AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short) 
1),
+            new 
AcquiredRecords().setFirstOffset(4).setLastOffset(4).setDeliveryCount((short) 
1),
+            new 
AcquiredRecords().setFirstOffset(5).setLastOffset(8).setDeliveryCount((short) 
2),
+            new 
AcquiredRecords().setFirstOffset(10).setLastOffset(15).setDeliveryCount((short) 
2),
+            new 
AcquiredRecords().setFirstOffset(16).setLastOffset(20).setDeliveryCount((short) 
2)
+        );
+
+        List<AcquiredRecords> result = new ArrayList<>();
+        ShareFetchUtils.accumulateAcquiredRecords(result, input);
+        List<AcquiredRecords> expected = List.of(
+            new 
AcquiredRecords().setFirstOffset(0).setLastOffset(4).setDeliveryCount((short) 
1),
+            new 
AcquiredRecords().setFirstOffset(5).setLastOffset(8).setDeliveryCount((short) 
2),
+            new 
AcquiredRecords().setFirstOffset(10).setLastOffset(20).setDeliveryCount((short) 
2)
+        );
+        assertArrayEquals(expected.toArray(), result.toArray());
+    }
+
+    @Test
+    void testAccumulateAcquiredRecordsEmptyList() {
+        List<AcquiredRecords> result = new ArrayList<>();
+        ShareFetchUtils.accumulateAcquiredRecords(result, List.of());
+        assertTrue(result.isEmpty());
+    }
+
+    @Test
+    void testAccumulateAcquiredRecordsSingleRecord() {
+        List<AcquiredRecords> result = new ArrayList<>();
+        List<AcquiredRecords> input = List.of(
+            new 
AcquiredRecords().setFirstOffset(5).setLastOffset(5).setDeliveryCount((short) 
4));
+        ShareFetchUtils.accumulateAcquiredRecords(result, input);
+        assertArrayEquals(input.toArray(), result.toArray());
+    }
+
+    @Test
+    void testAccumulateAcquiredRecordsNoMerging() {
+        List<AcquiredRecords> input = List.of(
+            new 
AcquiredRecords().setFirstOffset(0).setLastOffset(0).setDeliveryCount((short) 
1),
+            new 
AcquiredRecords().setFirstOffset(2).setLastOffset(2).setDeliveryCount((short) 
1),
+            new 
AcquiredRecords().setFirstOffset(4).setLastOffset(4).setDeliveryCount((short) 1)
+        );
+
+        List<AcquiredRecords> result = new ArrayList<>();
+        ShareFetchUtils.accumulateAcquiredRecords(result, input);
+        assertArrayEquals(input.toArray(), result.toArray());
+
+        input = List.of(
+            new 
AcquiredRecords().setFirstOffset(0).setLastOffset(0).setDeliveryCount((short) 
1),
+            new 
AcquiredRecords().setFirstOffset(1).setLastOffset(1).setDeliveryCount((short) 
2),
+            new 
AcquiredRecords().setFirstOffset(2).setLastOffset(2).setDeliveryCount((short) 3)
+        );
+
+        result = new ArrayList<>();
+        ShareFetchUtils.accumulateAcquiredRecords(result, input);
+        assertArrayEquals(input.toArray(), result.toArray());
+    }
+
     private static class RecordsArgumentsProvider implements ArgumentsProvider 
{
         @Override
         public Stream<? extends Arguments> provideArguments(ExtensionContext 
context) throws Exception {
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index f84f64afdb1..beae9e48d8d 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -96,7 +96,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.IntStream;
 
 import static kafka.server.share.SharePartition.EMPTY_MEMBER_ID;
 import static 
org.apache.kafka.server.share.fetch.ShareFetchTestUtils.memoryRecordsBuilder;
@@ -1366,7 +1365,7 @@ public class SharePartitionTest {
         expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 18, 3));
         expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 1));
         expectedAcquiredRecords.addAll(expectedAcquiredRecord(23, 25, 1));
-        expectedAcquiredRecords.addAll(expectedAcquiredRecords(26, 27, 2));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(26, 27, 2));
 
         assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
         assertEquals(28, sharePartition.nextFetchOffset());
@@ -1698,7 +1697,7 @@ public class SharePartitionTest {
         expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 18, 3));
         expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 1));
         expectedAcquiredRecords.addAll(expectedAcquiredRecord(23, 25, 1));
-        expectedAcquiredRecords.addAll(expectedAcquiredRecords(26, 27, 2));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(26, 27, 2));
 
         assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
         assertEquals(28, sharePartition.nextFetchOffset());
@@ -2387,7 +2386,7 @@ public class SharePartitionTest {
 
         // Validate 1 batch is fetched, with 5 records till end of batch, last 
available batch should
         // not be acquired
-        assertArrayEquals(expectedAcquiredRecords(15, 19, 2).toArray(), 
acquiredRecordsList.toArray());
+        assertArrayEquals(expectedAcquiredRecord(15, 19, 2).toArray(), 
acquiredRecordsList.toArray());
         assertEquals(30, sharePartition.nextFetchOffset());
 
         // Release last offset of the acquired batch. Only 1 record should be 
released and later acquired.
@@ -3204,7 +3203,7 @@ public class SharePartitionTest {
         // Send the same fetch request batch again but only 2 offsets should 
come as acquired.
         acquiredRecordsList = fetchAcquiredRecords(sharePartition, records, 2);
 
-        assertArrayEquals(expectedAcquiredRecords(12, 13, 2).toArray(), 
acquiredRecordsList.toArray());
+        assertArrayEquals(expectedAcquiredRecord(12, 13, 2).toArray(), 
acquiredRecordsList.toArray());
         assertEquals(15, sharePartition.nextFetchOffset());
         assertEquals(0, sharePartition.deliveryCompleteCount());
     }
@@ -3288,7 +3287,7 @@ public class SharePartitionTest {
         // Send next batch from offset 12, only 3 records should be acquired.
         acquiredRecordsList = fetchAcquiredRecords(sharePartition, records1, 
3);
 
-        assertArrayEquals(expectedAcquiredRecords(12, 14, 2).toArray(), 
acquiredRecordsList.toArray());
+        assertArrayEquals(expectedAcquiredRecord(12, 14, 2).toArray(), 
acquiredRecordsList.toArray());
         assertEquals(15, sharePartition.nextFetchOffset());
 
         // Though record2 batch exists to acquire but send batch record3, it 
should be acquired but
@@ -3302,7 +3301,7 @@ public class SharePartitionTest {
         MemoryRecords subsetRecords = memoryRecords(17, 2);
         acquiredRecordsList = fetchAcquiredRecords(sharePartition, 
subsetRecords, 2);
 
-        assertArrayEquals(expectedAcquiredRecords(17, 18, 2).toArray(), 
acquiredRecordsList.toArray());
+        assertArrayEquals(expectedAcquiredRecord(17, 18, 2).toArray(), 
acquiredRecordsList.toArray());
         // Next fetch offset should not move.
         assertEquals(15, sharePartition.nextFetchOffset());
 
@@ -3320,7 +3319,7 @@ public class SharePartitionTest {
         acquiredRecordsList = fetchAcquiredRecords(sharePartition, records2, 
3);
 
         // Offset 15,16 and 19 should be acquired.
-        List<AcquiredRecords> expectedAcquiredRecords = 
expectedAcquiredRecords(15, 16, 2);
+        List<AcquiredRecords> expectedAcquiredRecords = new 
ArrayList<>(expectedAcquiredRecord(15, 16, 2));
         expectedAcquiredRecords.addAll(expectedAcquiredRecords(19, 19, 2));
         assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
         // Next fetch offset should not move.
@@ -3441,11 +3440,7 @@ public class SharePartitionTest {
         // The gap from 11 to 20 will be acquired. Since the next batch is 
AVAILABLE, and we records fetched from replica manager
         // overlap with the next batch, some records from the next batch will 
also be acquired
         List<AcquiredRecords> expectedAcquiredRecord = new 
ArrayList<>(expectedAcquiredRecord(11, 20, 1));
-        expectedAcquiredRecord.addAll(expectedAcquiredRecord(21, 21, 3));
-        expectedAcquiredRecord.addAll(expectedAcquiredRecord(22, 22, 3));
-        expectedAcquiredRecord.addAll(expectedAcquiredRecord(23, 23, 3));
-        expectedAcquiredRecord.addAll(expectedAcquiredRecord(24, 24, 3));
-        expectedAcquiredRecord.addAll(expectedAcquiredRecord(25, 25, 3));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(21, 25, 3));
         assertArrayEquals(expectedAcquiredRecord.toArray(), 
acquiredRecordsList.toArray());
 
         assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
@@ -3546,11 +3541,7 @@ public class SharePartitionTest {
         expectedAcquiredRecord.addAll(expectedAcquiredRecord(41, 50, 2));
         expectedAcquiredRecord.addAll(expectedAcquiredRecord(51, 60, 1));
         expectedAcquiredRecord.addAll(expectedAcquiredRecord(71, 80, 1));
-        expectedAcquiredRecord.addAll(expectedAcquiredRecord(81, 81, 2));
-        expectedAcquiredRecord.addAll(expectedAcquiredRecord(82, 82, 2));
-        expectedAcquiredRecord.addAll(expectedAcquiredRecord(83, 83, 2));
-        expectedAcquiredRecord.addAll(expectedAcquiredRecord(84, 84, 2));
-        expectedAcquiredRecord.addAll(expectedAcquiredRecord(85, 85, 2));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(81, 85, 2));
         assertArrayEquals(expectedAcquiredRecord.toArray(), 
acquiredRecordsList.toArray());
 
         assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
@@ -5773,11 +5764,8 @@ public class SharePartitionTest {
             fetchPartitionData(records),
             FETCH_ISOLATION_HWM), 15);
 
-        List<AcquiredRecords> expectedAcquiredRecord = new 
ArrayList<>(expectedAcquiredRecord(16, 16, 2));
-        expectedAcquiredRecord.addAll(expectedAcquiredRecord(17, 17, 2));
-        expectedAcquiredRecord.addAll(expectedAcquiredRecord(18, 18, 2));
-        expectedAcquiredRecord.addAll(expectedAcquiredRecord(19, 19, 2));
-        expectedAcquiredRecord.addAll(expectedAcquiredRecord(20, 20, 2));
+        // As divided in 2 batches hence merge shall not happen here for all 
batches.
+        List<AcquiredRecords> expectedAcquiredRecord = new 
ArrayList<>(expectedAcquiredRecord(16, 20, 2));
         expectedAcquiredRecord.addAll(expectedAcquiredRecord(21, 30, 2));
         assertArrayEquals(expectedAcquiredRecord.toArray(), 
acquiredRecordsList.toArray());
 
@@ -10447,13 +10435,7 @@ public class SharePartitionTest {
             5);
 
         // Should acquire the subset of records in InflightBatch which are 
still available.
-        List<AcquiredRecords> expectedAcquiredRecords = new 
ArrayList<>(expectedAcquiredRecord(5, 5, 1));
-        expectedAcquiredRecords.addAll(expectedAcquiredRecord(6, 6, 1));
-        expectedAcquiredRecords.addAll(expectedAcquiredRecord(7, 7, 1));
-        expectedAcquiredRecords.addAll(expectedAcquiredRecord(8, 8, 1));
-        expectedAcquiredRecords.addAll(expectedAcquiredRecord(9, 9, 1));
-
-        assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
+        assertArrayEquals(expectedAcquiredRecord(5, 9, 1).toArray(), 
acquiredRecordsList.toArray());
         assertEquals(10, sharePartition.nextFetchOffset());
         assertEquals(1, sharePartition.cachedState().size());
         assertEquals(0, sharePartition.cachedState().get(0L).firstOffset());
@@ -10637,12 +10619,7 @@ public class SharePartitionTest {
         // 2. 15-20 (new records)
         assertEquals(1, sharePartition.cachedState().size());
         List<AcquiredRecords> expectedAcquiredRecords = new 
ArrayList<>(expectedAcquiredRecord(10, 10, 2));
-        expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 15, 1));
-        expectedAcquiredRecords.addAll(expectedAcquiredRecord(16, 16, 1));
-        expectedAcquiredRecords.addAll(expectedAcquiredRecord(17, 17, 1));
-        expectedAcquiredRecords.addAll(expectedAcquiredRecord(18, 18, 1));
-        expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 1));
-        expectedAcquiredRecords.addAll(expectedAcquiredRecord(20, 20, 1));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 20, 1));
         assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
 
         // Check cached state.
@@ -10696,8 +10673,7 @@ public class SharePartitionTest {
             FETCH_ISOLATION_HWM),
             3);
 
-        List<AcquiredRecords> expectedAcquiredRecords = new 
ArrayList<>(expectedAcquiredRecord(0, 2, 1));
-        assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
+        assertArrayEquals(expectedAcquiredRecord(0, 2, 1).toArray(), 
acquiredRecordsList.toArray());
         assertEquals(3, sharePartition.nextFetchOffset());
 
         // Add records from 0-9 offsets, 3-5 should be acquired and 0-2 should 
be ignored.
@@ -10712,8 +10688,7 @@ public class SharePartitionTest {
             FETCH_ISOLATION_HWM),
             3);
 
-        expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(3, 3, 
1));
-        expectedAcquiredRecords.addAll(expectedAcquiredRecord(4, 4, 1));
+        List<AcquiredRecords> expectedAcquiredRecords = new 
ArrayList<>(expectedAcquiredRecord(3, 4, 1));
         expectedAcquiredRecords.addAll(expectedAcquiredRecord(5, 5, 1));
         assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
         assertEquals(6, sharePartition.nextFetchOffset());
@@ -10786,7 +10761,7 @@ public class SharePartitionTest {
                 FETCH_ISOLATION_HWM),
             10);
 
-        assertArrayEquals(expectedAcquiredRecords(5, 14, 2).toArray(), 
acquiredRecordsList.toArray());
+        assertArrayEquals(expectedAcquiredRecord(5, 14, 2).toArray(), 
acquiredRecordsList.toArray());
 
         assertEquals(15, sharePartition.nextFetchOffset());
         assertEquals(5L, sharePartition.startOffset());
@@ -10831,7 +10806,7 @@ public class SharePartitionTest {
                 FETCH_ISOLATION_HWM),
             10);
 
-        assertArrayEquals(expectedAcquiredRecords(15, 24, 2).toArray(), 
acquiredRecordsList.toArray());
+        assertArrayEquals(expectedAcquiredRecord(15, 24, 2).toArray(), 
acquiredRecordsList.toArray());
         assertEquals(25, sharePartition.nextFetchOffset());
         assertEquals(15L, sharePartition.startOffset());
         assertEquals(1, sharePartition.cachedState().size());
@@ -10899,9 +10874,7 @@ public class SharePartitionTest {
             FETCH_ISOLATION_HWM),
             2);
 
-        List<AcquiredRecords> expectedAcquiredRecords = new 
ArrayList<>(expectedAcquiredRecord(1, 1, 1));
-        expectedAcquiredRecords.addAll(expectedAcquiredRecord(2, 2, 1));
-        assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
+        assertArrayEquals(expectedAcquiredRecord(1, 2, 1).toArray(), 
acquiredRecordsList.toArray());
 
         // Ack only 1 record
         ackResult = sharePartition.acknowledge(
@@ -10947,9 +10920,7 @@ public class SharePartitionTest {
             FETCH_ISOLATION_HWM),
             2);
 
-        List<AcquiredRecords> expectedAcquiredRecords = new 
ArrayList<>(expectedAcquiredRecord(10, 10, 2));
-        expectedAcquiredRecords.addAll(expectedAcquiredRecord(11, 11, 2));
-        assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
+        assertArrayEquals(expectedAcquiredRecord(10, 11, 2).toArray(), 
acquiredRecordsList.toArray());
         assertEquals(12, sharePartition.nextFetchOffset());
 
         // Check cached state.
@@ -11046,9 +11017,7 @@ public class SharePartitionTest {
             FETCH_ISOLATION_HWM),
             2);
         // delivery count increased to 2
-        List<AcquiredRecords> expectedAcquiredRecords = new 
ArrayList<>(expectedAcquiredRecord(0, 0, 2));
-        expectedAcquiredRecords.addAll(expectedAcquiredRecord(1, 1, 2));
-        assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
+        assertArrayEquals(expectedAcquiredRecord(0, 1, 2).toArray(), 
acquiredRecordsList.toArray());
 
         // Ack offset at 1 and let the other offset to expire again.
         CompletableFuture<Void> ackResult = sharePartition.acknowledge(
@@ -11334,9 +11303,7 @@ public class SharePartitionTest {
             fetchPartitionData(records),
             FETCH_ISOLATION_HWM),
             2);
-        List<AcquiredRecords> expectedAcquiredRecord = new 
ArrayList<>(expectedAcquiredRecord(10, 10, 2));
-        expectedAcquiredRecord.addAll(expectedAcquiredRecord(11, 11, 2));
-        assertArrayEquals(expectedAcquiredRecord.toArray(), 
acquiredRecordsList.toArray());
+        assertArrayEquals(expectedAcquiredRecord(10, 11, 2).toArray(), 
acquiredRecordsList.toArray());
         assertEquals(12, sharePartition.nextFetchOffset());
 
         // Member-3 acquires records in batch_optimized mode.
@@ -11354,11 +11321,8 @@ public class SharePartitionTest {
         // 1. 12-14 (released offsets)
         // 2. 21-22 (released offsets)
         // 3. 30-44 (new offsets)
-        expectedAcquiredRecord = new ArrayList<>(expectedAcquiredRecord(12, 
12, 2));
-        expectedAcquiredRecord.addAll(expectedAcquiredRecord(13, 13, 2));
-        expectedAcquiredRecord.addAll(expectedAcquiredRecord(14, 14, 2));
-        expectedAcquiredRecord.addAll(expectedAcquiredRecord(21, 21, 2));
-        expectedAcquiredRecord.addAll(expectedAcquiredRecord(22, 22, 2));
+        List<AcquiredRecords> expectedAcquiredRecord = new 
ArrayList<>(expectedAcquiredRecord(12, 14, 2));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(21, 22, 2));
         expectedAcquiredRecord.addAll(expectedAcquiredRecord(30, 44, 1));
         assertArrayEquals(expectedAcquiredRecord.toArray(), 
acquiredRecordsList.toArray());
         assertEquals(45, sharePartition.nextFetchOffset());
@@ -11600,9 +11564,7 @@ public class SharePartitionTest {
                 FETCH_ISOLATION_HWM),
             2);
 
-        expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(26, 
26, 4));
-        expectedAcquiredRecords.addAll(expectedAcquiredRecord(27, 27, 4));
-        assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
+        assertArrayEquals(expectedAcquiredRecord(26, 27, 4).toArray(), 
acquiredRecordsList.toArray());
     }
 
     @Test
@@ -11653,9 +11615,7 @@ public class SharePartitionTest {
                 fetchPartitionData(records),
                 FETCH_ISOLATION_HWM),
             10);
-        final List<AcquiredRecords> expectedAcquiredRecords = new 
ArrayList<>(expectedAcquiredRecord(15, 15, 5));
-        IntStream.range(1, 10).forEach(i -> 
expectedAcquiredRecords.addAll(expectedAcquiredRecord(15 + i, 15 + i, 5)));
-        assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
+        assertArrayEquals(expectedAcquiredRecord(15, 24, 5).toArray(), 
acquiredRecordsList.toArray());
 
         // Allowing acquisition lock to expire.
         mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS + 
1);
@@ -11670,14 +11630,11 @@ public class SharePartitionTest {
                 fetchPartitionData(records),
                 FETCH_ISOLATION_HWM),
             5);
-        final List<AcquiredRecords> expectedAcquiredRecords2 = new 
ArrayList<>(expectedAcquiredRecord(15, 15, 6));
-        IntStream.range(1, 5).forEach(i -> 
expectedAcquiredRecords2.addAll(expectedAcquiredRecord(15 + i, 15 + i, 6)));
-        assertArrayEquals(expectedAcquiredRecords2.toArray(), 
acquiredRecordsList.toArray());
+        assertArrayEquals(expectedAcquiredRecord(15, 19, 6).toArray(), 
acquiredRecordsList.toArray());
 
         // Allowing acquisition lock to expire.
         mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS + 
1);
 
-        List<AcquiredRecords> expectedLastAttemptAcquiredRecords;
         // Last delivery attempt, records are delivered individually.
         for (int i = 0; i < 5; i++) {
             acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
@@ -11689,8 +11646,7 @@ public class SharePartitionTest {
                     fetchPartitionData(records),
                     FETCH_ISOLATION_HWM),
                 1);
-            expectedLastAttemptAcquiredRecords = new 
ArrayList<>(expectedAcquiredRecord(15 + i, 15 + i, 7));
-            assertArrayEquals(expectedLastAttemptAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
+            assertArrayEquals(expectedAcquiredRecord(15 + i, 15 + i, 
7).toArray(), acquiredRecordsList.toArray());
         }
 
         // The record at offset 20 has a delivery count of 5 and is a subject 
to be throttled;
@@ -11704,9 +11660,7 @@ public class SharePartitionTest {
                 fetchPartitionData(records),
                 FETCH_ISOLATION_HWM),
             5);
-        final List<AcquiredRecords> expectedAcquiredRecords3 = new 
ArrayList<>(expectedAcquiredRecord(20, 20, 6));
-        IntStream.range(1, 5).forEach(i -> 
expectedAcquiredRecords3.addAll(expectedAcquiredRecord(20 + i, 20 + i, 6)));
-        assertArrayEquals(expectedAcquiredRecords3.toArray(), 
acquiredRecordsList.toArray());
+        assertArrayEquals(expectedAcquiredRecord(20, 24, 6).toArray(), 
acquiredRecordsList.toArray());
 
         // The record at offset 25 has a delivery count of 4 and is a subject 
to be throttled;
         // First acquisition attempt fails: batch size should be ~1/2 of 
original, 20 -> 10
@@ -11719,9 +11673,7 @@ public class SharePartitionTest {
                 fetchPartitionData(records),
                 FETCH_ISOLATION_HWM),
             10);
-        final List<AcquiredRecords> expectedAcquiredRecords4 = new 
ArrayList<>(expectedAcquiredRecord(25, 25, 5));
-        IntStream.range(1, 10).forEach(i -> 
expectedAcquiredRecords4.addAll(expectedAcquiredRecord(25 + i, 25 + i, 5)));
-        assertArrayEquals(expectedAcquiredRecords4.toArray(), 
acquiredRecordsList.toArray());
+        assertArrayEquals(expectedAcquiredRecord(25, 34, 5).toArray(), 
acquiredRecordsList.toArray());
     }
 
     @Test
@@ -11813,9 +11765,7 @@ public class SharePartitionTest {
                 fetchPartitionData(records),
                 FETCH_ISOLATION_HWM),
             2);
-        List<AcquiredRecords> expectedAcquiredRecords = new 
ArrayList<>(expectedAcquiredRecord(15, 15, 4));
-        expectedAcquiredRecords.addAll(expectedAcquiredRecord(16, 16, 4));
-        assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
+        assertArrayEquals(expectedAcquiredRecord(15, 16, 4).toArray(), 
acquiredRecordsList.toArray());
 
         // The record at offset 17 should ba delivered alone.
         acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
@@ -11827,8 +11777,7 @@ public class SharePartitionTest {
                 fetchPartitionData(records),
                 FETCH_ISOLATION_HWM),
             1);
-        expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(17, 
17, 5));
-        assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
+        assertArrayEquals(expectedAcquiredRecord(17, 17, 5).toArray(), 
acquiredRecordsList.toArray());
     }
 
     @Test
@@ -11920,11 +11869,7 @@ public class SharePartitionTest {
                 fetchPartitionData(records),
                 FETCH_ISOLATION_HWM),
             4);
-        List<AcquiredRecords> expectedAcquiredRecords = new 
ArrayList<>(expectedAcquiredRecord(16, 16, 3));
-        expectedAcquiredRecords.addAll(expectedAcquiredRecord(17, 17, 3));
-        expectedAcquiredRecords.addAll(expectedAcquiredRecord(18, 18, 3));
-        expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 3));
-        assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
+        assertArrayEquals(expectedAcquiredRecord(16, 19, 3).toArray(), 
acquiredRecordsList.toArray());
     }
 
     @Test
@@ -12035,8 +11980,7 @@ public class SharePartitionTest {
 
         List<AcquiredRecords> expectedAcquiredRecords = new 
ArrayList<>(expectedAcquiredRecord(30, 34, 3));
         expectedAcquiredRecords.addAll(expectedAcquiredRecord(35, 35, 3));
-        expectedAcquiredRecords.addAll(expectedAcquiredRecord(38, 38, 3));
-        expectedAcquiredRecords.addAll(expectedAcquiredRecord(39, 39, 3));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(38, 39, 3));
         expectedAcquiredRecords.addAll(expectedAcquiredRecord(45, 49, 2));
 
         assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());

Reply via email to