This is an automated email from the ASF dual-hosted git repository.
mittal pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.2 by this push:
new 450101c19a5 KAFKA-19957: Accumulate offsets response in same batch
(#21061) (#21064)
450101c19a5 is described below
commit 450101c19a530415266fb7eb8f2e39979f14735c
Author: Apoorv Mittal <[email protected]>
AuthorDate: Wed Dec 3 15:20:25 2025 +0000
KAFKA-19957: Accumulate offsets response in same batch (#21061) (#21064)
Merges contiguous AcquiredRecords with the same delivery count into
single records.
The PR accumulates multiple offsets into a single batch only if the
offsets are part of the same in-flight batch, i.e. if the cache has 0-4 and
5-9, and the records acquired are 3, 4, 5, 6 with the same delivery count,
then the output shall be [3-4], [5-6]. This is done to make sure that
offset accumulation does not go past the `batch size` in the request. The
in-flight batches originally formed on the broker are created as per
the `batch size` from the client, hence accumulating within a single batch
is safe.
However, if multiple clients in the same share group send requests with
different batch sizes, then there would be a discrepancy. Having said that,
a similar problem might occur with complete batch acquisition in this
case, hence that should be fixed separately, if needed.
Reviewers: Andrew Schofield <[email protected]>
---
.../java/kafka/server/share/ShareFetchUtils.java | 43 ++++++++
.../java/kafka/server/share/SharePartition.java | 9 +-
.../kafka/server/share/ShareFetchUtilsTest.java | 99 +++++++++++++++++
.../kafka/server/share/SharePartitionTest.java | 122 ++++++---------------
4 files changed, 181 insertions(+), 92 deletions(-)
diff --git a/core/src/main/java/kafka/server/share/ShareFetchUtils.java
b/core/src/main/java/kafka/server/share/ShareFetchUtils.java
index ab7bee1d990..88f369b7615 100644
--- a/core/src/main/java/kafka/server/share/ShareFetchUtils.java
+++ b/core/src/main/java/kafka/server/share/ShareFetchUtils.java
@@ -274,4 +274,47 @@ public class ShareFetchUtils {
}
return defaultValue;
}
+
+ /**
+ * Merges contiguous AcquiredRecords with the same delivery count into
single records.
+ * <p>
+ * This method takes a list of AcquiredRecords where firstOffset and
lastOffset typically are the
+ * same (representing single offsets), but not necessarily required, and
merges contiguous offsets
+ * that have the same delivery count into ranges.
+ *
+ * @param result the list to accumulate merged AcquiredRecords into
+ * @param acquiredRecords the sorted list of AcquiredRecords to merge
+ */
+ static void accumulateAcquiredRecords(List<AcquiredRecords> result,
List<AcquiredRecords> acquiredRecords) {
+ if (acquiredRecords.isEmpty()) {
+ return;
+ }
+
+ long firstOffset = acquiredRecords.get(0).firstOffset();
+ long lastOffset = acquiredRecords.get(0).lastOffset();
+ short deliveryCount = acquiredRecords.get(0).deliveryCount();
+
+ for (int i = 1; i < acquiredRecords.size(); i++) {
+ AcquiredRecords current = acquiredRecords.get(i);
+ if (current.firstOffset() == lastOffset + 1 && deliveryCount ==
current.deliveryCount()) {
+ // Extend the last offset.
+ lastOffset = current.lastOffset();
+ } else {
+ // Append the current accumulated batch and start a new batch.
+ result.add(new AcquiredRecords()
+ .setFirstOffset(firstOffset)
+ .setLastOffset(lastOffset)
+ .setDeliveryCount(deliveryCount));
+ // Reset the accumulation variables to the current acquired
records.
+ firstOffset = current.firstOffset();
+ lastOffset = current.lastOffset();
+ deliveryCount = current.deliveryCount();
+ }
+ }
+ // Add the last accumulated batch.
+ result.add(new AcquiredRecords()
+ .setFirstOffset(firstOffset)
+ .setLastOffset(lastOffset)
+ .setDeliveryCount(deliveryCount));
+ }
}
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java
b/core/src/main/java/kafka/server/share/SharePartition.java
index 92e75069a79..1043adb71fc 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -1921,10 +1921,11 @@ public class SharePartition {
InFlightBatch inFlightBatch,
List<AcquiredRecords> result
) {
- lock.writeLock().lock();
int acquiredCount = 0;
long maxFetchRecordsWhileThrottledRecords = -1;
boolean hasThrottledRecord = false;
+ List<AcquiredRecords> offsetAcquiredRecords = new ArrayList<>();
+ lock.writeLock().lock();
try {
for (Map.Entry<Long, InFlightState> offsetState :
inFlightBatch.offsetState().entrySet()) {
// For the first batch which might have offsets prior to the
request base
@@ -1986,8 +1987,7 @@ public class SharePartition {
// Update acquisition lock timeout task for the offset.
offsetState.getValue().updateAcquisitionLockTimeoutTask(acquisitionLockTimeoutTask);
- // TODO: Maybe we can club the continuous offsets here.
- result.add(new AcquiredRecords()
+ offsetAcquiredRecords.add(new AcquiredRecords()
.setFirstOffset(offsetState.getKey())
.setLastOffset(offsetState.getKey())
.setDeliveryCount((short)
offsetState.getValue().deliveryCount()));
@@ -2008,6 +2008,9 @@ public class SharePartition {
} finally {
lock.writeLock().unlock();
}
+
+ // Accumulate the acquired records for the offset acquired records in
the result.
+ ShareFetchUtils.accumulateAcquiredRecords(result,
offsetAcquiredRecords);
return acquiredCount;
}
diff --git a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
index a0257ac65dc..144f505a149 100644
--- a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
+++ b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java
@@ -55,6 +55,7 @@ import org.mockito.Mockito;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -68,6 +69,7 @@ import java.util.stream.Stream;
import static
org.apache.kafka.server.share.fetch.ShareFetchTestUtils.createFileRecords;
import static
org.apache.kafka.server.share.fetch.ShareFetchTestUtils.createShareAcquiredRecords;
import static
org.apache.kafka.server.share.fetch.ShareFetchTestUtils.memoryRecordsBuilder;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
@@ -628,6 +630,103 @@ public class ShareFetchUtilsTest {
assertEquals(records, slicedRecords);
}
+ @Test
+ void testAccumulateAcquiredRecords() {
+ List<AcquiredRecords> input = List.of(
+ new
AcquiredRecords().setFirstOffset(0).setLastOffset(0).setDeliveryCount((short)
1),
+ new
AcquiredRecords().setFirstOffset(1).setLastOffset(1).setDeliveryCount((short)
1),
+ new
AcquiredRecords().setFirstOffset(2).setLastOffset(2).setDeliveryCount((short)
2),
+ new
AcquiredRecords().setFirstOffset(4).setLastOffset(4).setDeliveryCount((short)
2),
+ new
AcquiredRecords().setFirstOffset(5).setLastOffset(5).setDeliveryCount((short) 2)
+ );
+
+ List<AcquiredRecords> result = new ArrayList<>();
+ ShareFetchUtils.accumulateAcquiredRecords(result, input);
+ List<AcquiredRecords> expected = List.of(
+ new
AcquiredRecords().setFirstOffset(0).setLastOffset(1).setDeliveryCount((short)
1),
+ new
AcquiredRecords().setFirstOffset(2).setLastOffset(2).setDeliveryCount((short)
2),
+ new
AcquiredRecords().setFirstOffset(4).setLastOffset(5).setDeliveryCount((short) 2)
+ );
+ assertArrayEquals(expected.toArray(), result.toArray());
+ }
+
+ @Test
+ void testAccumulateAcquiredRecordsAllBatches() {
+ List<AcquiredRecords> input = List.of(
+ new
AcquiredRecords().setFirstOffset(0).setLastOffset(0).setDeliveryCount((short)
1),
+ new
AcquiredRecords().setFirstOffset(1).setLastOffset(1).setDeliveryCount((short)
1),
+ new
AcquiredRecords().setFirstOffset(2).setLastOffset(2).setDeliveryCount((short)
1),
+ new
AcquiredRecords().setFirstOffset(3).setLastOffset(3).setDeliveryCount((short)
1),
+ new
AcquiredRecords().setFirstOffset(4).setLastOffset(4).setDeliveryCount((short) 1)
+ );
+
+ List<AcquiredRecords> result = new ArrayList<>();
+ ShareFetchUtils.accumulateAcquiredRecords(result, input);
+ List<AcquiredRecords> expected = List.of(
+ new
AcquiredRecords().setFirstOffset(0).setLastOffset(4).setDeliveryCount((short) 1)
+ );
+ assertArrayEquals(expected.toArray(), result.toArray());
+ }
+
+ @Test
+ void testAccumulateAcquiredRecordsWithRanges() {
+ List<AcquiredRecords> input = List.of(
+ new
AcquiredRecords().setFirstOffset(0).setLastOffset(3).setDeliveryCount((short)
1),
+ new
AcquiredRecords().setFirstOffset(4).setLastOffset(4).setDeliveryCount((short)
1),
+ new
AcquiredRecords().setFirstOffset(5).setLastOffset(8).setDeliveryCount((short)
2),
+ new
AcquiredRecords().setFirstOffset(10).setLastOffset(15).setDeliveryCount((short)
2),
+ new
AcquiredRecords().setFirstOffset(16).setLastOffset(20).setDeliveryCount((short)
2)
+ );
+
+ List<AcquiredRecords> result = new ArrayList<>();
+ ShareFetchUtils.accumulateAcquiredRecords(result, input);
+ List<AcquiredRecords> expected = List.of(
+ new
AcquiredRecords().setFirstOffset(0).setLastOffset(4).setDeliveryCount((short)
1),
+ new
AcquiredRecords().setFirstOffset(5).setLastOffset(8).setDeliveryCount((short)
2),
+ new
AcquiredRecords().setFirstOffset(10).setLastOffset(20).setDeliveryCount((short)
2)
+ );
+ assertArrayEquals(expected.toArray(), result.toArray());
+ }
+
+ @Test
+ void testAccumulateAcquiredRecordsEmptyList() {
+ List<AcquiredRecords> result = new ArrayList<>();
+ ShareFetchUtils.accumulateAcquiredRecords(result, List.of());
+ assertTrue(result.isEmpty());
+ }
+
+ @Test
+ void testAccumulateAcquiredRecordsSingleRecord() {
+ List<AcquiredRecords> result = new ArrayList<>();
+ List<AcquiredRecords> input = List.of(
+ new
AcquiredRecords().setFirstOffset(5).setLastOffset(5).setDeliveryCount((short)
4));
+ ShareFetchUtils.accumulateAcquiredRecords(result, input);
+ assertArrayEquals(input.toArray(), result.toArray());
+ }
+
+ @Test
+ void testAccumulateAcquiredRecordsNoMerging() {
+ List<AcquiredRecords> input = List.of(
+ new
AcquiredRecords().setFirstOffset(0).setLastOffset(0).setDeliveryCount((short)
1),
+ new
AcquiredRecords().setFirstOffset(2).setLastOffset(2).setDeliveryCount((short)
1),
+ new
AcquiredRecords().setFirstOffset(4).setLastOffset(4).setDeliveryCount((short) 1)
+ );
+
+ List<AcquiredRecords> result = new ArrayList<>();
+ ShareFetchUtils.accumulateAcquiredRecords(result, input);
+ assertArrayEquals(input.toArray(), result.toArray());
+
+ input = List.of(
+ new
AcquiredRecords().setFirstOffset(0).setLastOffset(0).setDeliveryCount((short)
1),
+ new
AcquiredRecords().setFirstOffset(1).setLastOffset(1).setDeliveryCount((short)
2),
+ new
AcquiredRecords().setFirstOffset(2).setLastOffset(2).setDeliveryCount((short) 3)
+ );
+
+ result = new ArrayList<>();
+ ShareFetchUtils.accumulateAcquiredRecords(result, input);
+ assertArrayEquals(input.toArray(), result.toArray());
+ }
+
private static class RecordsArgumentsProvider implements ArgumentsProvider
{
@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext
context) throws Exception {
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index f84f64afdb1..beae9e48d8d 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -96,7 +96,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-import java.util.stream.IntStream;
import static kafka.server.share.SharePartition.EMPTY_MEMBER_ID;
import static
org.apache.kafka.server.share.fetch.ShareFetchTestUtils.memoryRecordsBuilder;
@@ -1366,7 +1365,7 @@ public class SharePartitionTest {
expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 18, 3));
expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 1));
expectedAcquiredRecords.addAll(expectedAcquiredRecord(23, 25, 1));
- expectedAcquiredRecords.addAll(expectedAcquiredRecords(26, 27, 2));
+ expectedAcquiredRecords.addAll(expectedAcquiredRecord(26, 27, 2));
assertArrayEquals(expectedAcquiredRecords.toArray(),
acquiredRecordsList.toArray());
assertEquals(28, sharePartition.nextFetchOffset());
@@ -1698,7 +1697,7 @@ public class SharePartitionTest {
expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 18, 3));
expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 1));
expectedAcquiredRecords.addAll(expectedAcquiredRecord(23, 25, 1));
- expectedAcquiredRecords.addAll(expectedAcquiredRecords(26, 27, 2));
+ expectedAcquiredRecords.addAll(expectedAcquiredRecord(26, 27, 2));
assertArrayEquals(expectedAcquiredRecords.toArray(),
acquiredRecordsList.toArray());
assertEquals(28, sharePartition.nextFetchOffset());
@@ -2387,7 +2386,7 @@ public class SharePartitionTest {
// Validate 1 batch is fetched, with 5 records till end of batch, last
available batch should
// not be acquired
- assertArrayEquals(expectedAcquiredRecords(15, 19, 2).toArray(),
acquiredRecordsList.toArray());
+ assertArrayEquals(expectedAcquiredRecord(15, 19, 2).toArray(),
acquiredRecordsList.toArray());
assertEquals(30, sharePartition.nextFetchOffset());
// Release last offset of the acquired batch. Only 1 record should be
released and later acquired.
@@ -3204,7 +3203,7 @@ public class SharePartitionTest {
// Send the same fetch request batch again but only 2 offsets should
come as acquired.
acquiredRecordsList = fetchAcquiredRecords(sharePartition, records, 2);
- assertArrayEquals(expectedAcquiredRecords(12, 13, 2).toArray(),
acquiredRecordsList.toArray());
+ assertArrayEquals(expectedAcquiredRecord(12, 13, 2).toArray(),
acquiredRecordsList.toArray());
assertEquals(15, sharePartition.nextFetchOffset());
assertEquals(0, sharePartition.deliveryCompleteCount());
}
@@ -3288,7 +3287,7 @@ public class SharePartitionTest {
// Send next batch from offset 12, only 3 records should be acquired.
acquiredRecordsList = fetchAcquiredRecords(sharePartition, records1,
3);
- assertArrayEquals(expectedAcquiredRecords(12, 14, 2).toArray(),
acquiredRecordsList.toArray());
+ assertArrayEquals(expectedAcquiredRecord(12, 14, 2).toArray(),
acquiredRecordsList.toArray());
assertEquals(15, sharePartition.nextFetchOffset());
// Though record2 batch exists to acquire but send batch record3, it
should be acquired but
@@ -3302,7 +3301,7 @@ public class SharePartitionTest {
MemoryRecords subsetRecords = memoryRecords(17, 2);
acquiredRecordsList = fetchAcquiredRecords(sharePartition,
subsetRecords, 2);
- assertArrayEquals(expectedAcquiredRecords(17, 18, 2).toArray(),
acquiredRecordsList.toArray());
+ assertArrayEquals(expectedAcquiredRecord(17, 18, 2).toArray(),
acquiredRecordsList.toArray());
// Next fetch offset should not move.
assertEquals(15, sharePartition.nextFetchOffset());
@@ -3320,7 +3319,7 @@ public class SharePartitionTest {
acquiredRecordsList = fetchAcquiredRecords(sharePartition, records2,
3);
// Offset 15,16 and 19 should be acquired.
- List<AcquiredRecords> expectedAcquiredRecords =
expectedAcquiredRecords(15, 16, 2);
+ List<AcquiredRecords> expectedAcquiredRecords = new
ArrayList<>(expectedAcquiredRecord(15, 16, 2));
expectedAcquiredRecords.addAll(expectedAcquiredRecords(19, 19, 2));
assertArrayEquals(expectedAcquiredRecords.toArray(),
acquiredRecordsList.toArray());
// Next fetch offset should not move.
@@ -3441,11 +3440,7 @@ public class SharePartitionTest {
// The gap from 11 to 20 will be acquired. Since the next batch is
AVAILABLE, and we records fetched from replica manager
// overlap with the next batch, some records from the next batch will
also be acquired
List<AcquiredRecords> expectedAcquiredRecord = new
ArrayList<>(expectedAcquiredRecord(11, 20, 1));
- expectedAcquiredRecord.addAll(expectedAcquiredRecord(21, 21, 3));
- expectedAcquiredRecord.addAll(expectedAcquiredRecord(22, 22, 3));
- expectedAcquiredRecord.addAll(expectedAcquiredRecord(23, 23, 3));
- expectedAcquiredRecord.addAll(expectedAcquiredRecord(24, 24, 3));
- expectedAcquiredRecord.addAll(expectedAcquiredRecord(25, 25, 3));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(21, 25, 3));
assertArrayEquals(expectedAcquiredRecord.toArray(),
acquiredRecordsList.toArray());
assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
@@ -3546,11 +3541,7 @@ public class SharePartitionTest {
expectedAcquiredRecord.addAll(expectedAcquiredRecord(41, 50, 2));
expectedAcquiredRecord.addAll(expectedAcquiredRecord(51, 60, 1));
expectedAcquiredRecord.addAll(expectedAcquiredRecord(71, 80, 1));
- expectedAcquiredRecord.addAll(expectedAcquiredRecord(81, 81, 2));
- expectedAcquiredRecord.addAll(expectedAcquiredRecord(82, 82, 2));
- expectedAcquiredRecord.addAll(expectedAcquiredRecord(83, 83, 2));
- expectedAcquiredRecord.addAll(expectedAcquiredRecord(84, 84, 2));
- expectedAcquiredRecord.addAll(expectedAcquiredRecord(85, 85, 2));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(81, 85, 2));
assertArrayEquals(expectedAcquiredRecord.toArray(),
acquiredRecordsList.toArray());
assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
@@ -5773,11 +5764,8 @@ public class SharePartitionTest {
fetchPartitionData(records),
FETCH_ISOLATION_HWM), 15);
- List<AcquiredRecords> expectedAcquiredRecord = new
ArrayList<>(expectedAcquiredRecord(16, 16, 2));
- expectedAcquiredRecord.addAll(expectedAcquiredRecord(17, 17, 2));
- expectedAcquiredRecord.addAll(expectedAcquiredRecord(18, 18, 2));
- expectedAcquiredRecord.addAll(expectedAcquiredRecord(19, 19, 2));
- expectedAcquiredRecord.addAll(expectedAcquiredRecord(20, 20, 2));
+ // As divided in 2 batches hence merge shall not happen here for all
batches.
+ List<AcquiredRecords> expectedAcquiredRecord = new
ArrayList<>(expectedAcquiredRecord(16, 20, 2));
expectedAcquiredRecord.addAll(expectedAcquiredRecord(21, 30, 2));
assertArrayEquals(expectedAcquiredRecord.toArray(),
acquiredRecordsList.toArray());
@@ -10447,13 +10435,7 @@ public class SharePartitionTest {
5);
// Should acquire the subset of records in InflightBatch which are
still available.
- List<AcquiredRecords> expectedAcquiredRecords = new
ArrayList<>(expectedAcquiredRecord(5, 5, 1));
- expectedAcquiredRecords.addAll(expectedAcquiredRecord(6, 6, 1));
- expectedAcquiredRecords.addAll(expectedAcquiredRecord(7, 7, 1));
- expectedAcquiredRecords.addAll(expectedAcquiredRecord(8, 8, 1));
- expectedAcquiredRecords.addAll(expectedAcquiredRecord(9, 9, 1));
-
- assertArrayEquals(expectedAcquiredRecords.toArray(),
acquiredRecordsList.toArray());
+ assertArrayEquals(expectedAcquiredRecord(5, 9, 1).toArray(),
acquiredRecordsList.toArray());
assertEquals(10, sharePartition.nextFetchOffset());
assertEquals(1, sharePartition.cachedState().size());
assertEquals(0, sharePartition.cachedState().get(0L).firstOffset());
@@ -10637,12 +10619,7 @@ public class SharePartitionTest {
// 2. 15-20 (new records)
assertEquals(1, sharePartition.cachedState().size());
List<AcquiredRecords> expectedAcquiredRecords = new
ArrayList<>(expectedAcquiredRecord(10, 10, 2));
- expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 15, 1));
- expectedAcquiredRecords.addAll(expectedAcquiredRecord(16, 16, 1));
- expectedAcquiredRecords.addAll(expectedAcquiredRecord(17, 17, 1));
- expectedAcquiredRecords.addAll(expectedAcquiredRecord(18, 18, 1));
- expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 1));
- expectedAcquiredRecords.addAll(expectedAcquiredRecord(20, 20, 1));
+ expectedAcquiredRecords.addAll(expectedAcquiredRecord(15, 20, 1));
assertArrayEquals(expectedAcquiredRecords.toArray(),
acquiredRecordsList.toArray());
// Check cached state.
@@ -10696,8 +10673,7 @@ public class SharePartitionTest {
FETCH_ISOLATION_HWM),
3);
- List<AcquiredRecords> expectedAcquiredRecords = new
ArrayList<>(expectedAcquiredRecord(0, 2, 1));
- assertArrayEquals(expectedAcquiredRecords.toArray(),
acquiredRecordsList.toArray());
+ assertArrayEquals(expectedAcquiredRecord(0, 2, 1).toArray(),
acquiredRecordsList.toArray());
assertEquals(3, sharePartition.nextFetchOffset());
// Add records from 0-9 offsets, 3-5 should be acquired and 0-2 should
be ignored.
@@ -10712,8 +10688,7 @@ public class SharePartitionTest {
FETCH_ISOLATION_HWM),
3);
- expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(3, 3,
1));
- expectedAcquiredRecords.addAll(expectedAcquiredRecord(4, 4, 1));
+ List<AcquiredRecords> expectedAcquiredRecords = new
ArrayList<>(expectedAcquiredRecord(3, 4, 1));
expectedAcquiredRecords.addAll(expectedAcquiredRecord(5, 5, 1));
assertArrayEquals(expectedAcquiredRecords.toArray(),
acquiredRecordsList.toArray());
assertEquals(6, sharePartition.nextFetchOffset());
@@ -10786,7 +10761,7 @@ public class SharePartitionTest {
FETCH_ISOLATION_HWM),
10);
- assertArrayEquals(expectedAcquiredRecords(5, 14, 2).toArray(),
acquiredRecordsList.toArray());
+ assertArrayEquals(expectedAcquiredRecord(5, 14, 2).toArray(),
acquiredRecordsList.toArray());
assertEquals(15, sharePartition.nextFetchOffset());
assertEquals(5L, sharePartition.startOffset());
@@ -10831,7 +10806,7 @@ public class SharePartitionTest {
FETCH_ISOLATION_HWM),
10);
- assertArrayEquals(expectedAcquiredRecords(15, 24, 2).toArray(),
acquiredRecordsList.toArray());
+ assertArrayEquals(expectedAcquiredRecord(15, 24, 2).toArray(),
acquiredRecordsList.toArray());
assertEquals(25, sharePartition.nextFetchOffset());
assertEquals(15L, sharePartition.startOffset());
assertEquals(1, sharePartition.cachedState().size());
@@ -10899,9 +10874,7 @@ public class SharePartitionTest {
FETCH_ISOLATION_HWM),
2);
- List<AcquiredRecords> expectedAcquiredRecords = new
ArrayList<>(expectedAcquiredRecord(1, 1, 1));
- expectedAcquiredRecords.addAll(expectedAcquiredRecord(2, 2, 1));
- assertArrayEquals(expectedAcquiredRecords.toArray(),
acquiredRecordsList.toArray());
+ assertArrayEquals(expectedAcquiredRecord(1, 2, 1).toArray(),
acquiredRecordsList.toArray());
// Ack only 1 record
ackResult = sharePartition.acknowledge(
@@ -10947,9 +10920,7 @@ public class SharePartitionTest {
FETCH_ISOLATION_HWM),
2);
- List<AcquiredRecords> expectedAcquiredRecords = new
ArrayList<>(expectedAcquiredRecord(10, 10, 2));
- expectedAcquiredRecords.addAll(expectedAcquiredRecord(11, 11, 2));
- assertArrayEquals(expectedAcquiredRecords.toArray(),
acquiredRecordsList.toArray());
+ assertArrayEquals(expectedAcquiredRecord(10, 11, 2).toArray(),
acquiredRecordsList.toArray());
assertEquals(12, sharePartition.nextFetchOffset());
// Check cached state.
@@ -11046,9 +11017,7 @@ public class SharePartitionTest {
FETCH_ISOLATION_HWM),
2);
// delivery count increased to 2
- List<AcquiredRecords> expectedAcquiredRecords = new
ArrayList<>(expectedAcquiredRecord(0, 0, 2));
- expectedAcquiredRecords.addAll(expectedAcquiredRecord(1, 1, 2));
- assertArrayEquals(expectedAcquiredRecords.toArray(),
acquiredRecordsList.toArray());
+ assertArrayEquals(expectedAcquiredRecord(0, 1, 2).toArray(),
acquiredRecordsList.toArray());
// Ack offset at 1 and let the other offset to expire again.
CompletableFuture<Void> ackResult = sharePartition.acknowledge(
@@ -11334,9 +11303,7 @@ public class SharePartitionTest {
fetchPartitionData(records),
FETCH_ISOLATION_HWM),
2);
- List<AcquiredRecords> expectedAcquiredRecord = new
ArrayList<>(expectedAcquiredRecord(10, 10, 2));
- expectedAcquiredRecord.addAll(expectedAcquiredRecord(11, 11, 2));
- assertArrayEquals(expectedAcquiredRecord.toArray(),
acquiredRecordsList.toArray());
+ assertArrayEquals(expectedAcquiredRecord(10, 11, 2).toArray(),
acquiredRecordsList.toArray());
assertEquals(12, sharePartition.nextFetchOffset());
// Member-3 acquires records in batch_optimized mode.
@@ -11354,11 +11321,8 @@ public class SharePartitionTest {
// 1. 12-14 (released offsets)
// 2. 21-22 (released offsets)
// 3. 30-44 (new offsets)
- expectedAcquiredRecord = new ArrayList<>(expectedAcquiredRecord(12,
12, 2));
- expectedAcquiredRecord.addAll(expectedAcquiredRecord(13, 13, 2));
- expectedAcquiredRecord.addAll(expectedAcquiredRecord(14, 14, 2));
- expectedAcquiredRecord.addAll(expectedAcquiredRecord(21, 21, 2));
- expectedAcquiredRecord.addAll(expectedAcquiredRecord(22, 22, 2));
+ List<AcquiredRecords> expectedAcquiredRecord = new
ArrayList<>(expectedAcquiredRecord(12, 14, 2));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(21, 22, 2));
expectedAcquiredRecord.addAll(expectedAcquiredRecord(30, 44, 1));
assertArrayEquals(expectedAcquiredRecord.toArray(),
acquiredRecordsList.toArray());
assertEquals(45, sharePartition.nextFetchOffset());
@@ -11600,9 +11564,7 @@ public class SharePartitionTest {
FETCH_ISOLATION_HWM),
2);
- expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(26,
26, 4));
- expectedAcquiredRecords.addAll(expectedAcquiredRecord(27, 27, 4));
- assertArrayEquals(expectedAcquiredRecords.toArray(),
acquiredRecordsList.toArray());
+ assertArrayEquals(expectedAcquiredRecord(26, 27, 4).toArray(),
acquiredRecordsList.toArray());
}
@Test
@@ -11653,9 +11615,7 @@ public class SharePartitionTest {
fetchPartitionData(records),
FETCH_ISOLATION_HWM),
10);
- final List<AcquiredRecords> expectedAcquiredRecords = new
ArrayList<>(expectedAcquiredRecord(15, 15, 5));
- IntStream.range(1, 10).forEach(i ->
expectedAcquiredRecords.addAll(expectedAcquiredRecord(15 + i, 15 + i, 5)));
- assertArrayEquals(expectedAcquiredRecords.toArray(),
acquiredRecordsList.toArray());
+ assertArrayEquals(expectedAcquiredRecord(15, 24, 5).toArray(),
acquiredRecordsList.toArray());
// Allowing acquisition lock to expire.
mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS +
1);
@@ -11670,14 +11630,11 @@ public class SharePartitionTest {
fetchPartitionData(records),
FETCH_ISOLATION_HWM),
5);
- final List<AcquiredRecords> expectedAcquiredRecords2 = new
ArrayList<>(expectedAcquiredRecord(15, 15, 6));
- IntStream.range(1, 5).forEach(i ->
expectedAcquiredRecords2.addAll(expectedAcquiredRecord(15 + i, 15 + i, 6)));
- assertArrayEquals(expectedAcquiredRecords2.toArray(),
acquiredRecordsList.toArray());
+ assertArrayEquals(expectedAcquiredRecord(15, 19, 6).toArray(),
acquiredRecordsList.toArray());
// Allowing acquisition lock to expire.
mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS +
1);
- List<AcquiredRecords> expectedLastAttemptAcquiredRecords;
// Last delivery attempt, records are delivered individually.
for (int i = 0; i < 5; i++) {
acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
@@ -11689,8 +11646,7 @@ public class SharePartitionTest {
fetchPartitionData(records),
FETCH_ISOLATION_HWM),
1);
- expectedLastAttemptAcquiredRecords = new
ArrayList<>(expectedAcquiredRecord(15 + i, 15 + i, 7));
- assertArrayEquals(expectedLastAttemptAcquiredRecords.toArray(),
acquiredRecordsList.toArray());
+ assertArrayEquals(expectedAcquiredRecord(15 + i, 15 + i,
7).toArray(), acquiredRecordsList.toArray());
}
// The record at offset 20 has a delivery count of 5 and is a subject
to be throttled;
@@ -11704,9 +11660,7 @@ public class SharePartitionTest {
fetchPartitionData(records),
FETCH_ISOLATION_HWM),
5);
- final List<AcquiredRecords> expectedAcquiredRecords3 = new
ArrayList<>(expectedAcquiredRecord(20, 20, 6));
- IntStream.range(1, 5).forEach(i ->
expectedAcquiredRecords3.addAll(expectedAcquiredRecord(20 + i, 20 + i, 6)));
- assertArrayEquals(expectedAcquiredRecords3.toArray(),
acquiredRecordsList.toArray());
+ assertArrayEquals(expectedAcquiredRecord(20, 24, 6).toArray(),
acquiredRecordsList.toArray());
// The record at offset 25 has a delivery count of 4 and is a subject
to be throttled;
// First acquisition attempt fails: batch size should be ~1/2 of
original, 20 -> 10
@@ -11719,9 +11673,7 @@ public class SharePartitionTest {
fetchPartitionData(records),
FETCH_ISOLATION_HWM),
10);
- final List<AcquiredRecords> expectedAcquiredRecords4 = new
ArrayList<>(expectedAcquiredRecord(25, 25, 5));
- IntStream.range(1, 10).forEach(i ->
expectedAcquiredRecords4.addAll(expectedAcquiredRecord(25 + i, 25 + i, 5)));
- assertArrayEquals(expectedAcquiredRecords4.toArray(),
acquiredRecordsList.toArray());
+ assertArrayEquals(expectedAcquiredRecord(25, 34, 5).toArray(),
acquiredRecordsList.toArray());
}
@Test
@@ -11813,9 +11765,7 @@ public class SharePartitionTest {
fetchPartitionData(records),
FETCH_ISOLATION_HWM),
2);
- List<AcquiredRecords> expectedAcquiredRecords = new
ArrayList<>(expectedAcquiredRecord(15, 15, 4));
- expectedAcquiredRecords.addAll(expectedAcquiredRecord(16, 16, 4));
- assertArrayEquals(expectedAcquiredRecords.toArray(),
acquiredRecordsList.toArray());
+ assertArrayEquals(expectedAcquiredRecord(15, 16, 4).toArray(),
acquiredRecordsList.toArray());
// The record at offset 17 should ba delivered alone.
acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
@@ -11827,8 +11777,7 @@ public class SharePartitionTest {
fetchPartitionData(records),
FETCH_ISOLATION_HWM),
1);
- expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(17,
17, 5));
- assertArrayEquals(expectedAcquiredRecords.toArray(),
acquiredRecordsList.toArray());
+ assertArrayEquals(expectedAcquiredRecord(17, 17, 5).toArray(),
acquiredRecordsList.toArray());
}
@Test
@@ -11920,11 +11869,7 @@ public class SharePartitionTest {
fetchPartitionData(records),
FETCH_ISOLATION_HWM),
4);
- List<AcquiredRecords> expectedAcquiredRecords = new
ArrayList<>(expectedAcquiredRecord(16, 16, 3));
- expectedAcquiredRecords.addAll(expectedAcquiredRecord(17, 17, 3));
- expectedAcquiredRecords.addAll(expectedAcquiredRecord(18, 18, 3));
- expectedAcquiredRecords.addAll(expectedAcquiredRecord(19, 19, 3));
- assertArrayEquals(expectedAcquiredRecords.toArray(),
acquiredRecordsList.toArray());
+ assertArrayEquals(expectedAcquiredRecord(16, 19, 3).toArray(),
acquiredRecordsList.toArray());
}
@Test
@@ -12035,8 +11980,7 @@ public class SharePartitionTest {
List<AcquiredRecords> expectedAcquiredRecords = new
ArrayList<>(expectedAcquiredRecord(30, 34, 3));
expectedAcquiredRecords.addAll(expectedAcquiredRecord(35, 35, 3));
- expectedAcquiredRecords.addAll(expectedAcquiredRecord(38, 38, 3));
- expectedAcquiredRecords.addAll(expectedAcquiredRecord(39, 39, 3));
+ expectedAcquiredRecords.addAll(expectedAcquiredRecord(38, 39, 3));
expectedAcquiredRecords.addAll(expectedAcquiredRecord(45, 49, 2));
assertArrayEquals(expectedAcquiredRecords.toArray(),
acquiredRecordsList.toArray());