apoorvmittal10 commented on code in PR #20124: URL: https://github.com/apache/kafka/pull/20124#discussion_r2201148519
########## core/src/test/java/kafka/server/share/SharePartitionTest.java: ########## @@ -7467,6 +7468,164 @@ public void testNextFetchOffsetWhenOffsetsHaveOngoingTransition() { assertEquals(20, sharePartition.nextFetchOffset()); } + @Test + public void testLsoMovementWithWriteStateRPCFailuresInAck() { + Persister persister = Mockito.mock(Persister.class); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withState(SharePartitionState.ACTIVE) + .withPersister(persister) + .build(); + + fetchAcquiredRecords(sharePartition, memoryRecords(5, 2), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(5, 7), 5); + + // Validate that there is no ongoing transition. + assertFalse(sharePartition.cachedState().get(2L).batchHasOngoingStateTransition()); + assertFalse(sharePartition.cachedState().get(7L).batchHasOngoingStateTransition()); + + // Return futures which will be completed later, so the batch state has ongoing transition. + CompletableFuture<WriteShareGroupStateResult> future1 = new CompletableFuture<>(); + CompletableFuture<WriteShareGroupStateResult> future2 = new CompletableFuture<>(); + + // Mocking the persister write state RPC to return future 1 and future 2 when acknowledgement occurs for + // offsets 2-6 and 7-11 respectively. + Mockito.when(persister.writeState(Mockito.any())).thenReturn(future1).thenReturn(future2); + + // Acknowledge batch to create ongoing transition. + sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(2, 6, List.of(AcknowledgeType.RELEASE.id)))); + sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(7, 11, List.of(AcknowledgeType.RELEASE.id)))); + + // Validate that there is no ongoing transition. + assertTrue(sharePartition.cachedState().get(2L).batchHasOngoingStateTransition()); + assertTrue(sharePartition.cachedState().get(7L).batchHasOngoingStateTransition()); + + // LSO is at 9. + sharePartition.updateCacheAndOffsets(9); + + // Start offset will be moved. 
+ assertEquals(9, sharePartition.nextFetchOffset()); + assertEquals(9, sharePartition.startOffset()); + assertEquals(11, sharePartition.endOffset()); + assertEquals(2, sharePartition.cachedState().size()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(2L).batchState()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(7L).offsetState().get(7L).state()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(7L).offsetState().get(8L).state()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(7L).offsetState().get(9L).state()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(7L).offsetState().get(10L).state()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(7L).offsetState().get(11L).state()); + + // Complete future1 exceptionally so acknowledgement for 2-6 offsets will be completed. + WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class); + Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( + PartitionFactory.newPartitionErrorData(0, Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message()))))); + future1.complete(writeShareGroupStateResult); + + // The completion of future1 with exception should not impact the cached state since those records have already + // been archived. 
+ assertEquals(9, sharePartition.nextFetchOffset()); + assertEquals(9, sharePartition.startOffset()); + assertEquals(11, sharePartition.endOffset()); + assertEquals(2, sharePartition.cachedState().size()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(2L).batchState()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(7L).offsetState().get(7L).state()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(7L).offsetState().get(8L).state()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(7L).offsetState().get(9L).state()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(7L).offsetState().get(10L).state()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(7L).offsetState().get(11L).state()); + } + + @Test + public void inFlightStateRollbackAndArchiveStateTransition() throws InterruptedException { + InFlightState inFlightState = new InFlightState(RecordState.ACQUIRED, 1, MEMBER_ID); + + inFlightState.startStateTransition(RecordState.ACKNOWLEDGED, SharePartition.DeliveryCountOps.INCREASE, MAX_DELIVERY_COUNT, MEMBER_ID); + assertTrue(inFlightState.hasOngoingStateTransition()); + + // We have an ongoing state transition from ACQUIRED to ACKNOWLEDGED which is not committed yet. At the same + // time when we have a call to completeStateTransition with false commit value, we get a call to ARCHIVE the record. + // No matter the order of the 2 calls, we should always be getting the final state as ARCHIVED. + ExecutorService executorService = Executors.newFixedThreadPool(2); + try { + List<Callable<Void>> callables = List.of( + () -> { + inFlightState.archive("member-2"); + return null; + }, + () -> { + inFlightState.completeStateTransition(false); + return null; + } Review Comment: We should also add a test case for when the commit succeeds. It would be good to have, so that future refactoring does not introduce new issues.
########## core/src/test/java/kafka/server/share/SharePartitionTest.java: ########## @@ -7467,6 +7468,164 @@ public void testNextFetchOffsetWhenOffsetsHaveOngoingTransition() { assertEquals(20, sharePartition.nextFetchOffset()); } + @Test + public void testLsoMovementWithWriteStateRPCFailuresInAck() { Review Comment: ```suggestion public void testLsoMovementWithWriteStateRPCFailuresInAcknowledgement() { ``` ########## core/src/test/java/kafka/server/share/SharePartitionTest.java: ########## @@ -7467,6 +7468,164 @@ public void testNextFetchOffsetWhenOffsetsHaveOngoingTransition() { assertEquals(20, sharePartition.nextFetchOffset()); } + @Test + public void testLsoMovementWithWriteStateRPCFailuresInAck() { + Persister persister = Mockito.mock(Persister.class); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withState(SharePartitionState.ACTIVE) + .withPersister(persister) + .build(); + + fetchAcquiredRecords(sharePartition, memoryRecords(5, 2), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(5, 7), 5); + + // Validate that there is no ongoing transition. + assertFalse(sharePartition.cachedState().get(2L).batchHasOngoingStateTransition()); + assertFalse(sharePartition.cachedState().get(7L).batchHasOngoingStateTransition()); + + // Return futures which will be completed later, so the batch state has ongoing transition. + CompletableFuture<WriteShareGroupStateResult> future1 = new CompletableFuture<>(); + CompletableFuture<WriteShareGroupStateResult> future2 = new CompletableFuture<>(); + + // Mocking the persister write state RPC to return future 1 and future 2 when acknowledgement occurs for + // offsets 2-6 and 7-11 respectively. + Mockito.when(persister.writeState(Mockito.any())).thenReturn(future1).thenReturn(future2); + + // Acknowledge batch to create ongoing transition. 
+ sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(2, 6, List.of(AcknowledgeType.RELEASE.id)))); + sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(7, 11, List.of(AcknowledgeType.RELEASE.id)))); + + // Validate that there is no ongoing transition. + assertTrue(sharePartition.cachedState().get(2L).batchHasOngoingStateTransition()); + assertTrue(sharePartition.cachedState().get(7L).batchHasOngoingStateTransition()); + + // LSO is at 9. Review Comment: ```suggestion // Move LSO to 9, so some records/offsets can be marked archived. ``` ########## core/src/test/java/kafka/server/share/SharePartitionTest.java: ########## @@ -7467,6 +7468,164 @@ public void testNextFetchOffsetWhenOffsetsHaveOngoingTransition() { assertEquals(20, sharePartition.nextFetchOffset()); } + @Test + public void testLsoMovementWithWriteStateRPCFailuresInAck() { + Persister persister = Mockito.mock(Persister.class); + SharePartition sharePartition = SharePartitionBuilder.builder() + .withState(SharePartitionState.ACTIVE) + .withPersister(persister) + .build(); + + fetchAcquiredRecords(sharePartition, memoryRecords(5, 2), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(5, 7), 5); + + // Validate that there is no ongoing transition. + assertFalse(sharePartition.cachedState().get(2L).batchHasOngoingStateTransition()); + assertFalse(sharePartition.cachedState().get(7L).batchHasOngoingStateTransition()); + + // Return futures which will be completed later, so the batch state has ongoing transition. + CompletableFuture<WriteShareGroupStateResult> future1 = new CompletableFuture<>(); + CompletableFuture<WriteShareGroupStateResult> future2 = new CompletableFuture<>(); + + // Mocking the persister write state RPC to return future 1 and future 2 when acknowledgement occurs for + // offsets 2-6 and 7-11 respectively. 
+ Mockito.when(persister.writeState(Mockito.any())).thenReturn(future1).thenReturn(future2); + + // Acknowledge batch to create ongoing transition. + sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(2, 6, List.of(AcknowledgeType.RELEASE.id)))); + sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(7, 11, List.of(AcknowledgeType.RELEASE.id)))); + + // Validate that there is no ongoing transition. + assertTrue(sharePartition.cachedState().get(2L).batchHasOngoingStateTransition()); + assertTrue(sharePartition.cachedState().get(7L).batchHasOngoingStateTransition()); + + // LSO is at 9. + sharePartition.updateCacheAndOffsets(9); + + // Start offset will be moved. + assertEquals(9, sharePartition.nextFetchOffset()); + assertEquals(9, sharePartition.startOffset()); + assertEquals(11, sharePartition.endOffset()); + assertEquals(2, sharePartition.cachedState().size()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(2L).batchState()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(7L).offsetState().get(7L).state()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(7L).offsetState().get(8L).state()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(7L).offsetState().get(9L).state()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(7L).offsetState().get(10L).state()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(7L).offsetState().get(11L).state()); + + // Complete future1 exceptionally so acknowledgement for 2-6 offsets will be completed. 
+ WriteShareGroupStateResult writeShareGroupStateResult = Mockito.mock(WriteShareGroupStateResult.class); + Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of( + new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of( + PartitionFactory.newPartitionErrorData(0, Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message()))))); + future1.complete(writeShareGroupStateResult); + + // The completion of future1 with exception should not impact the cached state since those records have already + // been archived. + assertEquals(9, sharePartition.nextFetchOffset()); + assertEquals(9, sharePartition.startOffset()); + assertEquals(11, sharePartition.endOffset()); + assertEquals(2, sharePartition.cachedState().size()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(2L).batchState()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(7L).offsetState().get(7L).state()); + assertEquals(RecordState.ARCHIVED, sharePartition.cachedState().get(7L).offsetState().get(8L).state()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(7L).offsetState().get(9L).state()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(7L).offsetState().get(10L).state()); + assertEquals(RecordState.AVAILABLE, sharePartition.cachedState().get(7L).offsetState().get(11L).state()); Review Comment: These are not actually being tested, because future2 is never completed. Is that intended? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org