adixitconfluent commented on code in PR #20124:
URL: https://github.com/apache/kafka/pull/20124#discussion_r2204740140


##########
core/src/test/java/kafka/server/share/SharePartitionTest.java:
##########
@@ -7467,6 +7468,164 @@ public void 
testNextFetchOffsetWhenOffsetsHaveOngoingTransition() {
         assertEquals(20, sharePartition.nextFetchOffset());
     }
 
+    @Test
+    public void testLsoMovementWithWriteStateRPCFailuresInAck() {
+        Persister persister = Mockito.mock(Persister.class);
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .withPersister(persister)
+            .build();
+
+        fetchAcquiredRecords(sharePartition, memoryRecords(5, 2), 5);
+        fetchAcquiredRecords(sharePartition, memoryRecords(5, 7), 5);
+
+        // Validate that there is no ongoing transition.
+        
assertFalse(sharePartition.cachedState().get(2L).batchHasOngoingStateTransition());
+        
assertFalse(sharePartition.cachedState().get(7L).batchHasOngoingStateTransition());
+
+        // Return futures which will be completed later, so the batch state 
has ongoing transition.
+        CompletableFuture<WriteShareGroupStateResult> future1 = new 
CompletableFuture<>();
+        CompletableFuture<WriteShareGroupStateResult> future2 = new 
CompletableFuture<>();
+
+        // Mocking the persister write state RPC to return future 1 and future 
2 when acknowledgement occurs for
+        // offsets 2-6 and 7-11 respectively.
+        
Mockito.when(persister.writeState(Mockito.any())).thenReturn(future1).thenReturn(future2);
+
+        // Acknowledge batch to create ongoing transition.
+        sharePartition.acknowledge(MEMBER_ID, List.of(new 
ShareAcknowledgementBatch(2, 6, List.of(AcknowledgeType.RELEASE.id))));
+        sharePartition.acknowledge(MEMBER_ID, List.of(new 
ShareAcknowledgementBatch(7, 11, List.of(AcknowledgeType.RELEASE.id))));
+
+        // Validate that there is an ongoing transition.
+        
assertTrue(sharePartition.cachedState().get(2L).batchHasOngoingStateTransition());
+        
assertTrue(sharePartition.cachedState().get(7L).batchHasOngoingStateTransition());
+
+        // LSO is at 9.
+        sharePartition.updateCacheAndOffsets(9);
+
+        // Start offset will be moved.
+        assertEquals(9, sharePartition.nextFetchOffset());
+        assertEquals(9, sharePartition.startOffset());
+        assertEquals(11, sharePartition.endOffset());
+        assertEquals(2, sharePartition.cachedState().size());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(2L).batchState());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(7L).offsetState().get(7L).state());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(7L).offsetState().get(8L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(7L).offsetState().get(9L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(7L).offsetState().get(10L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(7L).offsetState().get(11L).state());
+
+        // Complete future1 with an error response so acknowledgement for 2-6 offsets 
will be completed.
+        WriteShareGroupStateResult writeShareGroupStateResult = 
Mockito.mock(WriteShareGroupStateResult.class);
+        
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionErrorData(0, 
Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message())))));
+        future1.complete(writeShareGroupStateResult);
+
+        // The completion of future1 with an error response should not impact the 
cached state since those records have already
+        // been archived.
+        assertEquals(9, sharePartition.nextFetchOffset());
+        assertEquals(9, sharePartition.startOffset());
+        assertEquals(11, sharePartition.endOffset());
+        assertEquals(2, sharePartition.cachedState().size());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(2L).batchState());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(7L).offsetState().get(7L).state());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(7L).offsetState().get(8L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(7L).offsetState().get(9L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(7L).offsetState().get(10L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(7L).offsetState().get(11L).state());

Review Comment:
   Hi @apoorvmittal10, while writing the test as you mentioned, I encountered 
another problem in the code: during write state call processing, a batch 
can be split into individual offsets (for example, due to LSO movement). We 
should deal with that issue in a separate JIRA, 
https://issues.apache.org/jira/browse/KAFKA-19502, and update this test 
case once that issue is fixed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to