apoorvmittal10 commented on code in PR #20080:
URL: https://github.com/apache/kafka/pull/20080#discussion_r2179798245


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -2088,6 +2088,9 @@ void rollbackOrProcessStateUpdates(
                     state.completeStateTransition(true);
                     // Cancel the acquisition lock timeout task for the state 
since it is acknowledged/released successfully.
                     state.cancelAndClearAcquisitionLockTimeoutTask();
+                    if (state.state != RecordState.ARCHIVED) {

Review Comment:
   Should it be:
   ```suggestion
                       if (state.state == AVAILABLE) {
   ```



##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -7296,6 +7296,177 @@ public void testFetchLockReleasedByDifferentId() {
         assertNull(sharePartition.fetchLock()); // Fetch lock has been 
released.
     }
 
+    @Test
+    public void testAcquireWhenBatchHasOngoingTransition() {
+        Persister persister = Mockito.mock(Persister.class);
+
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .withPersister(persister)
+            .build();
+        // Acquire a single batch with member-1.
+        fetchAcquiredRecords(
+            sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 
21,
+                fetchPartitionData(memoryRecords(10, 21)), FETCH_ISOLATION_HWM
+            ), 10
+        );
+
+        // Validate that there is no ongoing transition.
+        
assertFalse(sharePartition.cachedState().get(21L).batchHasOngoingStateTransition());
+        // Return a future which will be completed later, so the batch state 
has ongoing transition.
+        CompletableFuture<WriteShareGroupStateResult> future = new 
CompletableFuture<>();
+        Mockito.when(persister.writeState(Mockito.any())).thenReturn(future);
+        // Acknowledge batch to create ongoing transition.
+        sharePartition.acknowledge(MEMBER_ID, List.of(new 
ShareAcknowledgementBatch(21, 30, List.of(AcknowledgeType.RELEASE.id))));
+
+        // Assert the start offset has not moved and batch has ongoing 
transition.
+        assertEquals(21L, sharePartition.startOffset());
+        assertEquals(1, sharePartition.cachedState().size());
+        
assertTrue(sharePartition.cachedState().get(21L).batchHasOngoingStateTransition());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(21L).batchState());
+        assertEquals(EMPTY_MEMBER_ID, 
sharePartition.cachedState().get(21L).batchMemberId());
+
+        // Acquire the same batch with member-2. This function call will 
return with 0 records since there is an ongoing
+        // transition for this batch.
+        fetchAcquiredRecords(
+            sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 
21,
+                fetchPartitionData(memoryRecords(10, 21)), FETCH_ISOLATION_HWM
+            ), 0
+        );
+
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(21L).batchState());
+        assertEquals(EMPTY_MEMBER_ID, 
sharePartition.cachedState().get(21L).batchMemberId());
+
+        // Complete the future so acknowledge API can be completed, which 
updates the cache. Now the records can be acquired.
+        WriteShareGroupStateResult writeShareGroupStateResult = 
Mockito.mock(WriteShareGroupStateResult.class);
+        
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), 
Errors.NONE.message())))));
+        future.complete(writeShareGroupStateResult);
+
+        // Acquire the same batch with member-2. 10 records will be acquired.
+        fetchAcquiredRecords(
+            sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 
21,
+                fetchPartitionData(memoryRecords(10, 21)), FETCH_ISOLATION_HWM
+            ), 10
+        );
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(21L).batchState());
+        assertEquals("member-2", 
sharePartition.cachedState().get(21L).batchMemberId());
+    }
+
+    @Test
+    public void testNextFetchOffsetWhenBatchHasOngoingTransition() {
+        Persister persister = Mockito.mock(Persister.class);
+
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .withPersister(persister)
+            .build();
+
+        // Acquire a single batch 0-9 with member-1.
+        fetchAcquiredRecords(
+            sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 0,
+                fetchPartitionData(memoryRecords(10, 0)), FETCH_ISOLATION_HWM
+            ), 10
+        );
+
+        // Acquire a single batch 10-19 with member-1.
+        fetchAcquiredRecords(
+            sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 
10,
+                fetchPartitionData(memoryRecords(10, 10)), FETCH_ISOLATION_HWM
+            ), 10
+        );
+
+        // Validate that there is no ongoing transition.
+        assertEquals(2, sharePartition.cachedState().size());
+        
assertFalse(sharePartition.cachedState().get(0L).batchHasOngoingStateTransition());
+        
assertFalse(sharePartition.cachedState().get(10L).batchHasOngoingStateTransition());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(0L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(10L).batchState());
+
+        // Return futures which will be completed later, so the batch state 
has ongoing transition.
+        CompletableFuture<WriteShareGroupStateResult> future1 = new 
CompletableFuture<>();
+        CompletableFuture<WriteShareGroupStateResult> future2 = new 
CompletableFuture<>();
+
+        // Mocking the persister write state RPC to return future 1 when 
acknowledgement occurs for offsets 0-9.
+        // Mocking the persister write state RPC to return future 2 when 
acknowledgement occurs for offsets 10-19.
+        
Mockito.when(persister.writeState(Mockito.any())).thenReturn(future1).thenReturn(future2);
+
+        // Acknowledge batch to create ongoing transition.
+        sharePartition.acknowledge(MEMBER_ID, List.of(new 
ShareAcknowledgementBatch(0, 9, List.of(AcknowledgeType.RELEASE.id))));
+        sharePartition.acknowledge(MEMBER_ID, List.of(new 
ShareAcknowledgementBatch(10, 19, List.of(AcknowledgeType.RELEASE.id))));
+
+        // Complete future2 so second acknowledge API can be completed, which 
updates the cache.
+        WriteShareGroupStateResult writeShareGroupStateResult = 
Mockito.mock(WriteShareGroupStateResult.class);
+        
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), 
Errors.NONE.message())))));
+        future2.complete(writeShareGroupStateResult);
+
+        // Offsets 0-9 will have ongoing state transition since future1 is not 
complete yet.
+        // Offsets 10-19 won't have ongoing state transition since future2 has 
been completed.
+        
assertTrue(sharePartition.cachedState().get(0L).batchHasOngoingStateTransition());
+        
assertFalse(sharePartition.cachedState().get(10L).batchHasOngoingStateTransition());
+
+        // nextFetchOffset should return 10 and not 0 because batch 0-9 is 
undergoing state transition.
+        assertEquals(10, sharePartition.nextFetchOffset());
+    }
+
+    @Test
+    public void testNextFetchOffsetWhenOffsetsHaveOngoingTransition() {
+        Persister persister = Mockito.mock(Persister.class);
+
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .withPersister(persister)
+            .build();
+
+        // Acquire a single batch 0-50 with member-1.
+        fetchAcquiredRecords(
+            sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 0,
+                fetchPartitionData(memoryRecords(50, 0)), FETCH_ISOLATION_HWM
+            ), 50
+        );
+
+        // Validate that there is no ongoing transition.
+        
assertFalse(sharePartition.cachedState().get(0L).batchHasOngoingStateTransition());
+
+        // Return futures which will be completed later, so the batch state 
has ongoing transition.
+        CompletableFuture<WriteShareGroupStateResult> future1 = new 
CompletableFuture<>();
+        CompletableFuture<WriteShareGroupStateResult> future2 = new 
CompletableFuture<>();
+
+        // Mocking the persister write state RPC to return future 1 when 
acknowledgement occurs for offsets 5-9.
+        // Mocking the persister write state RPC to return future 2 when 
acknowledgement occurs for offsets 20-24.

Review Comment:
   Similar to above.



##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -7296,6 +7296,177 @@ public void testFetchLockReleasedByDifferentId() {
         assertNull(sharePartition.fetchLock()); // Fetch lock has been 
released.
     }
 
+    @Test
+    public void testAcquireWhenBatchHasOngoingTransition() {
+        Persister persister = Mockito.mock(Persister.class);
+
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .withPersister(persister)
+            .build();
+        // Acquire a single batch with member-1.
+        fetchAcquiredRecords(
+            sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 
21,
+                fetchPartitionData(memoryRecords(10, 21)), FETCH_ISOLATION_HWM
+            ), 10
+        );
+
+        // Validate that there is no ongoing transition.
+        
assertFalse(sharePartition.cachedState().get(21L).batchHasOngoingStateTransition());
+        // Return a future which will be completed later, so the batch state 
has ongoing transition.
+        CompletableFuture<WriteShareGroupStateResult> future = new 
CompletableFuture<>();
+        Mockito.when(persister.writeState(Mockito.any())).thenReturn(future);
+        // Acknowledge batch to create ongoing transition.
+        sharePartition.acknowledge(MEMBER_ID, List.of(new 
ShareAcknowledgementBatch(21, 30, List.of(AcknowledgeType.RELEASE.id))));
+
+        // Assert the start offset has not moved and batch has ongoing 
transition.
+        assertEquals(21L, sharePartition.startOffset());
+        assertEquals(1, sharePartition.cachedState().size());
+        
assertTrue(sharePartition.cachedState().get(21L).batchHasOngoingStateTransition());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(21L).batchState());
+        assertEquals(EMPTY_MEMBER_ID, 
sharePartition.cachedState().get(21L).batchMemberId());
+
+        // Acquire the same batch with member-2. This function call will 
return with 0 records since there is an ongoing
+        // transition for this batch.
+        fetchAcquiredRecords(
+            sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 
21,
+                fetchPartitionData(memoryRecords(10, 21)), FETCH_ISOLATION_HWM
+            ), 0
+        );
+
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(21L).batchState());
+        assertEquals(EMPTY_MEMBER_ID, 
sharePartition.cachedState().get(21L).batchMemberId());
+
+        // Complete the future so acknowledge API can be completed, which 
updates the cache. Now the records can be acquired.
+        WriteShareGroupStateResult writeShareGroupStateResult = 
Mockito.mock(WriteShareGroupStateResult.class);
+        
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), 
Errors.NONE.message())))));
+        future.complete(writeShareGroupStateResult);
+
+        // Acquire the same batch with member-2. 10 records will be acquired.
+        fetchAcquiredRecords(
+            sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 
21,
+                fetchPartitionData(memoryRecords(10, 21)), FETCH_ISOLATION_HWM
+            ), 10
+        );
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(21L).batchState());
+        assertEquals("member-2", 
sharePartition.cachedState().get(21L).batchMemberId());
+    }
+
+    @Test
+    public void testNextFetchOffsetWhenBatchHasOngoingTransition() {
+        Persister persister = Mockito.mock(Persister.class);
+
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .withPersister(persister)
+            .build();
+
+        // Acquire a single batch 0-9 with member-1.
+        fetchAcquiredRecords(
+            sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 0,
+                fetchPartitionData(memoryRecords(10, 0)), FETCH_ISOLATION_HWM
+            ), 10
+        );
+
+        // Acquire a single batch 10-19 with member-1.
+        fetchAcquiredRecords(
+            sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 
10,
+                fetchPartitionData(memoryRecords(10, 10)), FETCH_ISOLATION_HWM
+            ), 10
+        );
+
+        // Validate that there is no ongoing transition.
+        assertEquals(2, sharePartition.cachedState().size());
+        
assertFalse(sharePartition.cachedState().get(0L).batchHasOngoingStateTransition());
+        
assertFalse(sharePartition.cachedState().get(10L).batchHasOngoingStateTransition());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(0L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(10L).batchState());
+
+        // Return futures which will be completed later, so the batch state 
has ongoing transition.
+        CompletableFuture<WriteShareGroupStateResult> future1 = new 
CompletableFuture<>();
+        CompletableFuture<WriteShareGroupStateResult> future2 = new 
CompletableFuture<>();
+
+        // Mocking the persister write state RPC to return future 1 when 
acknowledgement occurs for offsets 0-9.
+        // Mocking the persister write state RPC to return future 2 when 
acknowledgement occurs for offsets 10-19.

Review Comment:
   ```suggestion
           // Mocking the persister write state RPC to return future 1 and 
future 2 when acknowledgement occurs for offsets 0-9 and 10-19 respectively.
   ```



##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -2088,6 +2088,9 @@ void rollbackOrProcessStateUpdates(
                     state.completeStateTransition(true);
                     // Cancel the acquisition lock timeout task for the state 
since it is acknowledged/released successfully.
                     state.cancelAndClearAcquisitionLockTimeoutTask();
+                    if (state.state != RecordState.ARCHIVED) {
+                        findNextFetchOffset.set(true);
+                    }

Review Comment:
   This change here makes sense, but do we need to remove 
`findNextFetchOffset.set(true);` at other places where we just started the 
transaction i.e. in acknowledgement?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to