This is an automated email from the ASF dual-hosted git repository.

mittal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 05d71ad1a8c KAFKA-19476: Concurrent execution fixes for lock timeout 
and lso movement (#20286)
05d71ad1a8c is described below

commit 05d71ad1a8cda4bdaa776be0506cb279ec72b1be
Author: Apoorv Mittal <[email protected]>
AuthorDate: Fri Aug 1 23:20:25 2025 +0100

    KAFKA-19476: Concurrent execution fixes for lock timeout and lso movement 
(#20286)
    
    This PR fixes the following:
    
    1. When a share partition reaches a state which should be treated as the
    final state of a batch/offset (for example, LSO movement which causes the
    offset/batch to be ARCHIVED permanently), the result of pending write
    state RPCs for that offset/batch can override the ARCHIVED state. Hence
    track such updates and apply them when the transition is completed.
    
    2. If an acquisition lock timeout occurs while an offset/batch is
    undergoing a transition, and the write state RPC then fails, the
    respective batch/offset can land in a scenario where the offset stays
    in the ACQUIRED state with no acquisition lock timeout task.
    
    3. If a timer task is cancelled, but the timer task and an
    acknowledgement execute concurrently, there can be a scenario where the
    timer task is still processed after cancellation. Hence it can mark the
    offset/batch re-available despite it already being acknowledged.
    
    Reviewers: Andrew Schofield <[email protected]>, Abhinav Dixit
     <[email protected]>
---
 .../java/kafka/server/share/SharePartition.java    |  66 +++--
 .../kafka/server/share/SharePartitionTest.java     | 305 +++++++++++++++++++++
 .../share/fetch/AcquisitionLockTimeoutHandler.java |   4 +-
 .../share/fetch/AcquisitionLockTimerTask.java      |  13 +-
 .../kafka/server/share/fetch/InFlightBatch.java    |   8 +-
 .../kafka/server/share/fetch/InFlightState.java    |  71 +++--
 6 files changed, 425 insertions(+), 42 deletions(-)

diff --git a/core/src/main/java/kafka/server/share/SharePartition.java 
b/core/src/main/java/kafka/server/share/SharePartition.java
index dbd7e5e1730..db9b862839c 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -241,6 +241,12 @@ public class SharePartition {
      */
     private final AcquisitionLockTimeoutHandler timeoutHandler;
 
+    /**
+     * The replica manager is used to check to see if any delayed share fetch 
request can be completed because of data
+     * availability due to acquisition lock timeout.
+     */
+    private final ReplicaManager replicaManager;
+
     /**
      * The share partition start offset specifies the partition start offset 
from which the records
      * are cached in the cachedState of the sharePartition.
@@ -295,12 +301,6 @@ public class SharePartition {
      */
     private long fetchLockIdleDurationMs;
 
-    /**
-     * The replica manager is used to check to see if any delayed share fetch 
request can be completed because of data
-     * availability due to acquisition lock timeout.
-     */
-    private final ReplicaManager replicaManager;
-
     SharePartition(
         String groupId,
         TopicIdPartition topicIdPartition,
@@ -1245,10 +1245,7 @@ public class SharePartition {
                     continue;
                 }
 
-                offsetState.getValue().archive(EMPTY_MEMBER_ID);
-                if (initialState == RecordState.ACQUIRED) {
-                    
offsetState.getValue().cancelAndClearAcquisitionLockTimeoutTask();
-                }
+                offsetState.getValue().archive();
                 isAnyOffsetArchived = true;
             }
             return isAnyOffsetArchived;
@@ -1263,10 +1260,7 @@ public class SharePartition {
             log.trace("Archiving complete batch: {} for the share partition: 
{}-{}", inFlightBatch, groupId, topicIdPartition);
             if (inFlightBatch.batchState() == initialState) {
                 // Change the state of complete batch since the same state 
exists for the entire inFlight batch.
-                inFlightBatch.archiveBatch(EMPTY_MEMBER_ID);
-                if (initialState == RecordState.ACQUIRED) {
-                    inFlightBatch.cancelAndClearAcquisitionLockTimeoutTask();
-                }
+                inFlightBatch.archiveBatch();
                 return true;
             }
         } finally {
@@ -1799,6 +1793,12 @@ public class SharePartition {
                     if (throwable.isPresent()) {
                         return throwable;
                     }
+
+                    if (inFlightBatch.batchHasOngoingStateTransition()) {
+                        log.debug("The batch has on-going transition, batch: 
{} for the share "
+                            + "partition: {}-{}", inFlightBatch, groupId, 
topicIdPartition);
+                        return Optional.of(new 
InvalidRecordStateException("The record state is invalid. The acknowledgement 
of delivery could not be completed."));
+                    }
                 }
 
                 // Determine if the in-flight batch is a full match from the 
request batch.
@@ -1899,7 +1899,15 @@ public class SharePartition {
                             + " partition: {}-{}", offsetState.getKey(), 
inFlightBatch, groupId,
                         topicIdPartition);
                     return Optional.of(new InvalidRecordStateException(
-                        "The batch cannot be acknowledged. The offset is not 
acquired."));
+                        "The offset cannot be acknowledged. The offset is not 
acquired."));
+                }
+
+                if (offsetState.getValue().hasOngoingStateTransition()) {
+                    log.debug("The offset has on-going transition, offset: {} 
batch: {} for the share"
+                            + " partition: {}-{}", offsetState.getKey(), 
inFlightBatch, groupId,
+                        topicIdPartition);
+                    return Optional.of(new InvalidRecordStateException(
+                        "The record state is invalid. The acknowledgement of 
delivery could not be completed."));
                 }
 
                 // Check if member id is the owner of the offset.
@@ -2044,7 +2052,12 @@ public class SharePartition {
                 // Log in DEBUG to avoid flooding of logs for a faulty client.
                 log.debug("Request failed for updating state, rollback any 
changed state"
                     + " for the share partition: {}-{}", groupId, 
topicIdPartition);
-                updatedStates.forEach(state -> 
state.completeStateTransition(false));
+                updatedStates.forEach(state -> {
+                    state.completeStateTransition(false);
+                    if (state.state() == RecordState.AVAILABLE) {
+                        updateFindNextFetchOffset(true);
+                    }
+                });
                 future.completeExceptionally(throwable);
                 return;
             }
@@ -2067,7 +2080,14 @@ public class SharePartition {
                 if (exception != null) {
                     log.debug("Failed to write state to persister for the 
share partition: {}-{}",
                         groupId, topicIdPartition, exception);
-                    updatedStates.forEach(state -> 
state.completeStateTransition(false));
+                    // In case of failure when transition state is rolled back 
then it should be rolled
+                    // back to ACQUIRED state, unless acquisition lock for the 
state has expired.
+                    updatedStates.forEach(state -> {
+                        state.completeStateTransition(false);
+                        if (state.state() == RecordState.AVAILABLE) {
+                            updateFindNextFetchOffset(true);
+                        }
+                    });
                     future.completeExceptionally(exception);
                     return;
                 }
@@ -2076,8 +2096,6 @@ public class SharePartition {
                     groupId, topicIdPartition);
                 updatedStates.forEach(state -> {
                     state.completeStateTransition(true);
-                    // Cancel the acquisition lock timeout task for the state 
since it is acknowledged/released successfully.
-                    state.cancelAndClearAcquisitionLockTimeoutTask();
                     if (state.state() == RecordState.AVAILABLE) {
                         updateFindNextFetchOffset(true);
                     }
@@ -2389,10 +2407,18 @@ public class SharePartition {
     }
 
     private AcquisitionLockTimeoutHandler releaseAcquisitionLockOnTimeout() {
-        return (memberId, firstOffset, lastOffset) -> {
+        return (memberId, firstOffset, lastOffset, timerTask) -> {
             List<PersisterStateBatch> stateBatches;
             lock.writeLock().lock();
             try {
+                // Check if timer task is already cancelled. This can happen 
when concurrent requests
+                // happen to acknowledge in-flight state and timeout handler 
is waiting for the lock
+                // but already cancelled.
+                if (timerTask.isCancelled()) {
+                    log.debug("Timer task is already cancelled, not executing 
further.");
+                    return;
+                }
+
                 Map.Entry<Long, InFlightBatch> floorOffset = 
cachedState.floorEntry(firstOffset);
                 if (floorOffset == null) {
                     log.error("Base offset {} not found for share partition: 
{}-{}", firstOffset, groupId, topicIdPartition);
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java 
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index c17ce391b37..1289d720054 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -55,6 +55,7 @@ import 
org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
 import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
 import org.apache.kafka.server.share.fetch.AcquisitionLockTimerTask;
 import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
+import org.apache.kafka.server.share.fetch.DeliveryCountOps;
 import org.apache.kafka.server.share.fetch.InFlightState;
 import org.apache.kafka.server.share.fetch.RecordState;
 import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
@@ -71,6 +72,7 @@ import org.apache.kafka.server.storage.log.FetchPartitionData;
 import org.apache.kafka.server.util.FutureUtils;
 import org.apache.kafka.server.util.timer.MockTimer;
 import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
 import org.apache.kafka.storage.internals.log.OffsetResultHolder;
 import org.apache.kafka.test.TestUtils;
 
@@ -88,6 +90,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.OptionalLong;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -7573,6 +7576,308 @@ public class SharePartitionTest {
         assertEquals(20, sharePartition.nextFetchOffset());
     }
 
+    @Test
+    public void testAcquisitionLockTimeoutWithConcurrentAcknowledgement() 
throws InterruptedException {
+        Persister persister = Mockito.mock(Persister.class);
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
+            .withPersister(persister)
+            .build();
+
+        // Create 2 batches of records.
+        ByteBuffer buffer = ByteBuffer.allocate(4096);
+        memoryRecordsBuilder(buffer, 5, 0).close();
+        memoryRecordsBuilder(buffer, 15, 5).close();
+
+        buffer.flip();
+
+        MemoryRecords records = MemoryRecords.readableRecords(buffer);
+        // Acquire 10 records.
+        fetchAcquiredRecords(sharePartition.acquire(
+              MEMBER_ID,
+              5, /* Batch size of 5 so cache can have 2 entries */
+              10,
+              DEFAULT_FETCH_OFFSET,
+              fetchPartitionData(records, 0),
+              FETCH_ISOLATION_HWM),
+            20);
+
+        assertEquals(2, sharePartition.cachedState().size());
+        assertEquals(2, sharePartition.timer().size());
+
+        // Return 2 future which will be completed later.
+        CompletableFuture<WriteShareGroupStateResult> future1 = new 
CompletableFuture<>();
+        CompletableFuture<WriteShareGroupStateResult> future2 = new 
CompletableFuture<>();
+        
Mockito.when(persister.writeState(Mockito.any())).thenReturn(future1).thenReturn(future2);
+
+        // Store the corresponding batch timer tasks.
+        TimerTask timerTask1 = 
sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask();
+        TimerTask timerTask2 = 
sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask();
+
+        // Acknowledge 1 offset in first batch as Accept to create offset 
tracking, accept complete
+        // second batch. And mark offset 0 as release so cached state does not 
move ahead.
+        sharePartition.acknowledge(MEMBER_ID, List.of(
+            new ShareAcknowledgementBatch(0, 0, 
List.of(AcknowledgeType.RELEASE.id)),
+            new ShareAcknowledgementBatch(1, 1, 
List.of(AcknowledgeType.ACCEPT.id)),
+            new ShareAcknowledgementBatch(5, 19, 
List.of(AcknowledgeType.ACCEPT.id))));
+
+        // Assert the start offset has not moved.
+        assertEquals(0L, sharePartition.startOffset());
+        assertEquals(2, sharePartition.cachedState().size());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(0L).offsetState().get(0L).state());
+        assertEquals(RecordState.ACKNOWLEDGED, 
sharePartition.cachedState().get(0L).offsetState().get(1L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(0L).offsetState().get(2L).state());
+        assertEquals(RecordState.ACKNOWLEDGED, 
sharePartition.cachedState().get(5L).batchState());
+        // Verify ongoing transition states.
+        
assertTrue(sharePartition.cachedState().get(0L).offsetState().get(0L).hasOngoingStateTransition());
+        
assertTrue(sharePartition.cachedState().get(0L).offsetState().get(1L).hasOngoingStateTransition());
+        
assertFalse(sharePartition.cachedState().get(0L).offsetState().get(2L).hasOngoingStateTransition());
+        
assertTrue(sharePartition.cachedState().get(5L).batchHasOngoingStateTransition());
+
+        // Validate first timer task is already cancelled.
+        assertTrue(timerTask1.isCancelled());
+        assertFalse(timerTask2.isCancelled());
+
+        // Fetch offset state timer tasks.
+        TimerTask timerTaskOffsetState1 = 
sharePartition.cachedState().get(0L).offsetState().get(0L).acquisitionLockTimeoutTask();
+        TimerTask timerTaskOffsetState2 = 
sharePartition.cachedState().get(0L).offsetState().get(1L).acquisitionLockTimeoutTask();
+        TimerTask timerTaskOffsetState3 = 
sharePartition.cachedState().get(0L).offsetState().get(2L).acquisitionLockTimeoutTask();
+
+        // Complete futures.
+        WriteShareGroupStateResult writeShareGroupStateResult = 
Mockito.mock(WriteShareGroupStateResult.class);
+        
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), 
Errors.NONE.message())))));
+        future1.complete(writeShareGroupStateResult);
+        future2.complete(writeShareGroupStateResult);
+
+        // Verify timer tasks are now cancelled, except unacknowledged offsets.
+        assertEquals(2, sharePartition.cachedState().size());
+        assertTrue(timerTask2.isCancelled());
+        assertTrue(timerTaskOffsetState1.isCancelled());
+        assertTrue(timerTaskOffsetState2.isCancelled());
+        assertFalse(timerTaskOffsetState3.isCancelled());
+
+        // Verify the state prior executing the timer tasks.
+        assertEquals(RecordState.ACKNOWLEDGED, 
sharePartition.cachedState().get(0L).offsetState().get(1L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(0L).offsetState().get(2L).state());
+        assertEquals(RecordState.ACKNOWLEDGED, 
sharePartition.cachedState().get(5L).batchState());
+
+        // Running expired timer tasks should not mark offsets available, 
except for offset 2.
+        timerTask1.run();
+        // State should remain same.
+        assertEquals(RecordState.ACKNOWLEDGED, 
sharePartition.cachedState().get(0L).offsetState().get(1L).state());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(0L).offsetState().get(2L).state());
+
+        timerTask2.run();
+        assertEquals(RecordState.ACKNOWLEDGED, 
sharePartition.cachedState().get(5L).batchState());
+
+        timerTaskOffsetState2.run();
+        assertEquals(RecordState.ACKNOWLEDGED, 
sharePartition.cachedState().get(0L).offsetState().get(1L).state());
+
+        // Should update the state to available as the timer task is not yet 
cancelled.
+        timerTaskOffsetState3.run();
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(0L).offsetState().get(2L).state());
+    }
+
+    @Test
+    public void testLsoMovementWithWriteStateRPCFailuresInAcknowledgement() {
+        Persister persister = Mockito.mock(Persister.class);
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .withPersister(persister)
+            .build();
+
+        fetchAcquiredRecords(sharePartition, memoryRecords(5, 2), 5);
+        fetchAcquiredRecords(sharePartition, memoryRecords(5, 7), 5);
+
+        // Validate that there is no ongoing transition.
+        
assertFalse(sharePartition.cachedState().get(2L).batchHasOngoingStateTransition());
+        
assertFalse(sharePartition.cachedState().get(7L).batchHasOngoingStateTransition());
+
+        // Return futures which will be completed later, so the batch state 
has ongoing transition.
+        CompletableFuture<WriteShareGroupStateResult> future1 = new 
CompletableFuture<>();
+        CompletableFuture<WriteShareGroupStateResult> future2 = new 
CompletableFuture<>();
+
+        // Mocking the persister write state RPC to return future 1 and future 
2 when acknowledgement occurs for
+        // offsets 2-6 and 7-11 respectively.
+        
Mockito.when(persister.writeState(Mockito.any())).thenReturn(future1).thenReturn(future2);
+
+        // Acknowledge batch to create ongoing transition.
+        sharePartition.acknowledge(MEMBER_ID, List.of(new 
ShareAcknowledgementBatch(2, 6, List.of(AcknowledgeType.RELEASE.id))));
+        sharePartition.acknowledge(MEMBER_ID, List.of(new 
ShareAcknowledgementBatch(7, 11, List.of(AcknowledgeType.RELEASE.id))));
+
+        // Validate that both batches now have an ongoing transition.
+        
assertTrue(sharePartition.cachedState().get(2L).batchHasOngoingStateTransition());
+        
assertTrue(sharePartition.cachedState().get(7L).batchHasOngoingStateTransition());
+
+        // Move LSO to 7, so some records/offsets can be marked archived for 
the first batch.
+        sharePartition.updateCacheAndOffsets(7L);
+
+        // Start offset will be moved.
+        assertEquals(12L, sharePartition.nextFetchOffset());
+        assertEquals(7L, sharePartition.startOffset());
+        assertEquals(11, sharePartition.endOffset());
+        assertEquals(2, sharePartition.cachedState().size());
+        
assertTrue(sharePartition.cachedState().get(2L).batchHasOngoingStateTransition());
+        
assertTrue(sharePartition.cachedState().get(7L).batchHasOngoingStateTransition());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(2L).batchState());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(7L).batchState());
+
+        // Complete future1 exceptionally so acknowledgement for 2-6 offsets 
will be completed.
+        WriteShareGroupStateResult writeShareGroupStateResult = 
Mockito.mock(WriteShareGroupStateResult.class);
+        
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionErrorData(0, 
Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message())))));
+        future1.complete(writeShareGroupStateResult);
+
+        // The completion of future1 with exception should not impact the 
cached state since those records have already
+        // been archived.
+        assertEquals(12, sharePartition.nextFetchOffset());
+        assertEquals(7, sharePartition.startOffset());
+        assertEquals(11, sharePartition.endOffset());
+        assertEquals(2, sharePartition.cachedState().size());
+        
assertFalse(sharePartition.cachedState().get(2L).batchHasOngoingStateTransition());
+        
assertTrue(sharePartition.cachedState().get(7L).batchHasOngoingStateTransition());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(2L).batchState());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(7L).batchState());
+
+        future2.complete(writeShareGroupStateResult);
+        assertEquals(12L, sharePartition.nextFetchOffset());
+        assertEquals(7, sharePartition.startOffset());
+        assertEquals(11, sharePartition.endOffset());
+        assertEquals(2, sharePartition.cachedState().size());
+        assertEquals(RecordState.ARCHIVED, 
sharePartition.cachedState().get(2L).batchState());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(7L).batchState());
+    }
+
+    @Test
+    public void inFlightStateRollbackAndArchiveStateTransition() throws 
InterruptedException {
+        InFlightState inFlightState = new InFlightState(RecordState.ACQUIRED, 
1, MEMBER_ID);
+
+        inFlightState.startStateTransition(RecordState.ACKNOWLEDGED, 
DeliveryCountOps.INCREASE, MAX_DELIVERY_COUNT, MEMBER_ID);
+        assertTrue(inFlightState.hasOngoingStateTransition());
+
+        // We have an ongoing state transition from ACQUIRED to ACKNOWLEDGED 
which is not committed yet. At the same
+        // time when we have a call to completeStateTransition with false 
commit value, we get a call to ARCHIVE the record.
+        // No matter the order of the 2 calls, we should always be getting the 
final state as ARCHIVED.
+        ExecutorService executorService = Executors.newFixedThreadPool(2);
+        try {
+            List<Callable<Void>> callables = List.of(
+                () -> {
+                    inFlightState.archive();
+                    return null;
+                },
+                () -> {
+                    inFlightState.completeStateTransition(false);
+                    return null;
+                }
+            );
+            executorService.invokeAll(callables);
+        } finally {
+            if (!executorService.awaitTermination(30, TimeUnit.MILLISECONDS))
+                executorService.shutdown();
+        }
+        assertEquals(RecordState.ARCHIVED, inFlightState.state());
+        assertEquals(EMPTY_MEMBER_ID, inFlightState.memberId());
+    }
+
+    @Test
+    public void inFlightStateCommitSuccessAndArchiveStateTransition() throws 
InterruptedException {
+        InFlightState inFlightState = new InFlightState(RecordState.ACQUIRED, 
1, MEMBER_ID);
+
+        inFlightState.startStateTransition(RecordState.ACKNOWLEDGED, 
DeliveryCountOps.INCREASE, MAX_DELIVERY_COUNT, MEMBER_ID);
+        assertTrue(inFlightState.hasOngoingStateTransition());
+
+        // We have an ongoing state transition from ACQUIRED to ACKNOWLEDGED 
which is not committed yet. At the same
+        // time when we have a call to completeStateTransition with true 
commit value, we get a call to ARCHIVE the record.
+        // No matter the order of the 2 calls, we should always be getting the 
final state as ARCHIVED.
+        ExecutorService executorService = Executors.newFixedThreadPool(2);
+        try {
+            List<Callable<Void>> callables = List.of(
+                () -> {
+                    inFlightState.archive();
+                    return null;
+                },
+                () -> {
+                    inFlightState.completeStateTransition(true);
+                    return null;
+                }
+            );
+            executorService.invokeAll(callables);
+        } finally {
+            if (!executorService.awaitTermination(30, TimeUnit.MILLISECONDS))
+                executorService.shutdown();
+        }
+        assertEquals(RecordState.ARCHIVED, inFlightState.state());
+        assertEquals(EMPTY_MEMBER_ID, inFlightState.memberId());
+    }
+
+    @Test
+    public void testAcquisitionLockTimeoutWithWriteStateRPCFailure() throws 
InterruptedException {
+        Persister persister = Mockito.mock(Persister.class);
+        SharePartition sharePartition = SharePartitionBuilder.builder()
+            .withState(SharePartitionState.ACTIVE)
+            .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
+            .withPersister(persister)
+            .build();
+
+        fetchAcquiredRecords(
+            sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 0,
+                fetchPartitionData(memoryRecords(2, 0)), FETCH_ISOLATION_HWM
+            ), 2
+        );
+
+        
assertNotNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask());
+        assertEquals(1, sharePartition.timer().size());
+        assertEquals(RecordState.ACQUIRED, 
sharePartition.cachedState().get(0L).batchState());
+
+        // Return a future which will be completed later, so the batch state 
has ongoing transition.
+        CompletableFuture<WriteShareGroupStateResult> future = new 
CompletableFuture<>();
+        Mockito.when(persister.writeState(Mockito.any())).thenReturn(future);
+
+        // Acknowledge batch to create ongoing transition.
+        sharePartition.acknowledge(MEMBER_ID, List.of(new 
ShareAcknowledgementBatch(0, 1, List.of(AcknowledgeType.ACCEPT.id))));
+        // Assert the start offset has not moved and batch has ongoing 
transition.
+        assertEquals(0L, sharePartition.startOffset());
+        assertEquals(1, sharePartition.cachedState().size());
+        
assertTrue(sharePartition.cachedState().get(0L).batchHasOngoingStateTransition());
+        assertEquals(RecordState.ACKNOWLEDGED, 
sharePartition.cachedState().get(0L).batchState());
+        assertEquals(EMPTY_MEMBER_ID, 
sharePartition.cachedState().get(0L).batchMemberId());
+        // Timer task has not been expired yet.
+        
assertFalse(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask().hasExpired());
+
+        // Allowing acquisition lock to expire. This will not cause any change 
because the record is not in ACQUIRED state.
+        // This will remove the entry of the timer task from timer.
+        mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
+        TestUtils.waitForCondition(
+            () -> sharePartition.cachedState().get(0L).batchState() == 
RecordState.ACKNOWLEDGED &&
+                sharePartition.cachedState().get(0L).batchDeliveryCount() == 1 
&&
+                sharePartition.timer().size() == 0,
+            DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
+            () -> assertionFailedMessage(sharePartition, Map.of(0L, 
List.of())));
+
+        // Acquisition lock timeout task has run already and is not null.
+        
assertNotNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask());
+        // Timer task should be expired now.
+        
assertTrue(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask().hasExpired());
+
+        // Complete future exceptionally so acknowledgement for 0-1 offsets 
will be completed.
+        WriteShareGroupStateResult writeShareGroupStateResult = 
Mockito.mock(WriteShareGroupStateResult.class);
+        
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+            new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+                PartitionFactory.newPartitionErrorData(0, 
Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message())))));
+        future.complete(writeShareGroupStateResult);
+
+        // Even though write state RPC has failed and corresponding 
acquisition lock timeout task has expired,
+        // the record should not stuck in ACQUIRED state with no acquisition 
lock timeout task.
+        assertEquals(1, sharePartition.cachedState().size());
+        assertEquals(RecordState.AVAILABLE, 
sharePartition.cachedState().get(0L).batchState());
+        assertEquals(EMPTY_MEMBER_ID, 
sharePartition.cachedState().get(0L).batchMemberId());
+        
assertNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask());
+    }
+
     /**
      * This function produces transactional data of a given no. of records 
followed by a transactional marker (COMMIT/ABORT).
      */
diff --git 
a/server/src/main/java/org/apache/kafka/server/share/fetch/AcquisitionLockTimeoutHandler.java
 
b/server/src/main/java/org/apache/kafka/server/share/fetch/AcquisitionLockTimeoutHandler.java
index c83d7e537da..b5480f2e54d 100644
--- 
a/server/src/main/java/org/apache/kafka/server/share/fetch/AcquisitionLockTimeoutHandler.java
+++ 
b/server/src/main/java/org/apache/kafka/server/share/fetch/AcquisitionLockTimeoutHandler.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.server.share.fetch;
 
+import org.apache.kafka.server.util.timer.TimerTask;
+
 /**
  * AcquisitionLockTimeoutHandler is an interface that defines a handler for 
acquisition lock timeouts.
  * It is used to handle cases where the acquisition lock for a share partition 
times out.
@@ -29,6 +31,6 @@ public interface AcquisitionLockTimeoutHandler {
      * @param firstOffset the first offset
      * @param lastOffset the last offset
      */
-    void handle(String memberId, long firstOffset, long lastOffset);
+    void handle(String memberId, long firstOffset, long lastOffset, TimerTask 
timerTask);
 
 }
diff --git 
a/server/src/main/java/org/apache/kafka/server/share/fetch/AcquisitionLockTimerTask.java
 
b/server/src/main/java/org/apache/kafka/server/share/fetch/AcquisitionLockTimerTask.java
index 6796d24d374..2766412fa6e 100644
--- 
a/server/src/main/java/org/apache/kafka/server/share/fetch/AcquisitionLockTimerTask.java
+++ 
b/server/src/main/java/org/apache/kafka/server/share/fetch/AcquisitionLockTimerTask.java
@@ -32,6 +32,7 @@ public class AcquisitionLockTimerTask extends TimerTask {
     private final long lastOffset;
     private final AcquisitionLockTimeoutHandler timeoutHandler;
     private final SharePartitionMetrics sharePartitionMetrics;
+    private volatile boolean hasExpired;
 
     public AcquisitionLockTimerTask(
         Time time,
@@ -49,18 +50,28 @@ public class AcquisitionLockTimerTask extends TimerTask {
         this.lastOffset = lastOffset;
         this.timeoutHandler = timeoutHandler;
         this.sharePartitionMetrics = sharePartitionMetrics;
+        this.hasExpired = false;
     }
 
     public long expirationMs() {
         return expirationMs;
     }
 
+    public boolean hasExpired() {
+        return hasExpired;
+    }
+
     /**
      * The task is executed when the acquisition lock timeout is reached. The 
task releases the acquired records.
      */
     @Override
     public void run() {
+        // Mark the request as expired prior to executing the timeout. There 
might be concurrent execution
+        // of timeout task and failed acknowledgement which checks if the 
timeout task has expired.
+        // But only one shall update the state to available. The concurrent 
execution is protected by
+        // write lock on the state.
+        hasExpired = true;
         sharePartitionMetrics.recordAcquisitionLockTimeoutPerSec(lastOffset - 
firstOffset + 1);
-        timeoutHandler.handle(memberId, firstOffset, lastOffset);
+        timeoutHandler.handle(memberId, firstOffset, lastOffset, this);
     }
 }
diff --git 
a/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightBatch.java 
b/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightBatch.java
index c3e2d353328..df40b943d5a 100644
--- 
a/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightBatch.java
+++ 
b/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightBatch.java
@@ -25,6 +25,9 @@ import java.util.concurrent.ConcurrentSkipListMap;
 
 /**
  * The InFlightBatch maintains the in-memory state of the fetched records i.e. 
in-flight records.
+ * <p>
+ * This class is not thread-safe and caller should attain locks if concurrent 
updates on same batch
+ * are expected.
  */
 public class InFlightBatch {
     // The timer is used to schedule the acquisition lock timeout task for the 
batch.
@@ -147,11 +150,10 @@ public class InFlightBatch {
     /**
      * Archive the batch state. This is used to mark the batch as archived and 
no further updates
      * are allowed to the batch state.
-     * @param newMemberId The new member id for the records.
      * @throws IllegalStateException if the offset state is maintained and the 
batch state is not available.
      */
-    public void archiveBatch(String newMemberId) {
-        inFlightState().archive(newMemberId);
+    public void archiveBatch() {
+        inFlightState().archive();
     }
 
     /**
diff --git 
a/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightState.java 
b/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightState.java
index 97b2d55ed0b..d5831d74853 100644
--- 
a/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightState.java
+++ 
b/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightState.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.server.share.fetch;
 
+import org.apache.kafka.common.Uuid;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -25,11 +27,19 @@ import java.util.Objects;
  * The InFlightState is used to track the state and delivery count of a record 
that has been
  * fetched from the leader. The state of the record is used to determine if 
the record should
  * be re-deliver or if it can be acknowledged or archived.
+ * <p>
+ * This class is not thread-safe and the caller should attain locks if concurrent 
updates on the same state
+ * are expected.
  */
 public class InFlightState {
 
     private static final Logger log = 
LoggerFactory.getLogger(InFlightState.class);
 
+    /**
+     * Empty member id, used to indicate that a record is not acquired by any 
member.
+     */
+    private static final String EMPTY_MEMBER_ID = Uuid.ZERO_UUID.toString();
+
     // The state of the fetch batch records.
     private RecordState state;
     // The number of times the records has been delivered to the client.
@@ -41,6 +51,9 @@ public class InFlightState {
     private InFlightState rollbackState;
     // The timer task for the acquisition lock timeout.
     private AcquisitionLockTimerTask acquisitionLockTimeoutTask;
+    // The boolean determines if the record has achieved a terminal state of 
ARCHIVED from which it cannot transition
+    // to any other state. This could happen because of LSO movement etc.
+    private boolean isTerminalState = false;
 
     // Visible for testing.
     public InFlightState(RecordState state, int deliveryCount, String 
memberId) {
@@ -103,8 +116,10 @@ public class InFlightState {
      * and clear the reference to it.
      */
     public void cancelAndClearAcquisitionLockTimeoutTask() {
-        acquisitionLockTimeoutTask.cancel();
-        acquisitionLockTimeoutTask = null;
+        if (acquisitionLockTimeoutTask != null) {
+            acquisitionLockTimeoutTask.cancel();
+            acquisitionLockTimeoutTask = null;
+        }
     }
 
     /**
@@ -115,12 +130,9 @@ public class InFlightState {
      * @return true if there is an ongoing state transition, false otherwise.
      */
     public boolean hasOngoingStateTransition() {
-        if (rollbackState == null) {
-            // This case could occur when the batch/offset hasn't transitioned 
even once or the state transitions have
-            // been committed.
-            return false;
-        }
-        return rollbackState.state != null;
+        // If batch/offset hasn't transitioned even once or the state 
transitions have been
+        // committed then rollbackState should always be null.
+        return rollbackState != null;
     }
 
     /**
@@ -138,6 +150,17 @@ public class InFlightState {
      */
     public InFlightState tryUpdateState(RecordState newState, DeliveryCountOps 
ops, int maxDeliveryCount, String newMemberId) {
         try {
+            // If the state transition is in progress, the state should not be 
updated.
+            if (hasOngoingStateTransition()) {
+                // A misbehaving client can send multiple requests to update 
the same records hence
+                // do not proceed if the transition is already in progress. Do 
not log an error here
+                // as it might not be an error rather concurrent update of 
same state due to multiple
+                // requests. This ideally should not happen hence log in info 
level, if it happens
+                // frequently then it might be an issue which needs to be 
investigated.
+                log.info("{} has ongoing state transition, cannot update to: 
{}", this, newState);
+                return null;
+            }
+
             if (newState == RecordState.AVAILABLE && ops != 
DeliveryCountOps.DECREASE && deliveryCount >= maxDeliveryCount) {
                 newState = RecordState.ARCHIVED;
             }
@@ -149,7 +172,6 @@ public class InFlightState {
             return this;
         } catch (IllegalStateException e) {
             log.error("Failed to update state of the records", e);
-            rollbackState = null;
             return null;
         }
     }
@@ -159,9 +181,11 @@ public class InFlightState {
      * cancelling the acquisition lock timeout task.
      * This method is used to archive the record when it is no longer needed.
      */
-    public void archive(String newMemberId) {
+    public void archive() {
+        isTerminalState = true;
         state = RecordState.ARCHIVED;
-        memberId = newMemberId;
+        memberId = EMPTY_MEMBER_ID;
+        cancelAndClearAcquisitionLockTimeoutTask();
     }
 
     /**
@@ -178,8 +202,12 @@ public class InFlightState {
      *         helps update chaining.
      */
     public InFlightState startStateTransition(RecordState newState, 
DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
-        rollbackState = new InFlightState(state, deliveryCount, memberId, 
acquisitionLockTimeoutTask);
-        return tryUpdateState(newState, ops, maxDeliveryCount, newMemberId);
+        InFlightState currentState = new InFlightState(state, deliveryCount, 
memberId, acquisitionLockTimeoutTask);
+        InFlightState updatedState = tryUpdateState(newState, ops, 
maxDeliveryCount, newMemberId);
+        if (updatedState != null) {
+            rollbackState = currentState;
+        }
+        return updatedState;
     }
 
     /**
@@ -190,13 +218,22 @@ public class InFlightState {
      * @param commit If true, commits the state transition, otherwise rolls 
back.
      */
     public void completeStateTransition(boolean commit) {
-        if (commit) {
+        if (commit || isTerminalState) {
+            // Cancel the acquisition lock timeout task for the state since it 
is acknowledged/released successfully.
+            cancelAndClearAcquisitionLockTimeoutTask();
             rollbackState = null;
             return;
         }
-        state = rollbackState.state;
-        deliveryCount = rollbackState.deliveryCount;
-        memberId = rollbackState.memberId;
+        // Check if the acquisition lock timeout task has expired; if so, mark the 
message as Available.
+        if (acquisitionLockTimeoutTask != null && 
acquisitionLockTimeoutTask.hasExpired()) {
+            state = RecordState.AVAILABLE;
+            memberId = EMPTY_MEMBER_ID;
+            cancelAndClearAcquisitionLockTimeoutTask();
+        } else {
+            state = rollbackState.state;
+            memberId = rollbackState.memberId;
+        }
+        deliveryCount = rollbackState.deliveryCount();
         rollbackState = null;
     }
 


Reply via email to