This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 15206d5083b KAFKA-18084 Added usage for rollback state while 
SharePartition acquires records (#17965)
15206d5083b is described below

commit 15206d5083b18d7ff8afd360304dc17e90f7a35b
Author: Abhinav Dixit <[email protected]>
AuthorDate: Fri Dec 6 23:02:08 2024 +0530

    KAFKA-18084 Added usage for rollback state while SharePartition acquires 
records (#17965)
    
    Reviewers: Andrew Schofield <[email protected]>, Apoorv Mittal 
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../java/kafka/server/share/SharePartition.java    | 60 ++++++++++--------
 .../kafka/server/share/SharePartitionTest.java     | 72 ++++++++++++++++++++++
 .../server/ShareFetchAcknowledgeRequestTest.scala  | 59 ++++++++++++------
 3 files changed, 146 insertions(+), 45 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/SharePartition.java 
b/core/src/main/java/kafka/server/share/SharePartition.java
index 4c5e72d0432..00ec974a722 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -690,7 +690,7 @@ public class SharePartition {
                         // acquire subset of offsets from the in-flight batch 
but only if the
                         // complete batch is available yet. Hence, do a 
pre-check to avoid exploding
                         // the in-flight offset tracking unnecessarily.
-                        if (inFlightBatch.batchState() != 
RecordState.AVAILABLE) {
+                        if (inFlightBatch.batchState() != 
RecordState.AVAILABLE || inFlightBatch.batchHasOngoingStateTransition()) {
                             log.trace("The batch is not available to acquire 
in share partition: {}-{}, skipping: {}"
                                     + " skipping offset tracking for batch as 
well.", groupId,
                                 topicIdPartition, inFlightBatch);
@@ -709,7 +709,7 @@ public class SharePartition {
                 }
 
                 // The in-flight batch is a full match hence change the state 
of the complete batch.
-                if (inFlightBatch.batchState() != RecordState.AVAILABLE) {
+                if (inFlightBatch.batchState() != RecordState.AVAILABLE || 
inFlightBatch.batchHasOngoingStateTransition()) {
                     log.trace("The batch is not available to acquire in share 
partition: {}-{}, skipping: {}",
                         groupId, topicIdPartition, inFlightBatch);
                     continue;
@@ -1280,10 +1280,9 @@ public class SharePartition {
                     break;
                 }
 
-                if (offsetState.getValue().state != RecordState.AVAILABLE) {
-                    log.trace("The offset is not available skipping, offset: 
{} batch: {}"
-                            + " for the share partition: {}-{}", 
offsetState.getKey(), inFlightBatch,
-                        groupId, topicIdPartition);
+                if (offsetState.getValue().state != RecordState.AVAILABLE || 
offsetState.getValue().hasOngoingStateTransition()) {
+                    log.trace("The offset {} is not available in share 
partition: {}-{}, skipping: {}",
+                        offsetState.getKey(), groupId, topicIdPartition, 
inFlightBatch);
                     continue;
                 }
 
@@ -2066,8 +2065,8 @@ public class SharePartition {
             stateBatches.add(new 
PersisterStateBatch(inFlightBatch.firstOffset(), inFlightBatch.lastOffset(),
                     updateResult.state.id, (short) 
updateResult.deliveryCount));
 
-            // Update acquisition lock timeout task for the batch to null 
since it is completed now.
-            updateResult.updateAcquisitionLockTimeoutTask(null);
+            // Cancel the acquisition lock timeout task for the batch since it 
is completed now.
+            updateResult.cancelAndClearAcquisitionLockTimeoutTask();
             if (updateResult.state != RecordState.ARCHIVED) {
                 findNextFetchOffset.set(true);
             }
@@ -2113,8 +2112,8 @@ public class SharePartition {
             stateBatches.add(new PersisterStateBatch(offsetState.getKey(), 
offsetState.getKey(),
                     updateResult.state.id, (short) 
updateResult.deliveryCount));
 
-            // Update acquisition lock timeout task for the offset to null 
since it is completed now.
-            updateResult.updateAcquisitionLockTimeoutTask(null);
+            // Cancel the acquisition lock timeout task for the offset since 
it is completed now.
+            updateResult.cancelAndClearAcquisitionLockTimeoutTask();
             if (updateResult.state != RecordState.ARCHIVED) {
                 findNextFetchOffset.set(true);
             }
@@ -2251,10 +2250,7 @@ public class SharePartition {
 
         // Visible for testing.
         RecordState batchState() {
-            if (batchState == null) {
-                throw new IllegalStateException("The batch state is not 
available as the offset state is maintained");
-            }
-            return batchState.state;
+            return inFlightState().state;
         }
 
         // Visible for testing.
@@ -2275,10 +2271,7 @@ public class SharePartition {
 
         // Visible for testing.
         AcquisitionLockTimerTask batchAcquisitionLockTimeoutTask() {
-            if (batchState == null) {
-                throw new IllegalStateException("The batch state is not 
available as the offset state is maintained");
-            }
-            return batchState.acquisitionLockTimeoutTask;
+            return inFlightState().acquisitionLockTimeoutTask;
         }
 
         // Visible for testing.
@@ -2286,11 +2279,19 @@ public class SharePartition {
             return offsetState;
         }
 
-        private void archiveBatch(String newMemberId) {
+        private InFlightState inFlightState() {
             if (batchState == null) {
                 throw new IllegalStateException("The batch state is not 
available as the offset state is maintained");
             }
-            batchState.archive(newMemberId);
+            return batchState;
+        }
+
+        private boolean batchHasOngoingStateTransition() {
+            return inFlightState().hasOngoingStateTransition();
+        }
+
+        private void archiveBatch(String newMemberId) {
+            inFlightState().archive(newMemberId);
         }
 
         private InFlightState tryUpdateBatchState(RecordState newState, 
boolean incrementDeliveryCount, int maxDeliveryCount, String newMemberId) {
@@ -2335,10 +2336,7 @@ public class SharePartition {
         }
 
         private void updateAcquisitionLockTimeout(AcquisitionLockTimerTask 
acquisitionLockTimeoutTask) {
-            if (batchState == null) {
-                throw new IllegalStateException("The batch state is not 
available as the offset state is maintained");
-            }
-            batchState.acquisitionLockTimeoutTask = acquisitionLockTimeoutTask;
+            inFlightState().acquisitionLockTimeoutTask = 
acquisitionLockTimeoutTask;
         }
 
         @Override
@@ -2397,7 +2395,10 @@ public class SharePartition {
             return acquisitionLockTimeoutTask;
         }
 
-        void updateAcquisitionLockTimeoutTask(AcquisitionLockTimerTask 
acquisitionLockTimeoutTask) {
+        void updateAcquisitionLockTimeoutTask(AcquisitionLockTimerTask 
acquisitionLockTimeoutTask) throws IllegalArgumentException {
+            if (this.acquisitionLockTimeoutTask != null) {
+                throw new IllegalArgumentException("Existing acquisition lock 
timeout exists, cannot override.");
+            }
             this.acquisitionLockTimeoutTask = acquisitionLockTimeoutTask;
         }
 
@@ -2406,6 +2407,15 @@ public class SharePartition {
             acquisitionLockTimeoutTask = null;
         }
 
+        private boolean hasOngoingStateTransition() {
+            if (rollbackState == null) {
+                // This case could occur when the batch/offset hasn't 
transitioned even once or the state transitions have
+                // been committed.
+                return false;
+            }
+            return rollbackState.state != null;
+        }
+
         /**
          * Try to update the state of the records. The state of the records 
can only be updated if the
          * new state is allowed to be transitioned from old state. The 
delivery count is not incremented
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index fde84d82c4b..8d1bbc28232 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -5421,6 +5421,78 @@ public class SharePartitionTest {
         assertEquals(734, sharePartition.endOffset());
     }
 
+    @Test
+    public void testAcquireWithWriteShareGroupStateDelay() {
+        Persister persister = Mockito.mock(Persister.class);
+        mockPersisterReadStateMethod(persister);
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withPersister(persister)
+            .withState(SharePartitionState.ACTIVE)
+            .build();
+
+        // Mock persister writeState method so that 
sharePartition.isWriteShareGroupStateSuccessful() returns true only after a 
delay, i.e. once the write future is completed.
+        WriteShareGroupStateResult writeShareGroupStateResult = 
Mockito.mock(WriteShareGroupStateResult.class);
+        
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), 
Collections.singletonList(
+                PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), 
Errors.NONE.message())))));
+
+        CompletableFuture<WriteShareGroupStateResult> future = new 
CompletableFuture<>();
+        // persister.writeState RPC will not complete instantaneously due to 
which commit won't happen for acknowledged offsets.
+        Mockito.when(persister.writeState(Mockito.any())).thenReturn(future);
+
+        sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new 
FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(5, 0),
+            Optional.empty(), OptionalLong.empty(), Optional.empty(),
+            OptionalInt.empty(), false));
+
+        sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new 
FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(5, 5),
+            Optional.empty(), OptionalLong.empty(), Optional.empty(),
+            OptionalInt.empty(), false));
+
+        List<ShareAcknowledgementBatch> acknowledgementBatches = new 
ArrayList<>();
+        acknowledgementBatches.add(new ShareAcknowledgementBatch(2, 3, 
Collections.singletonList((byte) 2)));
+        acknowledgementBatches.add(new ShareAcknowledgementBatch(5, 9, 
Collections.singletonList((byte) 2)));
+        // Acknowledge 2-3, 5-9 offsets with RELEASE acknowledge type.
+        sharePartition.acknowledge(MEMBER_ID, acknowledgementBatches);
+
+        assertEquals(2, sharePartition.cachedState().size());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(0L).offsetState().get(0L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(0L).offsetState().get(1L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(0L).offsetState().get(2L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(0L).offsetState().get(3L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(0L).offsetState().get(4L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(5L).batchState());
+
+        // Even though offsets 2-3 and 5-9 are in the AVAILABLE state, they won't 
be acquired since they are still in transition from ACQUIRED
+        // to AVAILABLE: the write state RPC has not completed yet, so 
the commit hasn't happened.
+        sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new 
FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(15, 0),
+            Optional.empty(), OptionalLong.empty(), Optional.empty(),
+            OptionalInt.empty(), false));
+
+        assertEquals(3, sharePartition.cachedState().size());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(0L).offsetState().get(0L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(0L).offsetState().get(1L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(0L).offsetState().get(2L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(0L).offsetState().get(3L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(0L).offsetState().get(4L).state());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(5L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(10L).batchState());
+
+        // persister.writeState RPC will complete now. This is going to commit 
all the acknowledged batches. Hence, their
+        // rollback state will become null and they will be available to be 
acquired again.
+        future.complete(writeShareGroupStateResult);
+        sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new 
FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(15, 0),
+            Optional.empty(), OptionalLong.empty(), Optional.empty(),
+            OptionalInt.empty(), false));
+        assertEquals(3, sharePartition.cachedState().size());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(0L).offsetState().get(0L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(0L).offsetState().get(1L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(0L).offsetState().get(2L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(0L).offsetState().get(3L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(0L).offsetState().get(4L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(5L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(10L).batchState());
+    }
+
     private List<AcquiredRecords> fetchAcquiredRecords(ShareAcquiredRecords 
shareAcquiredRecords, int expectedOffsetCount) {
         assertNotNull(shareAcquiredRecords);
         assertEquals(expectedOffsetCount, shareAcquiredRecords.count());
diff --git 
a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
index d2679cd62ea..73f9fce42e6 100644
--- 
a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
@@ -835,38 +835,57 @@ class ShareFetchAcknowledgeRequestTest(cluster: 
ClusterInstance) extends GroupCo
       .setAcknowledgeErrorCode(Errors.NONE.code())
       
.setAcquiredRecords(expectedAcquiredRecords(Collections.singletonList(0), 
Collections.singletonList(9), Collections.singletonList(1)))
 
-    var fetchPartitionData = 
shareFetchResponseData.responses().get(0).partitions().get(0)
+    val fetchPartitionData = 
shareFetchResponseData.responses().get(0).partitions().get(0)
     compareFetchResponsePartitions(expectedFetchPartitionData, 
fetchPartitionData)
 
     // Producing 10 more records to the topic created above
     produceData(topicIdPartition, 10)
 
-    // Send a third Share Fetch request with piggybacked acknowledgements
-    shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
-    metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
-    acknowledgementsMapForFetch = Map(topicIdPartition -> List(new 
ShareFetchRequestData.AcknowledgementBatch()
-      .setFirstOffset(0)
-      .setLastOffset(9)
-      .setAcknowledgeTypes(Collections.singletonList(2.toByte))).asJava) // 
Release the records
-    shareFetchRequest = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
-    shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest)
-
-    shareFetchResponseData = shareFetchResponse.data()
-    assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
-    assertEquals(1, shareFetchResponseData.responses().size())
-    assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
-    assertEquals(1, 
shareFetchResponseData.responses().get(0).partitions().size())
-
     expectedFetchPartitionData = new ShareFetchResponseData.PartitionData()
       .setPartitionIndex(partition)
       .setErrorCode(Errors.NONE.code())
       .setAcknowledgeErrorCode(Errors.NONE.code())
       .setAcquiredRecords(expectedAcquiredRecords(List(0L, 10L).asJava, 
List(9L, 19L).asJava, List(2, 1).asJava))
+
+    val acquiredRecords : util.List[AcquiredRecords] = new 
util.ArrayList[AcquiredRecords]()
+    var releaseAcknowledgementSent = false
+
+    TestUtils.waitUntilTrue(() => {
+      shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
+      metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
+      if (releaseAcknowledgementSent) {
+        // For fourth share fetch request onwards
+        acknowledgementsMapForFetch = Map.empty
+      } else {
+        // Send a third Share Fetch request with piggybacked acknowledgements
+        acknowledgementsMapForFetch = Map(topicIdPartition -> List(new 
ShareFetchRequestData.AcknowledgementBatch()
+          .setFirstOffset(0)
+          .setLastOffset(9)
+          .setAcknowledgeTypes(Collections.singletonList(2.toByte))).asJava) 
// Release the records
+        releaseAcknowledgementSent = true
+      }
+      shareFetchRequest = createShareFetchRequest(groupId, metadata, 
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
+      shareFetchResponse = 
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+
+      shareFetchResponseData = shareFetchResponse.data()
+      assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+      assertEquals(1, shareFetchResponseData.responses().size())
+      assertEquals(topicId, 
shareFetchResponseData.responses().get(0).topicId())
+      val responseSize = 
shareFetchResponseData.responses().get(0).partitions().size()
+      if (responseSize > 0) {
+        
acquiredRecords.addAll(shareFetchResponseData.responses().get(0).partitions().get(0).acquiredRecords())
+      }
+      // There should be 2 acquired record batches finally -
+      // 1. batch containing 0-9 offsets which were initially acknowledged as 
RELEASED.
+      // 2. batch containing 10-19 offsets which were produced in the second 
produceData function call.
+      acquiredRecords.size() == 2
+
+    }, "Share fetch request failed", 5000)
+
     // All the records from offsets 0 to 19 will be fetched. Records from 0 to 
9 will have delivery count as 2 because
     // they are re delivered, and records from 10 to 19 will have delivery 
count as 1 because they are newly acquired
-
-    fetchPartitionData = 
shareFetchResponseData.responses().get(0).partitions().get(0)
-    compareFetchResponsePartitions(expectedFetchPartitionData, 
fetchPartitionData)
+    
assertTrue(expectedFetchPartitionData.acquiredRecords().containsAll(acquiredRecords)
 &&
+      
acquiredRecords.containsAll(expectedFetchPartitionData.acquiredRecords()))
   }
 
   @ClusterTests(

Reply via email to