chirag-wadhwa5 commented on code in PR #18696:
URL: https://github.com/apache/kafka/pull/18696#discussion_r1940953629
##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -1951,164 +2112,1102 @@ public void
testAcquireReleasedRecordMultipleBatches() {
// Fourth fetch request with 5 records starting from offset 28.
MemoryRecords records4 = memoryRecords(5, 28);
- List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
- MEMBER_ID,
- BATCH_SIZE,
- MAX_FETCH_RECORDS,
- new FetchPartitionData(Errors.NONE, 40, 3, records1,
- Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
- 5);
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ MAX_FETCH_RECORDS,
+ new FetchPartitionData(Errors.NONE, 40, 3, records1,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 5);
+
+ assertArrayEquals(expectedAcquiredRecords(records1, 1).toArray(),
acquiredRecordsList.toArray());
+ assertEquals(15, sharePartition.nextFetchOffset());
+
+ acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ MAX_FETCH_RECORDS,
+ new FetchPartitionData(Errors.NONE, 30, 3, records2,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 5);
+
+ assertArrayEquals(expectedAcquiredRecords(records2, 1).toArray(),
acquiredRecordsList.toArray());
+ assertEquals(20, sharePartition.nextFetchOffset());
+
+ acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ MAX_FETCH_RECORDS,
+ new FetchPartitionData(Errors.NONE, 30, 3, records3,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 5);
+
+ assertArrayEquals(expectedAcquiredRecords(records3, 1).toArray(),
acquiredRecordsList.toArray());
+ assertEquals(28, sharePartition.nextFetchOffset());
+
+ acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ MAX_FETCH_RECORDS,
+ new FetchPartitionData(Errors.NONE, 30, 3, records4,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 5);
+
+ assertArrayEquals(expectedAcquiredRecords(records4, 1).toArray(),
acquiredRecordsList.toArray());
+ assertEquals(33, sharePartition.nextFetchOffset());
+
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(10L).batchState());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(15L).batchState());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(23L).batchState());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(28L).batchState());
+ assertNull(sharePartition.cachedState().get(10L).offsetState());
+ assertNull(sharePartition.cachedState().get(15L).offsetState());
+ assertNull(sharePartition.cachedState().get(23L).offsetState());
+ assertNull(sharePartition.cachedState().get(28L).offsetState());
+
+ CompletableFuture<Void> ackResult = sharePartition.acknowledge(
+ MEMBER_ID,
+ Collections.singletonList(new ShareAcknowledgementBatch(12, 30,
Collections.singletonList((byte) 2))));
+ assertNull(ackResult.join());
+ assertFalse(ackResult.isCompletedExceptionally());
+
+ assertEquals(12, sharePartition.nextFetchOffset());
+ assertEquals(4, sharePartition.cachedState().size());
+ assertThrows(IllegalStateException.class, () ->
sharePartition.cachedState().get(10L).batchState());
+ assertNotNull(sharePartition.cachedState().get(10L).offsetState());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(15L).batchState());
+ assertNull(sharePartition.cachedState().get(15L).offsetState());
+ assertEquals(EMPTY_MEMBER_ID,
sharePartition.cachedState().get(15L).batchMemberId());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(23L).batchState());
+ assertNull(sharePartition.cachedState().get(23L).offsetState());
+ assertEquals(EMPTY_MEMBER_ID,
sharePartition.cachedState().get(23L).batchMemberId());
+ assertThrows(IllegalStateException.class, () ->
sharePartition.cachedState().get(28L).batchState());
+ assertNotNull(sharePartition.cachedState().get(28L).offsetState());
+
+ Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
+ expectedOffsetStateMap.put(10L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+ expectedOffsetStateMap.put(11L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+ expectedOffsetStateMap.put(12L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(13L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(14L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+ assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(10L).offsetState());
+
+ expectedOffsetStateMap.clear();
+ expectedOffsetStateMap.put(28L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(29L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(30L, new
InFlightState(RecordState.AVAILABLE, (short) 1, EMPTY_MEMBER_ID));
+ expectedOffsetStateMap.put(31L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+ expectedOffsetStateMap.put(32L, new
InFlightState(RecordState.ACQUIRED, (short) 1, MEMBER_ID));
+ assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(28L).offsetState());
+
+ // Send next batch from offset 12, only 3 records should be acquired.
+ acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ MAX_FETCH_RECORDS,
+ new FetchPartitionData(Errors.NONE, 40, 3, records1,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 3);
+
+ assertArrayEquals(expectedAcquiredRecords(12, 14, 2).toArray(),
acquiredRecordsList.toArray());
+ assertEquals(15, sharePartition.nextFetchOffset());
+
+ // Though the records2 batch is still available to acquire, send the records3 batch; it
should be acquired but
+ // next fetch offset should not move.
+ acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ MAX_FETCH_RECORDS,
+ new FetchPartitionData(Errors.NONE, 40, 3, records3,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 5);
+
+ assertArrayEquals(expectedAcquiredRecords(records3, 2).toArray(),
acquiredRecordsList.toArray());
+ assertEquals(15, sharePartition.nextFetchOffset());
+
+ // Acquire partial records from batch 2.
+ MemoryRecords subsetRecords = memoryRecords(2, 17);
+ acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ MAX_FETCH_RECORDS,
+ new FetchPartitionData(Errors.NONE, 20, 3, subsetRecords,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 2);
+
+ assertArrayEquals(expectedAcquiredRecords(17, 18, 2).toArray(),
acquiredRecordsList.toArray());
+ // Next fetch offset should not move.
+ assertEquals(15, sharePartition.nextFetchOffset());
+
+ // Acquire partial records from record 4 to further test if the next
fetch offset moves
+ // accordingly once complete record 2 is also acquired.
+ subsetRecords = memoryRecords(1, 28);
+ acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ MAX_FETCH_RECORDS,
+ new FetchPartitionData(Errors.NONE, 20, 3, subsetRecords,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 1);
+
+ assertArrayEquals(expectedAcquiredRecords(28, 28, 2).toArray(),
acquiredRecordsList.toArray());
+ // Next fetch offset should not move.
+ assertEquals(15, sharePartition.nextFetchOffset());
+
+ // Try to acquire complete record 2 though it's already partially
acquired, the next fetch
+ // offset should move.
+ acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ MAX_FETCH_RECORDS,
+ new FetchPartitionData(Errors.NONE, 20, 3, records2,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 3);
+
+ // Offset 15,16 and 19 should be acquired.
+ List<AcquiredRecords> expectedAcquiredRecords =
expectedAcquiredRecords(15, 16, 2);
+ expectedAcquiredRecords.addAll(expectedAcquiredRecords(19, 19, 2));
+ assertArrayEquals(expectedAcquiredRecords.toArray(),
acquiredRecordsList.toArray());
+ // Next fetch offset should move.
+ assertEquals(29, sharePartition.nextFetchOffset());
+ }
+
+ @Test
+ public void testAcquireGapAtBeginningAndRecordsFetchedFromGap() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(),
Collections.singletonList(
+ PartitionFactory.newPartitionAllData(0, 3, 11L,
Errors.NONE.code(), Errors.NONE.message(),
+ Arrays.asList(
+ new PersisterStateBatch(21L, 30L,
RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 11 to 20
+ new PersisterStateBatch(31L, 40L,
RecordState.ARCHIVED.id, (short) 1)
+ ))))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
+
+ CompletableFuture<Void> result = sharePartition.maybeInitialize();
+ assertTrue(result.isDone());
+ assertFalse(result.isCompletedExceptionally());
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertFalse(sharePartition.cachedState().isEmpty());
+ assertEquals(11, sharePartition.startOffset());
+ assertEquals(40, sharePartition.endOffset());
+ assertEquals(3, sharePartition.stateEpoch());
+ assertEquals(11, sharePartition.nextFetchOffset());
+
+ // All records fetched are part of the gap. The gap is from 11 to 20,
fetched offsets are 11 to 15.
+ MemoryRecords records = memoryRecords(5, 11);
+
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ MAX_FETCH_RECORDS,
+ new FetchPartitionData(Errors.NONE, 3, 0, records,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 5);
+
+ assertArrayEquals(expectedAcquiredRecord(11, 15, 1).toArray(),
acquiredRecordsList.toArray());
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertFalse(sharePartition.cachedState().isEmpty());
+ assertEquals(11, sharePartition.startOffset());
+ assertEquals(40, sharePartition.endOffset());
+ assertEquals(3, sharePartition.stateEpoch());
+ assertEquals(16, sharePartition.nextFetchOffset());
+
+ SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
+ assertNotNull(initialReadGapOffset);
+
+ // After records are acquired, the initialReadGapOffset should be
updated
+ assertEquals(16, initialReadGapOffset.gapStartOffset());
+ assertEquals(40, initialReadGapOffset.endOffset());
+ }
+
+ @Test
+ public void
testAcquireGapAtBeginningAndFetchedRecordsOverlapInFlightBatches() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(),
Collections.singletonList(
+ PartitionFactory.newPartitionAllData(0, 3, 11L,
Errors.NONE.code(), Errors.NONE.message(),
+ Arrays.asList(
+ new PersisterStateBatch(21L, 30L,
RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 11 to 20
+ new PersisterStateBatch(31L, 40L,
RecordState.ARCHIVED.id, (short) 1)
+ ))))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
+
+ CompletableFuture<Void> result = sharePartition.maybeInitialize();
+ assertTrue(result.isDone());
+ assertFalse(result.isCompletedExceptionally());
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertFalse(sharePartition.cachedState().isEmpty());
+ assertEquals(11, sharePartition.startOffset());
+ assertEquals(40, sharePartition.endOffset());
+ assertEquals(3, sharePartition.stateEpoch());
+ assertEquals(11, sharePartition.nextFetchOffset());
+
+ // Fetched offsets overlap the inFlight batches. The gap is from 11 to
20, but fetched records are from 11 to 25.
+ MemoryRecords records = memoryRecords(15, 11);
+
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ MAX_FETCH_RECORDS,
+ new FetchPartitionData(Errors.NONE, 3, 0, records,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 10);
+
+ assertArrayEquals(expectedAcquiredRecord(11, 20, 1).toArray(),
acquiredRecordsList.toArray());
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertFalse(sharePartition.cachedState().isEmpty());
+ assertEquals(11, sharePartition.startOffset());
+ assertEquals(40, sharePartition.endOffset());
+ assertEquals(3, sharePartition.stateEpoch());
+ assertEquals(41, sharePartition.nextFetchOffset());
+
+ SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
+ assertNotNull(initialReadGapOffset);
+
+ // After records are acquired, the initialReadGapOffset should be
updated
+ assertEquals(21, initialReadGapOffset.gapStartOffset());
+ assertEquals(40, initialReadGapOffset.endOffset());
+ }
+
+ @Test
+ public void
testAcquireGapAtBeginningAndFetchedRecordsOverlapInFlightAvailableBatches() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(),
Collections.singletonList(
+ PartitionFactory.newPartitionAllData(0, 3, 11L,
Errors.NONE.code(), Errors.NONE.message(),
+ Arrays.asList(
+ new PersisterStateBatch(21L, 30L,
RecordState.AVAILABLE.id, (short) 2), // There is a gap from 11 to 20
+ new PersisterStateBatch(31L, 40L,
RecordState.ARCHIVED.id, (short) 1)
+ ))))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+ WriteShareGroupStateResult writeShareGroupStateResult =
Mockito.mock(WriteShareGroupStateResult.class);
+
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(),
Collections.singletonList(
+ PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(),
Errors.NONE.message())))));
+
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
+
+ CompletableFuture<Void> result = sharePartition.maybeInitialize();
+ assertTrue(result.isDone());
+ assertFalse(result.isCompletedExceptionally());
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertFalse(sharePartition.cachedState().isEmpty());
+ assertEquals(11, sharePartition.startOffset());
+ assertEquals(40, sharePartition.endOffset());
+ assertEquals(3, sharePartition.stateEpoch());
+ assertEquals(11, sharePartition.nextFetchOffset());
+
+ // Fetched offsets overlap the inFlight batches. The gap is from 11 to
20, but fetched records are from 11 to 25.
+ MemoryRecords records = memoryRecords(15, 11);
+
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ MAX_FETCH_RECORDS,
+ new FetchPartitionData(Errors.NONE, 3, 0, records,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 15);
+
+ // The gap from 11 to 20 will be acquired. Since the next batch is
AVAILABLE, and the records fetched from replica manager
+ // overlap with the next batch, some records from the next batch will
also be acquired
+ List<AcquiredRecords> expectedAcquiredRecord = new
ArrayList<>(expectedAcquiredRecord(11, 20, 1));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(21, 21, 3));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(22, 22, 3));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(23, 23, 3));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(24, 24, 3));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(25, 25, 3));
+ assertArrayEquals(expectedAcquiredRecord.toArray(),
acquiredRecordsList.toArray());
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertFalse(sharePartition.cachedState().isEmpty());
+ assertEquals(11, sharePartition.startOffset());
+ assertEquals(40, sharePartition.endOffset());
+ assertEquals(3, sharePartition.stateEpoch());
+ assertEquals(26, sharePartition.nextFetchOffset());
+
+ SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
+ assertNotNull(initialReadGapOffset);
+
+ // After records are acquired, the initialReadGapOffset should be
updated
+ assertEquals(26, initialReadGapOffset.gapStartOffset());
+ assertEquals(40, initialReadGapOffset.endOffset());
+ }
+
+ @Test
+ public void
testAcquireWhenCachedStateContainsGapsAndRecordsFetchedFromNonGapOffset() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(),
Collections.singletonList(
+ PartitionFactory.newPartitionAllData(0, 3, 11L,
Errors.NONE.code(), Errors.NONE.message(),
+ Arrays.asList(
+ new PersisterStateBatch(11L, 20L,
RecordState.AVAILABLE.id, (short) 2),
+ new PersisterStateBatch(31L, 40L,
RecordState.ARCHIVED.id, (short) 1) // There is a gap from 21-30
+ ))))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
+
+ CompletableFuture<Void> result = sharePartition.maybeInitialize();
+ assertTrue(result.isDone());
+ assertFalse(result.isCompletedExceptionally());
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertFalse(sharePartition.cachedState().isEmpty());
+ assertEquals(11, sharePartition.startOffset());
+ assertEquals(40, sharePartition.endOffset());
+ assertEquals(3, sharePartition.stateEpoch());
+ assertEquals(11, sharePartition.nextFetchOffset());
+
+ // Fetched records are part of inFlightBatch 11-20 with state
AVAILABLE. Fetched offsets also overlap the
+ // inFlight batches. The gap is from 21 to 30, but fetched records are
from 11 to 25.
+ MemoryRecords records = memoryRecords(15, 11);
+
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ MAX_FETCH_RECORDS,
+ new FetchPartitionData(Errors.NONE, 3, 0, records,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 15);
+
+ // 2 different batches will be acquired this time (11-20 and 21-25).
The first batch will have delivery count 3
+ // as previous deliveryCount was 2. The second batch will have
delivery count 1 as it is acquired for the first time.
+ List<AcquiredRecords> expectedAcquiredRecord = new
ArrayList<>(expectedAcquiredRecord(11, 20, 3));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(21, 25, 1));
+ assertArrayEquals(expectedAcquiredRecord.toArray(),
acquiredRecordsList.toArray());
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertFalse(sharePartition.cachedState().isEmpty());
+ assertEquals(11, sharePartition.startOffset());
+ assertEquals(40, sharePartition.endOffset());
+ assertEquals(3, sharePartition.stateEpoch());
+ assertEquals(26, sharePartition.nextFetchOffset());
+
+ SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
+ assertNotNull(initialReadGapOffset);
+
+ // After records are acquired, the initialReadGapOffset should be
updated
+ assertEquals(26, initialReadGapOffset.gapStartOffset());
+ assertEquals(40, initialReadGapOffset.endOffset());
+ }
+
+ @Test
+ public void
testAcquireGapAtBeginningAndFetchedRecordsOverlapMultipleInFlightBatches() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(),
Collections.singletonList(
+ PartitionFactory.newPartitionAllData(0, 3, 11L,
Errors.NONE.code(), Errors.NONE.message(),
+ Arrays.asList(
+ new PersisterStateBatch(21L, 30L,
RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 11 to 20
+ new PersisterStateBatch(41L, 50L,
RecordState.AVAILABLE.id, (short) 1), // There is a gap from 31 to 40
+ new PersisterStateBatch(61L, 70L,
RecordState.ARCHIVED.id, (short) 1), // There is a gap from 51 to 60
+ new PersisterStateBatch(81L, 90L,
RecordState.AVAILABLE.id, (short) 1) // There is a gap from 71 to 80
+ ))))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
+
+ CompletableFuture<Void> result = sharePartition.maybeInitialize();
+ assertTrue(result.isDone());
+ assertFalse(result.isCompletedExceptionally());
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertFalse(sharePartition.cachedState().isEmpty());
+ assertEquals(11, sharePartition.startOffset());
+ assertEquals(90, sharePartition.endOffset());
+ assertEquals(3, sharePartition.stateEpoch());
+ assertEquals(11, sharePartition.nextFetchOffset());
+
+ MemoryRecords records = memoryRecords(75, 11);
+
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ MAX_FETCH_RECORDS,
+ new FetchPartitionData(Errors.NONE, 3, 0, records,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 55);
+
+ // Acquired batches will contain the following ->
+ // 1. 11-20 (gap offsets)
+ // 2. 31-40 (gap offsets)
+ // 3. 41-50 (AVAILABLE batch in cachedState)
+ // 4. 51-60 (gap offsets)
+ // 5. 71-80 (gap offsets)
+ // 6. 81-85 (AVAILABLE batch in cachedState). These will be acquired
as separate batches because we are breaking a batch in the cachedState
+ List<AcquiredRecords> expectedAcquiredRecord = new
ArrayList<>(expectedAcquiredRecord(11, 20, 1));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(31, 40, 1));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(41, 50, 2));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(51, 60, 1));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(71, 80, 1));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(81, 81, 2));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(82, 82, 2));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(83, 83, 2));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(84, 84, 2));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(85, 85, 2));
+ assertArrayEquals(expectedAcquiredRecord.toArray(),
acquiredRecordsList.toArray());
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertFalse(sharePartition.cachedState().isEmpty());
+ assertEquals(11, sharePartition.startOffset());
+ assertEquals(90, sharePartition.endOffset());
+ assertEquals(3, sharePartition.stateEpoch());
+ assertEquals(86, sharePartition.nextFetchOffset());
+
+ SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
+ assertNotNull(initialReadGapOffset);
+
+ // After records are acquired, the initialReadGapOffset should be
updated
+ assertEquals(86, initialReadGapOffset.gapStartOffset());
+ assertEquals(90, initialReadGapOffset.endOffset());
+ }
+
+ @Test
+ public void testAcquireGapAtBeginningAndFetchedRecordsEndJustBeforeGap() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(),
Collections.singletonList(
+ PartitionFactory.newPartitionAllData(0, 3, 11L,
Errors.NONE.code(), Errors.NONE.message(),
+ Arrays.asList(
+ new PersisterStateBatch(21L, 30L,
RecordState.AVAILABLE.id, (short) 2), // There is a gap from 11 to 20
+ new PersisterStateBatch(41L, 50L,
RecordState.ACKNOWLEDGED.id, (short) 1), // There is a gap from 31 to 40
+ new PersisterStateBatch(61L, 70L,
RecordState.ARCHIVED.id, (short) 1) // There is a gap from 51 to 60
+ ))))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
+
+ CompletableFuture<Void> result = sharePartition.maybeInitialize();
+ assertTrue(result.isDone());
+ assertFalse(result.isCompletedExceptionally());
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertFalse(sharePartition.cachedState().isEmpty());
+ assertEquals(11, sharePartition.startOffset());
+ assertEquals(70, sharePartition.endOffset());
+ assertEquals(3, sharePartition.stateEpoch());
+ assertEquals(11, sharePartition.nextFetchOffset());
+
+ MemoryRecords records = memoryRecords(20, 11);
+
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ MAX_FETCH_RECORDS,
+ new FetchPartitionData(Errors.NONE, 3, 0, records,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 20);
+
+ // Acquired batches will contain the following ->
+ // 1. 11-20 (gap offsets)
+ // 2. 21-30 (AVAILABLE batch in cachedState)
+ List<AcquiredRecords> expectedAcquiredRecord = new
ArrayList<>(expectedAcquiredRecord(11, 20, 1));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(21, 30, 3));
+ assertArrayEquals(expectedAcquiredRecord.toArray(),
acquiredRecordsList.toArray());
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertFalse(sharePartition.cachedState().isEmpty());
+ assertEquals(11, sharePartition.startOffset());
+ assertEquals(70, sharePartition.endOffset());
+ assertEquals(3, sharePartition.stateEpoch());
+ assertEquals(31, sharePartition.nextFetchOffset());
+
+ SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
+ assertNotNull(initialReadGapOffset);
+
+ // After records are acquired, the initialReadGapOffset should be
updated
+ assertEquals(31, initialReadGapOffset.gapStartOffset());
+ assertEquals(70, initialReadGapOffset.endOffset());
+ }
+
+ @Test
+ public void
testAcquireGapAtBeginningAndFetchedRecordsIncludeGapOffsetsAtEnd() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(),
Collections.singletonList(
+ PartitionFactory.newPartitionAllData(0, 3, 11L,
Errors.NONE.code(), Errors.NONE.message(),
+ Arrays.asList(
+ new PersisterStateBatch(21L, 30L,
RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 11 to 20
+ new PersisterStateBatch(41L, 50L,
RecordState.AVAILABLE.id, (short) 1), // There is a gap from 31 to 40
+ new PersisterStateBatch(61L, 70L,
RecordState.ARCHIVED.id, (short) 1), // There is a gap from 51 to 60
+ new PersisterStateBatch(81L, 90L,
RecordState.AVAILABLE.id, (short) 1) // There is a gap from 71 to 80
+ ))))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
+
+ CompletableFuture<Void> result = sharePartition.maybeInitialize();
+ assertTrue(result.isDone());
+ assertFalse(result.isCompletedExceptionally());
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertFalse(sharePartition.cachedState().isEmpty());
+ assertEquals(11, sharePartition.startOffset());
+ assertEquals(90, sharePartition.endOffset());
+ assertEquals(3, sharePartition.stateEpoch());
+ assertEquals(11, sharePartition.nextFetchOffset());
+
+ MemoryRecords records = memoryRecords(65, 11);
+
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ MAX_FETCH_RECORDS,
+ new FetchPartitionData(Errors.NONE, 3, 0, records,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 45);
+
+ // Acquired batches will contain the following ->
+ // 1. 11-20 (gap offsets)
+ // 2. 31-40 (gap offsets)
+ // 3. 41-50 (AVAILABLE batch in cachedState)
+ // 4. 51-60 (gap offsets)
+ // 5. 71-75 (gap offsets). The gap is from 71 to 80, but the fetched
records end at 75. These gap offsets will be acquired as a single batch
+ List<AcquiredRecords> expectedAcquiredRecord = new
ArrayList<>(expectedAcquiredRecord(11, 20, 1));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(31, 40, 1));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(41, 50, 2));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(51, 60, 1));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(71, 75, 1));
+ assertArrayEquals(expectedAcquiredRecord.toArray(),
acquiredRecordsList.toArray());
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertFalse(sharePartition.cachedState().isEmpty());
+ assertEquals(11, sharePartition.startOffset());
+ assertEquals(90, sharePartition.endOffset());
+ assertEquals(3, sharePartition.stateEpoch());
+ assertEquals(76, sharePartition.nextFetchOffset());
+
+ SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
+ assertNotNull(initialReadGapOffset);
+
+ // After records are acquired, the initialReadGapOffset should be
updated
+ assertEquals(76, initialReadGapOffset.gapStartOffset());
+ assertEquals(90, initialReadGapOffset.endOffset());
+ }
+
+
+ @Test
+ public void
testAcquireWhenRecordsFetchedFromGapAndMaxFetchRecordsIsExceeded() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(),
Collections.singletonList(
+ PartitionFactory.newPartitionAllData(0, 3, 11L,
Errors.NONE.code(), Errors.NONE.message(),
+ Arrays.asList(
+ new PersisterStateBatch(11L, 20L,
RecordState.ACKNOWLEDGED.id, (short) 2),
+ new PersisterStateBatch(31L, 40L,
RecordState.ARCHIVED.id, (short) 1) // There is a gap from 21-30
+ ))))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
+
+ CompletableFuture<Void> result = sharePartition.maybeInitialize();
+ assertTrue(result.isDone());
+ assertFalse(result.isCompletedExceptionally());
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertFalse(sharePartition.cachedState().isEmpty());
+ // The start offset will be moved to 21, since the offsets 11 to 20
are acknowledged, and will be removed
+ // from cached state in the maybeUpdateCachedStateAndOffsets method
+ assertEquals(21, sharePartition.startOffset());
+ assertEquals(40, sharePartition.endOffset());
+ assertEquals(3, sharePartition.stateEpoch());
+ assertEquals(21, sharePartition.nextFetchOffset());
+
+ // Creating 3 batches of records with a total of 8 records
+ ByteBuffer buffer = ByteBuffer.allocate(4096);
+ memoryRecordsBuilder(buffer, 3, 21).close();
+ memoryRecordsBuilder(buffer, 3, 24).close();
+ memoryRecordsBuilder(buffer, 2, 27).close();
+ buffer.flip();
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ 6, // maxFetchRecords is less than the number of records
fetched
+ new FetchPartitionData(Errors.NONE, 3, 0, records,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 6);
+
+ // Since max fetch records (6) is less than the number of records
fetched (8), only 6 records will be acquired
+ assertArrayEquals(expectedAcquiredRecord(21, 26, 1).toArray(),
acquiredRecordsList.toArray());
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertFalse(sharePartition.cachedState().isEmpty());
+ assertEquals(21, sharePartition.startOffset());
+ assertEquals(40, sharePartition.endOffset());
+ assertEquals(3, sharePartition.stateEpoch());
+ assertEquals(27, sharePartition.nextFetchOffset());
+
+ SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
+ assertNotNull(initialReadGapOffset);
+
+ assertEquals(27, initialReadGapOffset.gapStartOffset());
+ assertEquals(40, initialReadGapOffset.endOffset());
+ }
+
+ @Test
+ public void testAcquireMaxFetchRecordsExceededAfterAcquiringGaps() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(),
Collections.singletonList(
+ PartitionFactory.newPartitionAllData(0, 3, 11L,
Errors.NONE.code(), Errors.NONE.message(),
+ Arrays.asList(
+ new PersisterStateBatch(21L, 30L,
RecordState.AVAILABLE.id, (short) 2), // There is a gap from 11-20
+ new PersisterStateBatch(31L, 40L,
RecordState.ARCHIVED.id, (short) 1)
+ ))))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
+
+ CompletableFuture<Void> result = sharePartition.maybeInitialize();
+ assertTrue(result.isDone());
+ assertFalse(result.isCompletedExceptionally());
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertFalse(sharePartition.cachedState().isEmpty());
+ assertEquals(11, sharePartition.startOffset());
+ assertEquals(40, sharePartition.endOffset());
+ assertEquals(3, sharePartition.stateEpoch());
+ assertEquals(11, sharePartition.nextFetchOffset());
+
+ // Creating 3 batches of records with a total of 8 records
+ ByteBuffer buffer = ByteBuffer.allocate(4096);
+ memoryRecordsBuilder(buffer, 10, 11).close();
+ memoryRecordsBuilder(buffer, 10, 21).close();
+ buffer.flip();
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ 8, // maxFetchRecords is less than the number of records
fetched
+ new FetchPartitionData(Errors.NONE, 3, 0, records,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 10);
+
+ assertArrayEquals(expectedAcquiredRecord(11, 20, 1).toArray(),
acquiredRecordsList.toArray());
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertFalse(sharePartition.cachedState().isEmpty());
+ assertEquals(11, sharePartition.startOffset());
+ assertEquals(40, sharePartition.endOffset());
+ assertEquals(3, sharePartition.stateEpoch());
+ assertEquals(21, sharePartition.nextFetchOffset());
+
+ SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
+ assertNotNull(initialReadGapOffset);
+
+ assertEquals(21, initialReadGapOffset.gapStartOffset());
+ assertEquals(40, initialReadGapOffset.endOffset());
+ }
+
+ @Test
+ public void testAcquireMaxFetchRecordsExceededBeforeAcquiringGaps() {
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(),
Collections.singletonList(
+ PartitionFactory.newPartitionAllData(0, 3, 11L,
Errors.NONE.code(), Errors.NONE.message(),
+ Arrays.asList(
+ new PersisterStateBatch(11L, 20L,
RecordState.AVAILABLE.id, (short) 2),
+ new PersisterStateBatch(31L, 40L,
RecordState.AVAILABLE.id, (short) 1) // There is a gap from 21-30
+ ))))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
+
+ CompletableFuture<Void> result = sharePartition.maybeInitialize();
+ assertTrue(result.isDone());
+ assertFalse(result.isCompletedExceptionally());
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertFalse(sharePartition.cachedState().isEmpty());
+ assertEquals(11, sharePartition.startOffset());
+ assertEquals(40, sharePartition.endOffset());
+ assertEquals(3, sharePartition.stateEpoch());
+ assertEquals(11, sharePartition.nextFetchOffset());
+
+ // Creating 3 batches of records with a total of 8 records
+ ByteBuffer buffer = ByteBuffer.allocate(4096);
+ memoryRecordsBuilder(buffer, 10, 11).close();
+ memoryRecordsBuilder(buffer, 20, 21).close();
+ buffer.flip();
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ BATCH_SIZE,
+ 8, // maxFetchRecords is less than the number of records
fetched
+ new FetchPartitionData(Errors.NONE, 3, 0, records,
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
OptionalInt.empty(), false)),
+ 10);
+
+ assertArrayEquals(expectedAcquiredRecord(11, 20, 3).toArray(),
acquiredRecordsList.toArray());
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertFalse(sharePartition.cachedState().isEmpty());
+ assertEquals(11, sharePartition.startOffset());
+ assertEquals(40, sharePartition.endOffset());
+ assertEquals(3, sharePartition.stateEpoch());
+ assertEquals(21, sharePartition.nextFetchOffset());
+
+ SharePartition.InitialReadGapOffset initialReadGapOffset =
sharePartition.initialReadGapOffset();
+ assertNotNull(initialReadGapOffset);
+
+ assertEquals(21, initialReadGapOffset.gapStartOffset());
+ assertEquals(40, initialReadGapOffset.endOffset());
+ }
+
+ @Test
+ public void
testAcquireWhenRecordsFetchedFromGapAndPartitionContainsNaturalGaps() {
Review Comment:
The gaps actually correspond to the gap in the cached state (21 to 29).
The memory records created have the following batches:
(10, 20)
(30, 50)
The significance of this test is that the natural gap falls between the
batches fetched from the partition and also coincides with the gap in the
cached state. In this case, the broker acquires the gap because it does not
parse all the batches in the fetched records, so it is not aware of the
presence of any natural gaps. The broker only knows the range of offsets
fetched (from 10 to 50) and assumes that all of these offsets contain data.
It is then the client's responsibility to inform the broker about the natural
gap at offsets 21-29. I will add some comments in the test as well.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]