apoorvmittal10 commented on code in PR #20080: URL: https://github.com/apache/kafka/pull/20080#discussion_r2179798245
########## core/src/main/java/kafka/server/share/SharePartition.java: ########## @@ -2088,6 +2088,9 @@ void rollbackOrProcessStateUpdates( state.completeStateTransition(true); // Cancel the acquisition lock timeout task for the state since it is acknowledged/released successfully. state.cancelAndClearAcquisitionLockTimeoutTask(); + if (state.state != RecordState.ARCHIVED) { Review Comment: Should it be: ```suggestion if (state.state == AVAILABLE) { ``` ########## core/src/test/java/kafka/server/share/SharePartitionTest.java: ########## @@ -7296,6 +7296,177 @@ public void testFetchLockReleasedByDifferentId() { assertNull(sharePartition.fetchLock()); // Fetch lock has been released. } + @Test + public void testAcquireWhenBatchHasOngoingTransition() { + Persister persister = Mockito.mock(Persister.class); + + SharePartition sharePartition = SharePartitionBuilder.builder() + .withState(SharePartitionState.ACTIVE) + .withPersister(persister) + .build(); + // Acquire a single batch with member-1. + fetchAcquiredRecords( + sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 21, + fetchPartitionData(memoryRecords(10, 21)), FETCH_ISOLATION_HWM + ), 10 + ); + + // Validate that there is no ongoing transition. + assertFalse(sharePartition.cachedState().get(21L).batchHasOngoingStateTransition()); + // Return a future which will be completed later, so the batch state has ongoing transition. + CompletableFuture<WriteShareGroupStateResult> future = new CompletableFuture<>(); + Mockito.when(persister.writeState(Mockito.any())).thenReturn(future); + // Acknowledge batch to create ongoing transition. + sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(21, 30, List.of(AcknowledgeType.RELEASE.id)))); + + // Assert the start offset has not moved and batch has ongoing transition. 
+ assertEquals(21L, sharePartition.startOffset()); + assertEquals(1, sharePartition.cachedState().size()); + assertTrue(sharePartition.cachedState().get(21L).batchHasOngoingStateTransition()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(21L).batchState()); + assertEquals(EMPTY_MEMBER_ID, sharePartition.cachedState().get(21L).batchMemberId()); + + // Acquire the same batch with member-2. This function call will return with 0 records since there is an ongoing + // transition for this batch. + fetchAcquiredRecords( + sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 21, + fetchPartitionData(memoryRecords(10, 21)), FETCH_ISOLATION_HWM + ), 0 + ); + + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(21L).batchState()); + assertEquals(EMPTY_MEMBER_ID, sharePartition.cachedState().get(21L).batchMemberId()); + + // Complete the future so acknowledge API can be completed, which updates the cache. Now the records can be acquired. + WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class); + Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( + PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message()))))); + future.complete(writeShareGroupStateResult); + + // Acquire the same batch with member-2. 10 records will be acquired. 
+ fetchAcquiredRecords( + sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 21, + fetchPartitionData(memoryRecords(10, 21)), FETCH_ISOLATION_HWM + ), 10 + ); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(21L).batchState()); + assertEquals("member-2", sharePartition.cachedState().get(21L).batchMemberId()); + } + + @Test + public void testNextFetchOffsetWhenBatchHasOngoingTransition() { + Persister persister = Mockito.mock(Persister.class); + + SharePartition sharePartition = SharePartitionBuilder.builder() + .withState(SharePartitionState.ACTIVE) + .withPersister(persister) + .build(); + + // Acquire a single batch 0-9 with member-1. + fetchAcquiredRecords( + sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 0, + fetchPartitionData(memoryRecords(10, 0)), FETCH_ISOLATION_HWM + ), 10 + ); + + // Acquire a single batch 10-19 with member-1. + fetchAcquiredRecords( + sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 10, + fetchPartitionData(memoryRecords(10, 10)), FETCH_ISOLATION_HWM + ), 10 + ); + + // Validate that there is no ongoing transition. + assertEquals(2, sharePartition.cachedState().size()); + assertFalse(sharePartition.cachedState().get(0L).batchHasOngoingStateTransition()); + assertFalse(sharePartition.cachedState().get(10L).batchHasOngoingStateTransition()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(0L).batchState()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(10L).batchState()); + + // Return futures which will be completed later, so the batch state has ongoing transition. + CompletableFuture<WriteShareGroupStateResult> future1 = new CompletableFuture<>(); + CompletableFuture<WriteShareGroupStateResult> future2 = new CompletableFuture<>(); + + // Mocking the persister write state RPC to return future 1 when acknowledgement occurs for offsets 0-9. 
+ // Mocking the persister write state RPC to return future 2 when acknowledgement occurs for offsets 10-19. + Mockito.when(persister.writeState(Mockito.any())).thenReturn(future1).thenReturn(future2); + + // Acknowledge batch to create ongoing transition. + sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(0, 9, List.of(AcknowledgeType.RELEASE.id)))); + sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(10, 19, List.of(AcknowledgeType.RELEASE.id)))); + + // Complete future2 so second acknowledge API can be completed, which updates the cache. + WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class); + Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( + PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message()))))); + future2.complete(writeShareGroupStateResult); + + // Offsets 0-9 will have ongoing state transition since future1 is not complete yet. + // Offsets 10-19 won't have ongoing state transition since future2 has been completed. + assertTrue(sharePartition.cachedState().get(0L).batchHasOngoingStateTransition()); + assertFalse(sharePartition.cachedState().get(10L).batchHasOngoingStateTransition()); + + // nextFetchOffset should return 10 and not 0 because batch 0-9 is undergoing state transition. + assertEquals(10, sharePartition.nextFetchOffset()); + } + + @Test + public void testNextFetchOffsetWhenOffsetsHaveOngoingTransition() { + Persister persister = Mockito.mock(Persister.class); + + SharePartition sharePartition = SharePartitionBuilder.builder() + .withState(SharePartitionState.ACTIVE) + .withPersister(persister) + .build(); + + // Acquire a single batch 0-50 with member-1. 
+ fetchAcquiredRecords( + sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 0, + fetchPartitionData(memoryRecords(50, 0)), FETCH_ISOLATION_HWM + ), 50 + ); + + // Validate that there is no ongoing transition. + assertFalse(sharePartition.cachedState().get(0L).batchHasOngoingStateTransition()); + + // Return futures which will be completed later, so the batch state has ongoing transition. + CompletableFuture<WriteShareGroupStateResult> future1 = new CompletableFuture<>(); + CompletableFuture<WriteShareGroupStateResult> future2 = new CompletableFuture<>(); + + // Mocking the persister write state RPC to return future 1 when acknowledgement occurs for offsets 5-9. + // Mocking the persister write state RPC to return future 2 when acknowledgement occurs for offsets 20-24. Review Comment: Similar to above. ########## core/src/test/java/kafka/server/share/SharePartitionTest.java: ########## @@ -7296,6 +7296,177 @@ public void testFetchLockReleasedByDifferentId() { assertNull(sharePartition.fetchLock()); // Fetch lock has been released. } + @Test + public void testAcquireWhenBatchHasOngoingTransition() { + Persister persister = Mockito.mock(Persister.class); + + SharePartition sharePartition = SharePartitionBuilder.builder() + .withState(SharePartitionState.ACTIVE) + .withPersister(persister) + .build(); + // Acquire a single batch with member-1. + fetchAcquiredRecords( + sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 21, + fetchPartitionData(memoryRecords(10, 21)), FETCH_ISOLATION_HWM + ), 10 + ); + + // Validate that there is no ongoing transition. + assertFalse(sharePartition.cachedState().get(21L).batchHasOngoingStateTransition()); + // Return a future which will be completed later, so the batch state has ongoing transition. + CompletableFuture<WriteShareGroupStateResult> future = new CompletableFuture<>(); + Mockito.when(persister.writeState(Mockito.any())).thenReturn(future); + // Acknowledge batch to create ongoing transition. 
+ sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(21, 30, List.of(AcknowledgeType.RELEASE.id)))); + + // Assert the start offset has not moved and batch has ongoing transition. + assertEquals(21L, sharePartition.startOffset()); + assertEquals(1, sharePartition.cachedState().size()); + assertTrue(sharePartition.cachedState().get(21L).batchHasOngoingStateTransition()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(21L).batchState()); + assertEquals(EMPTY_MEMBER_ID, sharePartition.cachedState().get(21L).batchMemberId()); + + // Acquire the same batch with member-2. This function call will return with 0 records since there is an ongoing + // transition for this batch. + fetchAcquiredRecords( + sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 21, + fetchPartitionData(memoryRecords(10, 21)), FETCH_ISOLATION_HWM + ), 0 + ); + + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(21L).batchState()); + assertEquals(EMPTY_MEMBER_ID, sharePartition.cachedState().get(21L).batchMemberId()); + + // Complete the future so acknowledge API can be completed, which updates the cache. Now the records can be acquired. + WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class); + Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( + PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message()))))); + future.complete(writeShareGroupStateResult); + + // Acquire the same batch with member-2. 10 records will be acquired. 
+ fetchAcquiredRecords( + sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 21, + fetchPartitionData(memoryRecords(10, 21)), FETCH_ISOLATION_HWM + ), 10 + ); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(21L).batchState()); + assertEquals("member-2", sharePartition.cachedState().get(21L).batchMemberId()); + } + + @Test + public void testNextFetchOffsetWhenBatchHasOngoingTransition() { + Persister persister = Mockito.mock(Persister.class); + + SharePartition sharePartition = SharePartitionBuilder.builder() + .withState(SharePartitionState.ACTIVE) + .withPersister(persister) + .build(); + + // Acquire a single batch 0-9 with member-1. + fetchAcquiredRecords( + sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 0, + fetchPartitionData(memoryRecords(10, 0)), FETCH_ISOLATION_HWM + ), 10 + ); + + // Acquire a single batch 10-19 with member-1. + fetchAcquiredRecords( + sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 10, + fetchPartitionData(memoryRecords(10, 10)), FETCH_ISOLATION_HWM + ), 10 + ); + + // Validate that there is no ongoing transition. + assertEquals(2, sharePartition.cachedState().size()); + assertFalse(sharePartition.cachedState().get(0L).batchHasOngoingStateTransition()); + assertFalse(sharePartition.cachedState().get(10L).batchHasOngoingStateTransition()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(0L).batchState()); + assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(10L).batchState()); + + // Return futures which will be completed later, so the batch state has ongoing transition. + CompletableFuture<WriteShareGroupStateResult> future1 = new CompletableFuture<>(); + CompletableFuture<WriteShareGroupStateResult> future2 = new CompletableFuture<>(); + + // Mocking the persister write state RPC to return future 1 when acknowledgement occurs for offsets 0-9. 
+ // Mocking the persister write state RPC to return future 2 when acknowledgement occurs for offsets 10-19. Review Comment: ```suggestion // Mocking the persister write state RPC to return future 1 and future 2 when acknowledgement occurs for offsets 0-9 and 10-19 respectively. ``` ########## core/src/main/java/kafka/server/share/SharePartition.java: ########## @@ -2088,6 +2088,9 @@ void rollbackOrProcessStateUpdates( state.completeStateTransition(true); // Cancel the acquisition lock timeout task for the state since it is acknowledged/released successfully. state.cancelAndClearAcquisitionLockTimeoutTask(); + if (state.state != RecordState.ARCHIVED) { + findNextFetchOffset.set(true); + } Review Comment: This change here makes sense, but do we need to remove `findNextFetchOffset.set(true);` at other places where we have only just started the state transition, e.g. in acknowledgement? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org