adixitconfluent commented on code in PR #20246:
URL: https://github.com/apache/kafka/pull/20246#discussion_r2460670210


##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -8801,6 +8838,543 @@ public void 
testRecordArchivedWithWriteStateRPCFailure() throws InterruptedExcep
         assertEquals(2, 
sharePartition.cachedState().get(7L).batchDeliveryCount());
     }
 
+    @Test
+    public void testAcquireSingleBatchInRecordLimitMode() {
+        Persister persister = Mockito.mock(Persister.class);
+        SharePartition sharePartition = 
Mockito.spy(SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
+            .withPersister(persister)
+            .build());
+        
Mockito.doReturn(true).when(sharePartition).isRecordLimitMode(Mockito.any());
+        WriteShareGroupStateResult writeShareGroupStateResult = 
Mockito.mock(WriteShareGroupStateResult.class);
+        
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), 
Errors.NONE.message())))));
+        
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+        // Member-1 attempts to acquire records in strict mode with a maximum 
fetch limit of 5 records.
+        MemoryRecords records = memoryRecords(10);
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            SHARE_ACQUIRE_MODE,
+            2,
+            5,
+            DEFAULT_FETCH_OFFSET,
+            fetchPartitionData(records),
+            FETCH_ISOLATION_HWM),
+            5);
+
+        assertArrayEquals(expectedAcquiredRecord(0, 4, 1).toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(5, sharePartition.nextFetchOffset());
+        assertEquals(1, sharePartition.cachedState().size());
+        assertEquals(0, sharePartition.cachedState().get(0L).firstOffset());
+        assertEquals(9, sharePartition.cachedState().get(0L).lastOffset());
+        assertNotNull(sharePartition.cachedState().get(0L).offsetState());
+        assertThrows(IllegalStateException.class, () -> 
sharePartition.cachedState().get(0L).batchState());
+        assertThrows(IllegalStateException.class, () -> 
sharePartition.cachedState().get(0L).batchMemberId());
+        assertThrows(IllegalStateException.class, () -> 
sharePartition.cachedState().get(0L).batchDeliveryCount());
+
+        assertEquals(10, 
sharePartition.cachedState().get(0L).offsetState().size());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(0L).offsetState().get(0L).state());
+        assertEquals(MEMBER_ID, 
sharePartition.cachedState().get(0L).offsetState().get(0L).memberId());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(0L).offsetState().get(4L).state());
+        assertEquals(MEMBER_ID, 
sharePartition.cachedState().get(0L).offsetState().get(0L).memberId());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(0L).offsetState().get(5L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(0L).offsetState().get(9L).state());
+
+
+        // Acquire the same batch with member-2. 5 records will be acquired.
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            "member-2",
+            SHARE_ACQUIRE_MODE,
+            2,
+            5,
+            DEFAULT_FETCH_OFFSET,
+            fetchPartitionData(records),
+            FETCH_ISOLATION_HWM),
+            5);
+
+        List<AcquiredRecords> expectedAcquiredRecords = new 
ArrayList<>(expectedAcquiredRecord(5, 5, 1));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(6, 6, 1));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(7, 7, 1));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(8, 8, 1));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(9, 9, 1));
+
+        assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(10, sharePartition.nextFetchOffset());
+        assertEquals(1, sharePartition.cachedState().size());
+        assertEquals(0, sharePartition.cachedState().get(0L).firstOffset());
+        assertEquals(9, sharePartition.cachedState().get(0L).lastOffset());
+
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(0L).offsetState().get(5L).state());
+        assertEquals("member-2", 
sharePartition.cachedState().get(0L).offsetState().get(5L).memberId());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(0L).offsetState().get(9L).state());
+        assertEquals("member-2", 
sharePartition.cachedState().get(0L).offsetState().get(5L).memberId());
+    }
+
+    @Test
+    public void testAcquireMultipleBatchesInRecordLimitMode() throws 
InterruptedException {
+        Persister persister = Mockito.mock(Persister.class);
+        SharePartition sharePartition = 
Mockito.spy(SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
+            .withPersister(persister)
+            .build());
+        WriteShareGroupStateResult writeShareGroupStateResult = 
Mockito.mock(WriteShareGroupStateResult.class);
+        
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), 
Errors.NONE.message())))));
+        
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+        // Create 3 batches of records.
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 10, 5).close();
+        memoryRecordsBuilder(buffer, 15, 15).close();
+        memoryRecordsBuilder(buffer, 30, 15).close();
+
+        buffer.flip();
+
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        // Acquire 10 records.
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            SHARE_ACQUIRE_MODE.RECORD_LIMIT,
+            BATCH_SIZE,
+            10,
+            DEFAULT_FETCH_OFFSET,
+            fetchPartitionData(records, 10),
+            FETCH_ISOLATION_HWM),
+            10);
+
+        assertArrayEquals(expectedAcquiredRecord(10, 19, 1).toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(20, sharePartition.nextFetchOffset());
+        assertEquals(1, sharePartition.cachedState().size());
+        assertEquals(10, sharePartition.cachedState().get(10L).firstOffset());
+        assertEquals(29, sharePartition.cachedState().get(10L).lastOffset());
+        assertNotNull(sharePartition.cachedState().get(10L).offsetState());
+        assertThrows(IllegalStateException.class, () -> 
sharePartition.cachedState().get(10L).batchState());
+        assertThrows(IllegalStateException.class, () -> 
sharePartition.cachedState().get(10L).batchMemberId());
+        assertThrows(IllegalStateException.class, () -> 
sharePartition.cachedState().get(10L).batchDeliveryCount());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(10L).offsetState().get(19L).state());
+        assertEquals(MEMBER_ID, 
sharePartition.cachedState().get(10L).offsetState().get(19L).memberId());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(10L).offsetState().get(20L).state());
+    }
+
+    @Test
+    public void 
testAcquireMultipleRecordsWithOverlapAndNewBatchInRecordLimitMode() {
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();
+        MemoryRecords records = memoryRecords(5);
+
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            SHARE_ACQUIRE_MODE.RECORD_LIMIT,
+            BATCH_SIZE,
+            3,
+            DEFAULT_FETCH_OFFSET,
+            fetchPartitionData(records, 0),
+            FETCH_ISOLATION_HWM),
+            3);
+
+        List<AcquiredRecords> expectedAcquiredRecords = new 
ArrayList<>(expectedAcquiredRecord(0, 2, 1));
+        assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(3, sharePartition.nextFetchOffset());
+
+        // Add records from 0-9 offsets, 3-5 should be acquired and 0-2 should 
be ignored.
+        records = memoryRecords(10);
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            SHARE_ACQUIRE_MODE.RECORD_LIMIT,
+            BATCH_SIZE,
+            3,
+            DEFAULT_FETCH_OFFSET,
+            fetchPartitionData(records, 0),
+            FETCH_ISOLATION_HWM),
+            3);
+
+        expectedAcquiredRecords = new ArrayList<>(expectedAcquiredRecord(3, 3, 
1));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(4, 4, 1));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(5, 5, 1));
+        assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(6, sharePartition.nextFetchOffset());
+        assertEquals(2, sharePartition.cachedState().size());
+
+        // Check cached state.
+        Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
+        expectedOffsetStateMap.put(0L, new InFlightState(RecordState.ACQUIRED, 
(short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(1L, new InFlightState(RecordState.ACQUIRED, 
(short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(2L, new InFlightState(RecordState.ACQUIRED, 
(short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(3L, new InFlightState(RecordState.ACQUIRED, 
(short) 1, MEMBER_ID));
+        expectedOffsetStateMap.put(4L, new InFlightState(RecordState.ACQUIRED, 
(short) 1, MEMBER_ID));
+        assertEquals(expectedOffsetStateMap, 
sharePartition.cachedState().get(0L).offsetState());
+    }
+
+    @Test
+    public void testAcknowledgeInRecordLimitMode() {
+        Persister persister = Mockito.mock(Persister.class);
+        SharePartition sharePartition = 
Mockito.spy(SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
+            .withPersister(persister)
+            .build());
+        WriteShareGroupStateResult writeShareGroupStateResult = 
Mockito.mock(WriteShareGroupStateResult.class);
+        
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), 
Errors.NONE.message())))));
+        
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+        MemoryRecords records = memoryRecords(10);
+        // Acquire 1 record.
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            ShareAcquireMode.RECORD_LIMIT,
+            2,
+            1,
+            DEFAULT_FETCH_OFFSET,
+            fetchPartitionData(records),
+            FETCH_ISOLATION_HWM),
+            1);
+
+        assertArrayEquals(expectedAcquiredRecord(0, 0, 1).toArray(), 
acquiredRecordsList.toArray());
+        assertEquals(1, sharePartition.nextFetchOffset());
+
+        CompletableFuture<Void> ackResult = sharePartition.acknowledge(
+            MEMBER_ID,
+            List.of(new ShareAcknowledgementBatch(0, 0, List.of((byte) 1))));
+
+        assertNull(ackResult.join());
+        assertFalse(ackResult.isCompletedExceptionally());
+        assertEquals(1, sharePartition.nextFetchOffset());
+        assertEquals(1, sharePartition.startOffset());
+        assertEquals(9, sharePartition.endOffset());
+        assertEquals(1, sharePartition.cachedState().size());
+        assertNotNull(sharePartition.cachedState().get(0L).offsetState());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(0L).offsetState().get(1L).state());
+
+        // Acquire 2 records.
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            ShareAcquireMode.RECORD_LIMIT,
+            2,
+            2,
+            DEFAULT_FETCH_OFFSET,
+            fetchPartitionData(records),
+            FETCH_ISOLATION_HWM),
+            2);
+
+        List<AcquiredRecords> expectedAcquiredRecords = new 
ArrayList<>(expectedAcquiredRecord(1, 1, 1));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(2, 2, 1));
+        assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
+
+        // Ack only 1 record
+        ackResult = sharePartition.acknowledge(
+            MEMBER_ID,
+            List.of(new ShareAcknowledgementBatch(1, 1, List.of((byte) 1))));
+        assertNull(ackResult.join());
+        assertFalse(ackResult.isCompletedExceptionally());
+        assertEquals(3, sharePartition.nextFetchOffset());
+        assertEquals(2, sharePartition.startOffset());
+        assertEquals(9, sharePartition.endOffset());
+        assertEquals(1, sharePartition.cachedState().size());
+        assertNotNull(sharePartition.cachedState().get(0L).offsetState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(0L).offsetState().get(2L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(0L).offsetState().get(3L).state());
+    }
+
+    @Test
+    public void testAcquisitionLockTimeoutInRecordLimitMode() throws 
InterruptedException {
+        Persister persister = Mockito.mock(Persister.class);
+        SharePartition sharePartition = 
Mockito.spy(SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
+            .withPersister(persister)
+            .build());
+        WriteShareGroupStateResult writeShareGroupStateResult = 
Mockito.mock(WriteShareGroupStateResult.class);
+        
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), 
Errors.NONE.message())))));
+        
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+        // Create 3 batches of records.
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 0, 5).close();
+        memoryRecordsBuilder(buffer, 5, 15).close();
+
+        buffer.flip();
+
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        // Acquire 3 records.
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            ShareAcquireMode.RECORD_LIMIT,
+            BATCH_SIZE,
+            2,
+            DEFAULT_FETCH_OFFSET,
+            fetchPartitionData(records, 10),
+            FETCH_ISOLATION_HWM),
+            2);
+
+        assertArrayEquals(expectedAcquiredRecord(0, 1, 1).toArray(), 
acquiredRecordsList.toArray());
+        assertThrows(IllegalStateException.class, () -> 
sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask());
+        // There should be 2 timer tasks for 2 offsets.
+        assertEquals(2, sharePartition.timer().size());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(0L).offsetState().get(1L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(0L).offsetState().get(2L).state());
+        
assertFalse(sharePartition.cachedState().get(0L).offsetState().get(0L).acquisitionLockTimeoutTask().hasExpired());
+        
assertFalse(sharePartition.cachedState().get(0L).offsetState().get(1L).acquisitionLockTimeoutTask().hasExpired());
+
+        // Allowing acquisition lock to expire.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
+        TestUtils.waitForCondition(
+            () -> 
sharePartition.cachedState().get(0L).offsetState().get(0L).state() == 
RecordState.AVAILABLE &&
+                
sharePartition.cachedState().get(0L).offsetState().get(1L).state() == 
RecordState.AVAILABLE &&
+                
sharePartition.cachedState().get(0L).offsetState().get(0L).acquisitionLockTimeoutTask()
 == null &&
+                
sharePartition.cachedState().get(0L).offsetState().get(1L).acquisitionLockTimeoutTask()
 == null &&
+                sharePartition.timer().size() == 0,
+            DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
+            () -> assertionFailedMessage(sharePartition, Map.of(0L, 
List.of(0L, 1L))));
+        // Acquisition lock timeout task has run already and next fetch offset 
is moved to 0.
+        assertEquals(0, sharePartition.nextFetchOffset());
+        assertEquals(1, 
sharePartition.cachedState().get(0L).offsetState().get(0L).deliveryCount());
+        assertEquals(1, 
sharePartition.cachedState().get(0L).offsetState().get(1L).deliveryCount());
+
+        acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            ShareAcquireMode.RECORD_LIMIT,
+            BATCH_SIZE,
+            2,
+            DEFAULT_FETCH_OFFSET,
+            fetchPartitionData(records, 10),
+            FETCH_ISOLATION_HWM),
+            2);
+        // delivery count increased to 2
+        List<AcquiredRecords> expectedAcquiredRecords = new 
ArrayList<>(expectedAcquiredRecord(0, 0, 2));
+        expectedAcquiredRecords.addAll(expectedAcquiredRecord(1, 1, 2));
+        assertArrayEquals(expectedAcquiredRecords.toArray(), 
acquiredRecordsList.toArray());
+
+        // Ack offset at 1 and let the other offset to expire again.
+        CompletableFuture<Void> ackResult = sharePartition.acknowledge(
+            MEMBER_ID,
+            List.of(new ShareAcknowledgementBatch(1, 1, List.of((byte) 2))));
+        assertNull(ackResult.join());
+        assertFalse(ackResult.isCompletedExceptionally());
+
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
+        TestUtils.waitForCondition(
+            () -> 
sharePartition.cachedState().get(0L).offsetState().get(0L).state() == 
RecordState.AVAILABLE &&
+                
sharePartition.cachedState().get(0L).offsetState().get(0L).acquisitionLockTimeoutTask()
 == null &&
+                sharePartition.timer().size() == 0,
+            DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
+            () -> assertionFailedMessage(sharePartition, Map.of(0L, 
List.of(0L, 1L))));
+
+
+        assertEquals(0, sharePartition.nextFetchOffset());
+    }
+
+    @Test
+    public void 
testAcquireCachedStateInitialGapOverlapsWithActualPartitionGapInRecordLimitMode()
 {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 11L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    List.of(
+                        new PersisterStateBatch(21L, 30L, 
RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 11 to 20
+                        new PersisterStateBatch(41L, 50L, 
RecordState.ARCHIVED.id, (short) 1) // There is a gap from 31-40
+                    ))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+
+        WriteShareGroupStateResult writeShareGroupStateResult = 
Mockito.mock(WriteShareGroupStateResult.class);
+        
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), 
Errors.NONE.message())))));
+        
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        sharePartition.maybeInitialize();
+
+        // Creating 2 batches starting from 16, such that there is a natural 
gap from 11 to 15
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 16, 20).close();
+        memoryRecordsBuilder(buffer, 36, 25).close();
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+        // Acquire 20 records.
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            ShareAcquireMode.RECORD_LIMIT,
+            BATCH_SIZE,
+            20,
+            DEFAULT_FETCH_OFFSET,
+            fetchPartitionData(records, 0),
+            FETCH_ISOLATION_HWM),
+            20);
+
+        // Acquired batches will contain the following ->
+        // 1. 16-20 (gap offsets)
+        // 2. 31-40 (gap offsets)
+        // 3. 51-55 (new offsets)
+        List<AcquiredRecords> expectedAcquiredRecord = new 
ArrayList<>(expectedAcquiredRecord(16, 20, 1));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(31, 40, 1));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(51, 55, 1));
+        assertArrayEquals(expectedAcquiredRecord.toArray(), 
acquiredRecordsList.toArray());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(16, sharePartition.startOffset());
+        assertEquals(60, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(56, sharePartition.nextFetchOffset());
+
+        GapWindow persisterReadResultGapWindow = 
sharePartition.persisterReadResultGapWindow();
+        assertNull(persisterReadResultGapWindow);
+    }
+
+    @Test
+    public void 
testAcquireCachedStateGapInBetweenOverlapsWithActualPartitionGapInRecordLimitMode()
 {
+        Persister persister = Mockito.mock(Persister.class);
+        ReadShareGroupStateResult readShareGroupStateResult = 
Mockito.mock(ReadShareGroupStateResult.class);
+        
Mockito.when(readShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionAllData(0, 3, 11L, 
Errors.NONE.code(), Errors.NONE.message(),
+                    List.of(
+                        new PersisterStateBatch(21L, 30L, 
RecordState.ACKNOWLEDGED.id, (short) 2), // There is a gap from 11 to 20
+                        new PersisterStateBatch(41L, 50L, 
RecordState.ARCHIVED.id, (short) 1) // There is a gap from 31-40
+                    ))))));
+        
Mockito.when(persister.readState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(readShareGroupStateResult));
+        WriteShareGroupStateResult writeShareGroupStateResult = 
Mockito.mock(WriteShareGroupStateResult.class);
+        
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), 
Errors.NONE.message())))));
+        
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
+
+        SharePartition sharePartition = 
SharePartitionBuilder.builder().withPersister(persister).build();
+
+        sharePartition.maybeInitialize();
+
+        // Creating 3 batches starting from 11, such that there is a natural 
gap from 26 to 30
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 11, 10).close();
+        memoryRecordsBuilder(buffer, 21, 15).close();
+        memoryRecordsBuilder(buffer, 41, 20).close();
+        buffer.flip();
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+
+        List<AcquiredRecords> acquiredRecordsList = 
fetchAcquiredRecords(sharePartition.acquire(
+            MEMBER_ID,
+            ShareAcquireMode.RECORD_LIMIT,
+            BATCH_SIZE,
+            15,
+            DEFAULT_FETCH_OFFSET,
+            fetchPartitionData(records, 0),
+            FETCH_ISOLATION_HWM),
+            15);
+
+        // Acquired batches will contain the following ->
+        // 1. 11-20 (gap offsets)
+        // 2. 31-35 (gap offsets)
+        List<AcquiredRecords> expectedAcquiredRecord = new 
ArrayList<>(expectedAcquiredRecord(11, 20, 1));
+        expectedAcquiredRecord.addAll(expectedAcquiredRecord(31, 35, 1));
+        assertArrayEquals(expectedAcquiredRecord.toArray(), 
acquiredRecordsList.toArray());
+
+        assertEquals(SharePartitionState.ACTIVE, 
sharePartition.partitionState());
+        assertFalse(sharePartition.cachedState().isEmpty());
+        assertEquals(11, sharePartition.startOffset());
+        assertEquals(50, sharePartition.endOffset());
+        assertEquals(3, sharePartition.stateEpoch());
+        assertEquals(36, sharePartition.nextFetchOffset());
+
+        GapWindow persisterReadResultGapWindow = 
sharePartition.persisterReadResultGapWindow();
+        assertNotNull(persisterReadResultGapWindow);
+        // Gap still exists from 36 to 50

Review Comment:
   comment should be `Gap still exists from 36 to 40`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to