adixitconfluent commented on code in PR #20246:
URL: https://github.com/apache/kafka/pull/20246#discussion_r2460670210
##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -8801,6 +8838,543 @@ public void
testRecordArchivedWithWriteStateRPCFailure() throws InterruptedExcep
assertEquals(2,
sharePartition.cachedState().get(7L).batchDeliveryCount());
}
+ @Test
+ public void testAcquireSingleBatchInRecordLimitMode() {
+ Persister persister = Mockito.mock(Persister.class);
+ SharePartition sharePartition =
Mockito.spy(SharePartitionBuilder.builder()
+ .withState(SharePartitionState.ACTIVE)
+ .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
+ .withPersister(persister)
+ .build());
+
Mockito.doReturn(true).when(sharePartition).isRecordLimitMode(Mockito.any());
+ WriteShareGroupStateResult writeShareGroupStateResult =
Mockito.mock(WriteShareGroupStateResult.class);
+
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(),
Errors.NONE.message())))));
+
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+ // Member-1 attempts to acquire records in strict mode with a maximum
fetch limit of 5 records.
+ MemoryRecords records = memoryRecords(10);
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ SHARE_ACQUIRE_MODE,
+ 2,
+ 5,
+ DEFAULT_FETCH_OFFSET,
+ fetchPartitionData(records),
+ FETCH_ISOLATION_HWM),
+ 5);
+
+ assertArrayEquals(expectedAcquiredRecord(0, 4, 1).toArray(),
acquiredRecordsList.toArray());
+ assertEquals(5, sharePartition.nextFetchOffset());
+ assertEquals(1, sharePartition.cachedState().size());
+ assertEquals(0, sharePartition.cachedState().get(0L).firstOffset());
+ assertEquals(9, sharePartition.cachedState().get(0L).lastOffset());
+ assertNotNull(sharePartition.cachedState().get(0L).offsetState());
+ assertThrows(IllegalStateException.class, () ->
sharePartition.cachedState().get(0L).batchState());
+ assertThrows(IllegalStateException.class, () ->
sharePartition.cachedState().get(0L).batchMemberId());
+ assertThrows(IllegalStateException.class, () ->
sharePartition.cachedState().get(0L).batchDeliveryCount());
+
+ assertEquals(10,
sharePartition.cachedState().get(0L).offsetState().size());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(0L).offsetState().get(0L).state());
+ assertEquals(MEMBER_ID,
sharePartition.cachedState().get(0L).offsetState().get(0L).memberId());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(0L).offsetState().get(4L).state());
+ assertEquals(MEMBER_ID,
sharePartition.cachedState().get(0L).offsetState().get(0L).memberId());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(0L).offsetState().get(5L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(0L).offsetState().get(9L).state());
+
+
+ // Acquire the same batch with member-2. 5 records will be acquired.
+ acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+ "member-2",
+ SHARE_ACQUIRE_MODE,
+ 2,
+ 5,
+ DEFAULT_FETCH_OFFSET,
+ fetchPartitionData(records),
+ FETCH_ISOLATION_HWM),
+ 5);
+
+ List<AcquiredRecords> expectedAcquiredRecords = new
ArrayList<>(expectedAcquiredRecord(5, 5, 1));
+ expectedAcquiredRecords.addAll(expectedAcquiredRecord(6, 6, 1));
+ expectedAcquiredRecords.addAll(expectedAcquiredRecord(7, 7, 1));
+ expectedAcquiredRecords.addAll(expectedAcquiredRecord(8, 8, 1));
+ expectedAcquiredRecords.addAll(expectedAcquiredRecord(9, 9, 1));
+
+ assertArrayEquals(expectedAcquiredRecords.toArray(),
acquiredRecordsList.toArray());
+ assertEquals(10, sharePartition.nextFetchOffset());
+ assertEquals(1, sharePartition.cachedState().size());
+ assertEquals(0, sharePartition.cachedState().get(0L).firstOffset());
+ assertEquals(9, sharePartition.cachedState().get(0L).lastOffset());
+
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(0L).offsetState().get(5L).state());
+ assertEquals("member-2",
sharePartition.cachedState().get(0L).offsetState().get(5L).memberId());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(0L).offsetState().get(9L).state());
+ assertEquals("member-2",
sharePartition.cachedState().get(0L).offsetState().get(5L).memberId());
+ }
+
+ @Test
+ public void testAcquireMultipleBatchesInRecordLimitMode() throws
InterruptedException {
+ Persister persister = Mockito.mock(Persister.class);
+ SharePartition sharePartition =
Mockito.spy(SharePartitionBuilder.builder()
+ .withState(SharePartitionState.ACTIVE)
+ .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
+ .withPersister(persister)
+ .build());
+ WriteShareGroupStateResult writeShareGroupStateResult =
Mockito.mock(WriteShareGroupStateResult.class);
+
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(),
Errors.NONE.message())))));
+
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+ // Create 3 batches of records.
+ ByteBuffer buffer = ByteBuffer.allocate(4096);
+ memoryRecordsBuilder(buffer, 10, 5).close();
+ memoryRecordsBuilder(buffer, 15, 15).close();
+ memoryRecordsBuilder(buffer, 30, 15).close();
+
+ buffer.flip();
+
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
+ // Acquire 10 records.
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ SHARE_ACQUIRE_MODE.RECORD_LIMIT,
+ BATCH_SIZE,
+ 10,
+ DEFAULT_FETCH_OFFSET,
+ fetchPartitionData(records, 10),
+ FETCH_ISOLATION_HWM),
+ 10);
+
+ assertArrayEquals(expectedAcquiredRecord(10, 19, 1).toArray(),
acquiredRecordsList.toArray());
+ assertEquals(20, sharePartition.nextFetchOffset());
+ assertEquals(1, sharePartition.cachedState().size());
+ assertEquals(10, sharePartition.cachedState().get(10L).firstOffset());
+ assertEquals(29, sharePartition.cachedState().get(10L).lastOffset());
+ assertNotNull(sharePartition.cachedState().get(10L).offsetState());
+ assertThrows(IllegalStateException.class, () ->
sharePartition.cachedState().get(10L).batchState());
+ assertThrows(IllegalStateException.class, () ->
sharePartition.cachedState().get(10L).batchMemberId());
+ assertThrows(IllegalStateException.class, () ->
sharePartition.cachedState().get(10L).batchDeliveryCount());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(10L).offsetState().get(19L).state());
+ assertEquals(MEMBER_ID,
sharePartition.cachedState().get(10L).offsetState().get(19L).memberId());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(10L).offsetState().get(20L).state());
+ }
+
+ @Test
+ public void
testAcquireMultipleRecordsWithOverlapAndNewBatchInRecordLimitMode() {
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
+ MemoryRecords records = memoryRecords(5);
+
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ SHARE_ACQUIRE_MODE.RECORD_LIMIT,
+ BATCH_SIZE,
+ 3,
+ DEFAULT_FETCH_OFFSET,
+ fetchPartitionData(records, 0),
+ FETCH_ISOLATION_HWM),
+ 3);
+
+ List<AcquiredRecords> expectedAcquiredRecords = new
ArrayList<>(expectedAcquiredRecord(0, 2, 1));
+ assertArrayEquals(expectedAcquiredRecords.toArray(),
acquiredRecordsList.toArray());
+ assertEquals(3, sharePartition.nextFetchOffset());
+
+ // Add records from 0-9 offsets, 3-5 should be acquired and 0-2 should
be ignored.
+ records = memoryRecords(10);
+ acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ SHARE_ACQUIRE_MODE.RECORD_LIMIT,
+ BATCH_SIZE,
+ 3,
+ DEFAULT_FETCH_OFFSET,
+ fetchPartitionData(records, 0),
+ FETCH_ISOLATION_HWM),
+ 3);
+
+ expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(3, 3,
1));
+ expectedAcquiredRecords.addAll(expectedAcquiredRecord(4, 4, 1));
+ expectedAcquiredRecords.addAll(expectedAcquiredRecord(5, 5, 1));
+ assertArrayEquals(expectedAcquiredRecords.toArray(),
acquiredRecordsList.toArray());
+ assertEquals(6, sharePartition.nextFetchOffset());
+ assertEquals(2, sharePartition.cachedState().size());
+
+ // Check cached state.
+ Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
+ expectedOffsetStateMap.put(0L, new InFlightState(RecordState.ACQUIRED,
(short) 1, MEMBER_ID));
+ expectedOffsetStateMap.put(1L, new InFlightState(RecordState.ACQUIRED,
(short) 1, MEMBER_ID));
+ expectedOffsetStateMap.put(2L, new InFlightState(RecordState.ACQUIRED,
(short) 1, MEMBER_ID));
+ expectedOffsetStateMap.put(3L, new InFlightState(RecordState.ACQUIRED,
(short) 1, MEMBER_ID));
+ expectedOffsetStateMap.put(4L, new InFlightState(RecordState.ACQUIRED,
(short) 1, MEMBER_ID));
+ assertEquals(expectedOffsetStateMap,
sharePartition.cachedState().get(0L).offsetState());
+ }
+
+ @Test
+ public void testAcknowledgeInRecordLimitMode() {
+ Persister persister = Mockito.mock(Persister.class);
+ SharePartition sharePartition =
Mockito.spy(SharePartitionBuilder.builder()
+ .withState(SharePartitionState.ACTIVE)
+ .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
+ .withPersister(persister)
+ .build());
+ WriteShareGroupStateResult writeShareGroupStateResult =
Mockito.mock(WriteShareGroupStateResult.class);
+
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(),
Errors.NONE.message())))));
+
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+ MemoryRecords records = memoryRecords(10);
+ // Acquire 1 record.
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ ShareAcquireMode.RECORD_LIMIT,
+ 2,
+ 1,
+ DEFAULT_FETCH_OFFSET,
+ fetchPartitionData(records),
+ FETCH_ISOLATION_HWM),
+ 1);
+
+ assertArrayEquals(expectedAcquiredRecord(0, 0, 1).toArray(),
acquiredRecordsList.toArray());
+ assertEquals(1, sharePartition.nextFetchOffset());
+
+ CompletableFuture<Void> ackResult = sharePartition.acknowledge(
+ MEMBER_ID,
+ List.of(new ShareAcknowledgementBatch(0, 0, List.of((byte) 1))));
+
+ assertNull(ackResult.join());
+ assertFalse(ackResult.isCompletedExceptionally());
+ assertEquals(1, sharePartition.nextFetchOffset());
+ assertEquals(1, sharePartition.startOffset());
+ assertEquals(9, sharePartition.endOffset());
+ assertEquals(1, sharePartition.cachedState().size());
+ assertNotNull(sharePartition.cachedState().get(0L).offsetState());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(0L).offsetState().get(1L).state());
+
+ // Acquire 2 records.
+ acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ ShareAcquireMode.RECORD_LIMIT,
+ 2,
+ 2,
+ DEFAULT_FETCH_OFFSET,
+ fetchPartitionData(records),
+ FETCH_ISOLATION_HWM),
+ 2);
+
+ List<AcquiredRecords> expectedAcquiredRecords = new
ArrayList<>(expectedAcquiredRecord(1, 1, 1));
+ expectedAcquiredRecords.addAll(expectedAcquiredRecord(2, 2, 1));
+ assertArrayEquals(expectedAcquiredRecords.toArray(),
acquiredRecordsList.toArray());
+
+ // Ack only 1 record
+ ackResult = sharePartition.acknowledge(
+ MEMBER_ID,
+ List.of(new ShareAcknowledgementBatch(1, 1, List.of((byte) 1))));
+ assertNull(ackResult.join());
+ assertFalse(ackResult.isCompletedExceptionally());
+ assertEquals(3, sharePartition.nextFetchOffset());
+ assertEquals(2, sharePartition.startOffset());
+ assertEquals(9, sharePartition.endOffset());
+ assertEquals(1, sharePartition.cachedState().size());
+ assertNotNull(sharePartition.cachedState().get(0L).offsetState());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(0L).offsetState().get(2L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(0L).offsetState().get(3L).state());
+ }
+
+ @Test
+ public void testAcquisitionLockTimeoutInRecordLimitMode() throws
InterruptedException {
+ Persister persister = Mockito.mock(Persister.class);
+ SharePartition sharePartition =
Mockito.spy(SharePartitionBuilder.builder()
+ .withState(SharePartitionState.ACTIVE)
+ .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
+ .withPersister(persister)
+ .build());
+ WriteShareGroupStateResult writeShareGroupStateResult =
Mockito.mock(WriteShareGroupStateResult.class);
+
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(),
Errors.NONE.message())))));
+
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+ // Create 3 batches of records.
+ ByteBuffer buffer = ByteBuffer.allocate(4096);
+ memoryRecordsBuilder(buffer, 0, 5).close();
+ memoryRecordsBuilder(buffer, 5, 15).close();
+
+ buffer.flip();
+
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
+ // Acquire 3 records.
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ ShareAcquireMode.RECORD_LIMIT,
+ BATCH_SIZE,
+ 2,
+ DEFAULT_FETCH_OFFSET,
+ fetchPartitionData(records, 10),
+ FETCH_ISOLATION_HWM),
+ 2);
+
+ assertArrayEquals(expectedAcquiredRecord(0, 1, 1).toArray(),
acquiredRecordsList.toArray());
+ assertThrows(IllegalStateException.class, () ->
sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask());
+ // There should be 2 timer tasks for 2 offsets.
+ assertEquals(2, sharePartition.timer().size());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(0L).offsetState().get(1L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(0L).offsetState().get(2L).state());
+
assertFalse(sharePartition.cachedState().get(0L).offsetState().get(0L).acquisitionLockTimeoutTask().hasExpired());
+
assertFalse(sharePartition.cachedState().get(0L).offsetState().get(1L).acquisitionLockTimeoutTask().hasExpired());
+
+ // Allowing acquisition lock to expire.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
+ TestUtils.waitForCondition(
+ () ->
sharePartition.cachedState().get(0L).offsetState().get(0L).state() ==
RecordState.AVAILABLE &&
+
sharePartition.cachedState().get(0L).offsetState().get(1L).state() ==
RecordState.AVAILABLE &&
+
sharePartition.cachedState().get(0L).offsetState().get(0L).acquisitionLockTimeoutTask()
== null &&
+
sharePartition.cachedState().get(0L).offsetState().get(1L).acquisitionLockTimeoutTask()
== null &&
+ sharePartition.timer().size() == 0,
+ DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
+ () -> assertionFailedMessage(sharePartition, Map.of(0L,
List.of(0L, 1L))));
+ // Acquisition lock timeout task has run already and next fetch offset
is moved to 0.
+ assertEquals(0, sharePartition.nextFetchOffset());
+ assertEquals(1,
sharePartition.cachedState().get(0L).offsetState().get(0L).deliveryCount());
+ assertEquals(1,
sharePartition.cachedState().get(0L).offsetState().get(1L).deliveryCount());
+
+ acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ ShareAcquireMode.RECORD_LIMIT,
+ BATCH_SIZE,
+ 2,
+ DEFAULT_FETCH_OFFSET,
+ fetchPartitionData(records, 10),
+ FETCH_ISOLATION_HWM),
+ 2);
+ // delivery count increased to 2
+ List<AcquiredRecords> expectedAcquiredRecords = new
ArrayList<>(expectedAcquiredRecord(0, 0, 2));
+ expectedAcquiredRecords.addAll(expectedAcquiredRecord(1, 1, 2));
+ assertArrayEquals(expectedAcquiredRecords.toArray(),
acquiredRecordsList.toArray());
+
+ // Ack offset at 1 and let the other offset to expire again.
+ CompletableFuture<Void> ackResult = sharePartition.acknowledge(
+ MEMBER_ID,
+ List.of(new ShareAcknowledgementBatch(1, 1, List.of((byte) 2))));
+ assertNull(ackResult.join());
+ assertFalse(ackResult.isCompletedExceptionally());
+
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
+ TestUtils.waitForCondition(
+ () ->
sharePartition.cachedState().get(0L).offsetState().get(0L).state() ==
RecordState.AVAILABLE &&
+
sharePartition.cachedState().get(0L).offsetState().get(0L).acquisitionLockTimeoutTask()
== null &&
+ sharePartition.timer().size() == 0,
+ DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
+ () -> assertionFailedMessage(sharePartition, Map.of(0L,
List.of(0L, 1L))));
+
+
+ assertEquals(0, sharePartition.nextFetchOffset());
+ }
+
+ @Test
+ public void
testAcquireCachedStateInitialGapOverlapsWithActualPartitionGapInRecordLimitMode()
{
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionAllData(0, 3, 11L,
Errors.NONE.code(), Errors.NONE.message(),
+ List.of(
+ new PersisterStateBatch(21L, 30L,
RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 11 to 20
+ new PersisterStateBatch(41L, 50L,
RecordState.ARCHIVED.id, (short) 1) // There is a gap from 31-40
+ ))))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+ WriteShareGroupStateResult writeShareGroupStateResult =
Mockito.mock(WriteShareGroupStateResult.class);
+
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(),
Errors.NONE.message())))));
+
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
+
+ sharePartition.maybeInitialize();
+
+ // Creating 2 batches starting from 16, such that there is a natural
gap from 11 to 15
+ ByteBuffer buffer = ByteBuffer.allocate(4096);
+ memoryRecordsBuilder(buffer, 16, 20).close();
+ memoryRecordsBuilder(buffer, 36, 25).close();
+ buffer.flip();
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+ // Acquire 20 records.
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ ShareAcquireMode.RECORD_LIMIT,
+ BATCH_SIZE,
+ 20,
+ DEFAULT_FETCH_OFFSET,
+ fetchPartitionData(records, 0),
+ FETCH_ISOLATION_HWM),
+ 20);
+
+ // Acquired batches will contain the following ->
+ // 1. 16-20 (gap offsets)
+ // 2. 31-40 (gap offsets)
+ // 3. 51-55 (new offsets)
+ List<AcquiredRecords> expectedAcquiredRecord = new
ArrayList<>(expectedAcquiredRecord(16, 20, 1));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(31, 40, 1));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(51, 55, 1));
+ assertArrayEquals(expectedAcquiredRecord.toArray(),
acquiredRecordsList.toArray());
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertFalse(sharePartition.cachedState().isEmpty());
+ assertEquals(16, sharePartition.startOffset());
+ assertEquals(60, sharePartition.endOffset());
+ assertEquals(3, sharePartition.stateEpoch());
+ assertEquals(56, sharePartition.nextFetchOffset());
+
+ GapWindow persisterReadResultGapWindow =
sharePartition.persisterReadResultGapWindow();
+ assertNull(persisterReadResultGapWindow);
+ }
+
+ @Test
+ public void
testAcquireCachedStateGapInBetweenOverlapsWithActualPartitionGapInRecordLimitMode()
{
+ Persister persister = Mockito.mock(Persister.class);
+ ReadShareGroupStateResult readShareGroupStateResult =
Mockito.mock(ReadShareGroupStateResult.class);
+
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionAllData(0, 3, 11L,
Errors.NONE.code(), Errors.NONE.message(),
+ List.of(
+ new PersisterStateBatch(21L, 30L,
RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 11 to 20
+ new PersisterStateBatch(41L, 50L,
RecordState.ARCHIVED.id, (short) 1) // There is a gap from 31-40
+ ))))));
+
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+ WriteShareGroupStateResult writeShareGroupStateResult =
Mockito.mock(WriteShareGroupStateResult.class);
+
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(),
Errors.NONE.message())))));
+
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+ SharePartition sharePartition =
SharePartitionBuilder.builder().withPersister(persister).build();
+
+ sharePartition.maybeInitialize();
+
+ // Creating 3 batches starting from 11, such that there is a natural
gap from 26 to 30
+ ByteBuffer buffer = ByteBuffer.allocate(4096);
+ memoryRecordsBuilder(buffer, 11, 10).close();
+ memoryRecordsBuilder(buffer, 21, 15).close();
+ memoryRecordsBuilder(buffer, 41, 20).close();
+ buffer.flip();
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+ List<AcquiredRecords> acquiredRecordsList =
fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ ShareAcquireMode.RECORD_LIMIT,
+ BATCH_SIZE,
+ 15,
+ DEFAULT_FETCH_OFFSET,
+ fetchPartitionData(records, 0),
+ FETCH_ISOLATION_HWM),
+ 15);
+
+ // Acquired batches will contain the following ->
+ // 1. 11-20 (gap offsets)
+ // 2. 31-35 (gap offsets)
+ List<AcquiredRecords> expectedAcquiredRecord = new
ArrayList<>(expectedAcquiredRecord(11, 20, 1));
+ expectedAcquiredRecord.addAll(expectedAcquiredRecord(31, 35, 1));
+ assertArrayEquals(expectedAcquiredRecord.toArray(),
acquiredRecordsList.toArray());
+
+ assertEquals(SharePartitionState.ACTIVE,
sharePartition.partitionState());
+ assertFalse(sharePartition.cachedState().isEmpty());
+ assertEquals(11, sharePartition.startOffset());
+ assertEquals(50, sharePartition.endOffset());
+ assertEquals(3, sharePartition.stateEpoch());
+ assertEquals(36, sharePartition.nextFetchOffset());
+
+ GapWindow persisterReadResultGapWindow =
sharePartition.persisterReadResultGapWindow();
+ assertNotNull(persisterReadResultGapWindow);
+ // Gap still exists from 36 to 50
Review Comment:
comment should be `Gap still exists from 36 to 40`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]