This is an automated email from the ASF dual-hosted git repository.
mittal pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 05d71ad1a8c KAFKA-19476: Concurrent execution fixes for lock timeout and LSO movement (#20286)
05d71ad1a8c is described below
commit 05d71ad1a8cda4bdaa776be0506cb279ec72b1be
Author: Apoorv Mittal <[email protected]>
AuthorDate: Fri Aug 1 23:20:25 2025 +0100
KAFKA-19476: Concurrent execution fixes for lock timeout and LSO movement (#20286)
The PR fixes the following:

1. When a share partition batch/offset reaches a state that should be treated
as the final state of that batch/offset (for example, LSO movement which
causes the offset/batch to be ARCHIVED permanently), the result of a pending
write state RPC for that offset/batch could override the ARCHIVED state.
Such updates are therefore tracked and applied only once the transition
completes (see the sketch after this list).

2. If an acquisition lock timeout occurs while an offset/batch is undergoing
a transition and the subsequent write state RPC fails, the respective
batch/offset could be left in ACQUIRED state with no acquisition lock
timeout task to ever release it.

3. If a timer task is cancelled but, due to concurrent execution of the timer
task and an acknowledgement, the timer task still runs after cancellation,
it could mark the offset/batch re-available even though it had already been
acknowledged.
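
An illustrative, self-contained sketch of the terminal-state guard behind
fix 1 (simplified stand-ins, not the actual InFlightState class; the real
method also restores the delivery count and handles an expired acquisition
lock):

    enum State { AVAILABLE, ACQUIRED, ACKNOWLEDGED, ARCHIVED }

    class InFlightStateSketch {
        private State state = State.ACQUIRED;
        private State rollback;       // non-null while a write state RPC is pending
        private boolean terminal;     // set when e.g. LSO movement archives the record

        void startTransition(State target) {
            rollback = state;
            state = target;
        }

        void archive() {
            terminal = true;
            state = State.ARCHIVED;
        }

        // Called when the pending write state RPC completes.
        void completeTransition(boolean committed) {
            if (committed || terminal) {
                // A terminal ARCHIVED state is never overridden by a late RPC result.
                rollback = null;
                return;
            }
            state = rollback;         // roll the failed transition back
            rollback = null;
        }
    }
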
Reviewers: Andrew Schofield <[email protected]>, Abhinav Dixit
<[email protected]>
---
.../java/kafka/server/share/SharePartition.java | 66 +++--
.../kafka/server/share/SharePartitionTest.java | 305 +++++++++++++++++++++
.../share/fetch/AcquisitionLockTimeoutHandler.java | 4 +-
.../share/fetch/AcquisitionLockTimerTask.java | 13 +-
.../kafka/server/share/fetch/InFlightBatch.java | 8 +-
.../kafka/server/share/fetch/InFlightState.java | 71 +++--
6 files changed, 425 insertions(+), 42 deletions(-)
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java
b/core/src/main/java/kafka/server/share/SharePartition.java
index dbd7e5e1730..db9b862839c 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -241,6 +241,12 @@ public class SharePartition {
*/
private final AcquisitionLockTimeoutHandler timeoutHandler;
+ /**
+ * The replica manager is used to check to see if any delayed share fetch
request can be completed because of data
+ * availability due to acquisition lock timeout.
+ */
+ private final ReplicaManager replicaManager;
+
/**
* The share partition start offset specifies the partition start offset
from which the records
* are cached in the cachedState of the sharePartition.
@@ -295,12 +301,6 @@ public class SharePartition {
*/
private long fetchLockIdleDurationMs;
- /**
- * The replica manager is used to check to see if any delayed share fetch
request can be completed because of data
- * availability due to acquisition lock timeout.
- */
- private final ReplicaManager replicaManager;
-
SharePartition(
String groupId,
TopicIdPartition topicIdPartition,
@@ -1245,10 +1245,7 @@ public class SharePartition {
continue;
}
- offsetState.getValue().archive(EMPTY_MEMBER_ID);
- if (initialState == RecordState.ACQUIRED) {
-
offsetState.getValue().cancelAndClearAcquisitionLockTimeoutTask();
- }
+ offsetState.getValue().archive();
isAnyOffsetArchived = true;
}
return isAnyOffsetArchived;
@@ -1263,10 +1260,7 @@ public class SharePartition {
log.trace("Archiving complete batch: {} for the share partition:
{}-{}", inFlightBatch, groupId, topicIdPartition);
if (inFlightBatch.batchState() == initialState) {
// Change the state of complete batch since the same state
exists for the entire inFlight batch.
- inFlightBatch.archiveBatch(EMPTY_MEMBER_ID);
- if (initialState == RecordState.ACQUIRED) {
- inFlightBatch.cancelAndClearAcquisitionLockTimeoutTask();
- }
+ inFlightBatch.archiveBatch();
return true;
}
} finally {
@@ -1799,6 +1793,12 @@ public class SharePartition {
if (throwable.isPresent()) {
return throwable;
}
+
+ if (inFlightBatch.batchHasOngoingStateTransition()) {
+ log.debug("The batch has on-going transition, batch:
{} for the share "
+ + "partition: {}-{}", inFlightBatch, groupId,
topicIdPartition);
+ return Optional.of(new
InvalidRecordStateException("The record state is invalid. The acknowledgement
of delivery could not be completed."));
+ }
}
// Determine if the in-flight batch is a full match from the
request batch.
@@ -1899,7 +1899,15 @@ public class SharePartition {
+ " partition: {}-{}", offsetState.getKey(),
inFlightBatch, groupId,
topicIdPartition);
return Optional.of(new InvalidRecordStateException(
- "The batch cannot be acknowledged. The offset is not
acquired."));
+ "The offset cannot be acknowledged. The offset is not
acquired."));
+ }
+
+ if (offsetState.getValue().hasOngoingStateTransition()) {
+ log.debug("The offset has on-going transition, offset: {}
batch: {} for the share"
+ + " partition: {}-{}", offsetState.getKey(),
inFlightBatch, groupId,
+ topicIdPartition);
+ return Optional.of(new InvalidRecordStateException(
+ "The record state is invalid. The acknowledgement of
delivery could not be completed."));
}
// Check if member id is the owner of the offset.
@@ -2044,7 +2052,12 @@ public class SharePartition {
// Log in DEBUG to avoid flooding of logs for a faulty client.
log.debug("Request failed for updating state, rollback any
changed state"
+ " for the share partition: {}-{}", groupId,
topicIdPartition);
- updatedStates.forEach(state ->
state.completeStateTransition(false));
+ updatedStates.forEach(state -> {
+ state.completeStateTransition(false);
+ if (state.state() == RecordState.AVAILABLE) {
+ updateFindNextFetchOffset(true);
+ }
+ });
future.completeExceptionally(throwable);
return;
}
@@ -2067,7 +2080,14 @@ public class SharePartition {
if (exception != null) {
log.debug("Failed to write state to persister for the
share partition: {}-{}",
groupId, topicIdPartition, exception);
- updatedStates.forEach(state ->
state.completeStateTransition(false));
+ // In case of failure, when the transition state is rolled back it should be rolled back to
+ // the ACQUIRED state, unless the acquisition lock for the state has expired.
+ updatedStates.forEach(state -> {
+ state.completeStateTransition(false);
+ if (state.state() == RecordState.AVAILABLE) {
+ updateFindNextFetchOffset(true);
+ }
+ });
future.completeExceptionally(exception);
return;
}
@@ -2076,8 +2096,6 @@ public class SharePartition {
groupId, topicIdPartition);
updatedStates.forEach(state -> {
state.completeStateTransition(true);
- // Cancel the acquisition lock timeout task for the state
since it is acknowledged/released successfully.
- state.cancelAndClearAcquisitionLockTimeoutTask();
if (state.state() == RecordState.AVAILABLE) {
updateFindNextFetchOffset(true);
}
@@ -2389,10 +2407,18 @@ public class SharePartition {
}
private AcquisitionLockTimeoutHandler releaseAcquisitionLockOnTimeout() {
- return (memberId, firstOffset, lastOffset) -> {
+ return (memberId, firstOffset, lastOffset, timerTask) -> {
List<PersisterStateBatch> stateBatches;
lock.writeLock().lock();
try {
+ // Check if the timer task is already cancelled. This can happen when a concurrent request
+ // acknowledges the in-flight state and cancels the timer task while the timeout handler is
+ // waiting for the lock.
+ if (timerTask.isCancelled()) {
+ log.debug("Timer task is already cancelled, not executing
further.");
+ return;
+ }
+
Map.Entry<Long, InFlightBatch> floorOffset =
cachedState.floorEntry(firstOffset);
if (floorOffset == null) {
log.error("Base offset {} not found for share partition:
{}-{}", firstOffset, groupId, topicIdPartition);
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index c17ce391b37..1289d720054 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -55,6 +55,7 @@ import
org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch;
import org.apache.kafka.server.share.fetch.AcquisitionLockTimerTask;
import org.apache.kafka.server.share.fetch.DelayedShareFetchGroupKey;
+import org.apache.kafka.server.share.fetch.DeliveryCountOps;
import org.apache.kafka.server.share.fetch.InFlightState;
import org.apache.kafka.server.share.fetch.RecordState;
import org.apache.kafka.server.share.fetch.ShareAcquiredRecords;
@@ -71,6 +72,7 @@ import org.apache.kafka.server.storage.log.FetchPartitionData;
import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.timer.MockTimer;
import org.apache.kafka.server.util.timer.Timer;
+import org.apache.kafka.server.util.timer.TimerTask;
import org.apache.kafka.storage.internals.log.OffsetResultHolder;
import org.apache.kafka.test.TestUtils;
@@ -88,6 +90,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
+import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -7573,6 +7576,308 @@ public class SharePartitionTest {
assertEquals(20, sharePartition.nextFetchOffset());
}
+ @Test
+ public void testAcquisitionLockTimeoutWithConcurrentAcknowledgement()
throws InterruptedException {
+ Persister persister = Mockito.mock(Persister.class);
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withState(SharePartitionState.ACTIVE)
+ .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
+ .withPersister(persister)
+ .build();
+
+ // Create 2 batches of records.
+ ByteBuffer buffer = ByteBuffer.allocate(4096);
+ memoryRecordsBuilder(buffer, 5, 0).close();
+ memoryRecordsBuilder(buffer, 15, 5).close();
+
+ buffer.flip();
+
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
+ // Acquire 10 records.
+ fetchAcquiredRecords(sharePartition.acquire(
+ MEMBER_ID,
+ 5, /* Batch size of 5 so cache can have 2 entries */
+ 10,
+ DEFAULT_FETCH_OFFSET,
+ fetchPartitionData(records, 0),
+ FETCH_ISOLATION_HWM),
+ 20);
+
+ assertEquals(2, sharePartition.cachedState().size());
+ assertEquals(2, sharePartition.timer().size());
+
+ // Return 2 futures which will be completed later.
+ CompletableFuture<WriteShareGroupStateResult> future1 = new
CompletableFuture<>();
+ CompletableFuture<WriteShareGroupStateResult> future2 = new
CompletableFuture<>();
+
Mockito.when(persister.writeState(Mockito.any())).thenReturn(future1).thenReturn(future2);
+
+ // Store the corresponding batch timer tasks.
+ TimerTask timerTask1 =
sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask();
+ TimerTask timerTask2 =
sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask();
+
+ // Acknowledge 1 offset in the first batch as Accept to create offset tracking, accept the complete
+ // second batch, and mark offset 0 as Release so the cached state does not move ahead.
+ sharePartition.acknowledge(MEMBER_ID, List.of(
+ new ShareAcknowledgementBatch(0, 0,
List.of(AcknowledgeType.RELEASE.id)),
+ new ShareAcknowledgementBatch(1, 1,
List.of(AcknowledgeType.ACCEPT.id)),
+ new ShareAcknowledgementBatch(5, 19,
List.of(AcknowledgeType.ACCEPT.id))));
+
+ // Assert the start offset has not moved.
+ assertEquals(0L, sharePartition.startOffset());
+ assertEquals(2, sharePartition.cachedState().size());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(0L).offsetState().get(0L).state());
+ assertEquals(RecordState.ACKNOWLEDGED,
sharePartition.cachedState().get(0L).offsetState().get(1L).state());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(0L).offsetState().get(2L).state());
+ assertEquals(RecordState.ACKNOWLEDGED,
sharePartition.cachedState().get(5L).batchState());
+ // Verify ongoing transition states.
+
assertTrue(sharePartition.cachedState().get(0L).offsetState().get(0L).hasOngoingStateTransition());
+
assertTrue(sharePartition.cachedState().get(0L).offsetState().get(1L).hasOngoingStateTransition());
+
assertFalse(sharePartition.cachedState().get(0L).offsetState().get(2L).hasOngoingStateTransition());
+
assertTrue(sharePartition.cachedState().get(5L).batchHasOngoingStateTransition());
+
+ // Validate first timer task is already cancelled.
+ assertTrue(timerTask1.isCancelled());
+ assertFalse(timerTask2.isCancelled());
+
+ // Fetch offset state timer tasks.
+ TimerTask timerTaskOffsetState1 =
sharePartition.cachedState().get(0L).offsetState().get(0L).acquisitionLockTimeoutTask();
+ TimerTask timerTaskOffsetState2 =
sharePartition.cachedState().get(0L).offsetState().get(1L).acquisitionLockTimeoutTask();
+ TimerTask timerTaskOffsetState3 =
sharePartition.cachedState().get(0L).offsetState().get(2L).acquisitionLockTimeoutTask();
+
+ // Complete futures.
+ WriteShareGroupStateResult writeShareGroupStateResult =
Mockito.mock(WriteShareGroupStateResult.class);
+
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(),
Errors.NONE.message())))));
+ future1.complete(writeShareGroupStateResult);
+ future2.complete(writeShareGroupStateResult);
+
+ // Verify timer tasks are now cancelled, except unacknowledged offsets.
+ assertEquals(2, sharePartition.cachedState().size());
+ assertTrue(timerTask2.isCancelled());
+ assertTrue(timerTaskOffsetState1.isCancelled());
+ assertTrue(timerTaskOffsetState2.isCancelled());
+ assertFalse(timerTaskOffsetState3.isCancelled());
+
+ // Verify the state prior to executing the timer tasks.
+ assertEquals(RecordState.ACKNOWLEDGED,
sharePartition.cachedState().get(0L).offsetState().get(1L).state());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(0L).offsetState().get(2L).state());
+ assertEquals(RecordState.ACKNOWLEDGED,
sharePartition.cachedState().get(5L).batchState());
+
+ // Running expired timer tasks should not mark offsets available,
except for offset 2.
+ timerTask1.run();
+ // State should remain same.
+ assertEquals(RecordState.ACKNOWLEDGED,
sharePartition.cachedState().get(0L).offsetState().get(1L).state());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(0L).offsetState().get(2L).state());
+
+ timerTask2.run();
+ assertEquals(RecordState.ACKNOWLEDGED,
sharePartition.cachedState().get(5L).batchState());
+
+ timerTaskOffsetState2.run();
+ assertEquals(RecordState.ACKNOWLEDGED,
sharePartition.cachedState().get(0L).offsetState().get(1L).state());
+
+ // Should update the state to available as the timer task was not cancelled.
+ timerTaskOffsetState3.run();
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(0L).offsetState().get(2L).state());
+ }
+
+ @Test
+ public void testLsoMovementWithWriteStateRPCFailuresInAcknowledgement() {
+ Persister persister = Mockito.mock(Persister.class);
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withState(SharePartitionState.ACTIVE)
+ .withPersister(persister)
+ .build();
+
+ fetchAcquiredRecords(sharePartition, memoryRecords(5, 2), 5);
+ fetchAcquiredRecords(sharePartition, memoryRecords(5, 7), 5);
+
+ // Validate that there is no ongoing transition.
+
assertFalse(sharePartition.cachedState().get(2L).batchHasOngoingStateTransition());
+
assertFalse(sharePartition.cachedState().get(7L).batchHasOngoingStateTransition());
+
+ // Return futures which will be completed later, so the batch state
has ongoing transition.
+ CompletableFuture<WriteShareGroupStateResult> future1 = new
CompletableFuture<>();
+ CompletableFuture<WriteShareGroupStateResult> future2 = new
CompletableFuture<>();
+
+ // Mocking the persister write state RPC to return future 1 and future
2 when acknowledgement occurs for
+ // offsets 2-6 and 7-11 respectively.
+
Mockito.when(persister.writeState(Mockito.any())).thenReturn(future1).thenReturn(future2);
+
+ // Acknowledge batch to create ongoing transition.
+ sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(2, 6, List.of(AcknowledgeType.RELEASE.id))));
+ sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(7, 11, List.of(AcknowledgeType.RELEASE.id))));
+
+ // Validate that there is an ongoing transition on both batches.
+
assertTrue(sharePartition.cachedState().get(2L).batchHasOngoingStateTransition());
+
assertTrue(sharePartition.cachedState().get(7L).batchHasOngoingStateTransition());
+
+ // Move LSO to 7, so some records/offsets can be marked archived for
the first batch.
+ sharePartition.updateCacheAndOffsets(7L);
+
+ // Start offset will be moved.
+ assertEquals(12L, sharePartition.nextFetchOffset());
+ assertEquals(7L, sharePartition.startOffset());
+ assertEquals(11, sharePartition.endOffset());
+ assertEquals(2, sharePartition.cachedState().size());
+
assertTrue(sharePartition.cachedState().get(2L).batchHasOngoingStateTransition());
+
assertTrue(sharePartition.cachedState().get(7L).batchHasOngoingStateTransition());
+ assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(2L).batchState());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(7L).batchState());
+
+ // Complete future1 with an error result so the acknowledgement for offsets 2-6 fails.
+ WriteShareGroupStateResult writeShareGroupStateResult =
Mockito.mock(WriteShareGroupStateResult.class);
+
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionErrorData(0,
Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message())))));
+ future1.complete(writeShareGroupStateResult);
+
+ // The failed completion of future1 should not impact the cached state since those records have
+ // already been archived.
+ assertEquals(12, sharePartition.nextFetchOffset());
+ assertEquals(7, sharePartition.startOffset());
+ assertEquals(11, sharePartition.endOffset());
+ assertEquals(2, sharePartition.cachedState().size());
+
assertFalse(sharePartition.cachedState().get(2L).batchHasOngoingStateTransition());
+
assertTrue(sharePartition.cachedState().get(7L).batchHasOngoingStateTransition());
+ assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(2L).batchState());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(7L).batchState());
+
+ future2.complete(writeShareGroupStateResult);
+ assertEquals(12L, sharePartition.nextFetchOffset());
+ assertEquals(7, sharePartition.startOffset());
+ assertEquals(11, sharePartition.endOffset());
+ assertEquals(2, sharePartition.cachedState().size());
+ assertEquals(RecordState.ARCHIVED,
sharePartition.cachedState().get(2L).batchState());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(7L).batchState());
+ }
+
+ @Test
+ public void inFlightStateRollbackAndArchiveStateTransition() throws
InterruptedException {
+ InFlightState inFlightState = new InFlightState(RecordState.ACQUIRED,
1, MEMBER_ID);
+
+ inFlightState.startStateTransition(RecordState.ACKNOWLEDGED,
DeliveryCountOps.INCREASE, MAX_DELIVERY_COUNT, MEMBER_ID);
+ assertTrue(inFlightState.hasOngoingStateTransition());
+
+ // We have an ongoing state transition from ACQUIRED to ACKNOWLEDGED which is not committed yet.
+ // At the same time as a call to completeStateTransition with a false commit value, we get a call to
+ // archive the record. No matter the order of the 2 calls, the final state should always be ARCHIVED.
+ ExecutorService executorService = Executors.newFixedThreadPool(2);
+ try {
+ List<Callable<Void>> callables = List.of(
+ () -> {
+ inFlightState.archive();
+ return null;
+ },
+ () -> {
+ inFlightState.completeStateTransition(false);
+ return null;
+ }
+ );
+ executorService.invokeAll(callables);
+ } finally {
+ if (!executorService.awaitTermination(30, TimeUnit.MILLISECONDS))
+ executorService.shutdown();
+ }
+ assertEquals(RecordState.ARCHIVED, inFlightState.state());
+ assertEquals(EMPTY_MEMBER_ID, inFlightState.memberId());
+ }
+
+ @Test
+ public void inFlightStateCommitSuccessAndArchiveStateTransition() throws
InterruptedException {
+ InFlightState inFlightState = new InFlightState(RecordState.ACQUIRED,
1, MEMBER_ID);
+
+ inFlightState.startStateTransition(RecordState.ACKNOWLEDGED,
DeliveryCountOps.INCREASE, MAX_DELIVERY_COUNT, MEMBER_ID);
+ assertTrue(inFlightState.hasOngoingStateTransition());
+
+ // We have an ongoing state transition from ACQUIRED to ACKNOWLEDGED which is not committed yet.
+ // At the same time as a call to completeStateTransition with a true commit value, we get a call to
+ // archive the record. No matter the order of the 2 calls, the final state should always be ARCHIVED.
+ ExecutorService executorService = Executors.newFixedThreadPool(2);
+ try {
+ List<Callable<Void>> callables = List.of(
+ () -> {
+ inFlightState.archive();
+ return null;
+ },
+ () -> {
+ inFlightState.completeStateTransition(true);
+ return null;
+ }
+ );
+ executorService.invokeAll(callables);
+ } finally {
+ if (!executorService.awaitTermination(30, TimeUnit.MILLISECONDS))
+ executorService.shutdown();
+ }
+ assertEquals(RecordState.ARCHIVED, inFlightState.state());
+ assertEquals(EMPTY_MEMBER_ID, inFlightState.memberId());
+ }
+
+ @Test
+ public void testAcquisitionLockTimeoutWithWriteStateRPCFailure() throws
InterruptedException {
+ Persister persister = Mockito.mock(Persister.class);
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withState(SharePartitionState.ACTIVE)
+ .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
+ .withPersister(persister)
+ .build();
+
+ fetchAcquiredRecords(
+ sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 0,
+ fetchPartitionData(memoryRecords(2, 0)), FETCH_ISOLATION_HWM
+ ), 2
+ );
+
+
assertNotNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask());
+ assertEquals(1, sharePartition.timer().size());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(0L).batchState());
+
+ // Return a future which will be completed later, so the batch state
has ongoing transition.
+ CompletableFuture<WriteShareGroupStateResult> future = new
CompletableFuture<>();
+ Mockito.when(persister.writeState(Mockito.any())).thenReturn(future);
+
+ // Acknowledge batch to create ongoing transition.
+ sharePartition.acknowledge(MEMBER_ID, List.of(new
ShareAcknowledgementBatch(0, 1, List.of(AcknowledgeType.ACCEPT.id))));
+ // Assert the start offset has not moved and batch has ongoing
transition.
+ assertEquals(0L, sharePartition.startOffset());
+ assertEquals(1, sharePartition.cachedState().size());
+
assertTrue(sharePartition.cachedState().get(0L).batchHasOngoingStateTransition());
+ assertEquals(RecordState.ACKNOWLEDGED,
sharePartition.cachedState().get(0L).batchState());
+ assertEquals(EMPTY_MEMBER_ID,
sharePartition.cachedState().get(0L).batchMemberId());
+ // Timer task has not expired yet.
+
assertFalse(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask().hasExpired());
+
+ // Allow the acquisition lock to expire. This will not cause any change because the record is not
+ // in ACQUIRED state, but it will remove the timer task entry from the timer.
+ mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
+ TestUtils.waitForCondition(
+ () -> sharePartition.cachedState().get(0L).batchState() ==
RecordState.ACKNOWLEDGED &&
+ sharePartition.cachedState().get(0L).batchDeliveryCount() == 1
&&
+ sharePartition.timer().size() == 0,
+ DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS,
+ () -> assertionFailedMessage(sharePartition, Map.of(0L,
List.of())));
+
+ // Acquisition lock timeout task has run already and is not null.
+
assertNotNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask());
+ // Timer task should be expired now.
+
assertTrue(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask().hasExpired());
+
+ // Complete the future with an error result so the acknowledgement for offsets 0-1 fails.
+ WriteShareGroupStateResult writeShareGroupStateResult =
Mockito.mock(WriteShareGroupStateResult.class);
+
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(List.of(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(), List.of(
+ PartitionFactory.newPartitionErrorData(0,
Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message())))));
+ future.complete(writeShareGroupStateResult);
+
+ // Even though the write state RPC has failed and the corresponding acquisition lock timeout task
+ // has expired, the record should not be stuck in ACQUIRED state with no acquisition lock timeout task.
+ assertEquals(1, sharePartition.cachedState().size());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(0L).batchState());
+ assertEquals(EMPTY_MEMBER_ID,
sharePartition.cachedState().get(0L).batchMemberId());
+
assertNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask());
+ }
+
/**
* This function produces transactional data of a given no. of records
followed by a transactional marker (COMMIT/ABORT).
*/
diff --git
a/server/src/main/java/org/apache/kafka/server/share/fetch/AcquisitionLockTimeoutHandler.java
b/server/src/main/java/org/apache/kafka/server/share/fetch/AcquisitionLockTimeoutHandler.java
index c83d7e537da..b5480f2e54d 100644
---
a/server/src/main/java/org/apache/kafka/server/share/fetch/AcquisitionLockTimeoutHandler.java
+++
b/server/src/main/java/org/apache/kafka/server/share/fetch/AcquisitionLockTimeoutHandler.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.server.share.fetch;
+import org.apache.kafka.server.util.timer.TimerTask;
+
/**
* AcquisitionLockTimeoutHandler is an interface that defines a handler for
acquisition lock timeouts.
* It is used to handle cases where the acquisition lock for a share partition
times out.
@@ -29,6 +31,6 @@ public interface AcquisitionLockTimeoutHandler {
* @param firstOffset the first offset
* @param lastOffset the last offset
+ * @param timerTask the timer task that fired the timeout
*/
- void handle(String memberId, long firstOffset, long lastOffset);
+ void handle(String memberId, long firstOffset, long lastOffset, TimerTask timerTask);
}
diff --git
a/server/src/main/java/org/apache/kafka/server/share/fetch/AcquisitionLockTimerTask.java
b/server/src/main/java/org/apache/kafka/server/share/fetch/AcquisitionLockTimerTask.java
index 6796d24d374..2766412fa6e 100644
---
a/server/src/main/java/org/apache/kafka/server/share/fetch/AcquisitionLockTimerTask.java
+++
b/server/src/main/java/org/apache/kafka/server/share/fetch/AcquisitionLockTimerTask.java
@@ -32,6 +32,7 @@ public class AcquisitionLockTimerTask extends TimerTask {
private final long lastOffset;
private final AcquisitionLockTimeoutHandler timeoutHandler;
private final SharePartitionMetrics sharePartitionMetrics;
+ private volatile boolean hasExpired;
public AcquisitionLockTimerTask(
Time time,
@@ -49,18 +50,28 @@ public class AcquisitionLockTimerTask extends TimerTask {
this.lastOffset = lastOffset;
this.timeoutHandler = timeoutHandler;
this.sharePartitionMetrics = sharePartitionMetrics;
+ this.hasExpired = false;
}
public long expirationMs() {
return expirationMs;
}
+ public boolean hasExpired() {
+ return hasExpired;
+ }
+
/**
* The task is executed when the acquisition lock timeout is reached. The
task releases the acquired records.
*/
@Override
public void run() {
+ // Mark the task as expired prior to executing the timeout handler. The timeout task and a failed
+ // acknowledgement, which checks whether the timeout task has expired, may execute concurrently,
+ // but only one of them shall update the state to available. The concurrent execution is protected
+ // by the write lock on the state.
+ hasExpired = true;
sharePartitionMetrics.recordAcquisitionLockTimeoutPerSec(lastOffset -
firstOffset + 1);
- timeoutHandler.handle(memberId, firstOffset, lastOffset);
+ timeoutHandler.handle(memberId, firstOffset, lastOffset, this);
}
}
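
A stripped-down stand-in for the ordering above (not the real
AcquisitionLockTimerTask): the volatile flag must be published before the
handler competes for the partition's write lock, so a concurrently failing
acknowledgement can observe hasExpired() and park the record in AVAILABLE
instead of restoring ACQUIRED:

    class ExpiryFlagSketch {
        private volatile boolean hasExpired = false;

        boolean hasExpired() {
            return hasExpired;
        }

        void run(Runnable releaseAcquiredRecords) {
            hasExpired = true;            // publish before the lock-guarded release work
            releaseAcquiredRecords.run();
        }
    }
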
diff --git
a/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightBatch.java
b/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightBatch.java
index c3e2d353328..df40b943d5a 100644
---
a/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightBatch.java
+++
b/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightBatch.java
@@ -25,6 +25,9 @@ import java.util.concurrent.ConcurrentSkipListMap;
/**
* The InFlightBatch maintains the in-memory state of the fetched records i.e.
in-flight records.
+ * <p>
+ * This class is not thread-safe and callers should acquire locks if concurrent updates on the same batch
+ * are expected.
*/
public class InFlightBatch {
// The timer is used to schedule the acquisition lock timeout task for the
batch.
@@ -147,11 +150,10 @@ public class InFlightBatch {
/**
* Archive the batch state. This is used to mark the batch as archived and
no further updates
* are allowed to the batch state.
- * @param newMemberId The new member id for the records.
* @throws IllegalStateException if the offset state is maintained and the
batch state is not available.
*/
- public void archiveBatch(String newMemberId) {
- inFlightState().archive(newMemberId);
+ public void archiveBatch() {
+ inFlightState().archive();
}
/**
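
Since archive() now clears the member id and cancels any acquisition lock
timeout task itself, callers of archiveBatch() drop the ACQUIRED-only special
case. A hedged sketch of the simplified call site (the enclosing class and
method name are illustrative; batchState() and archiveBatch() are as in the
diff above):

    import org.apache.kafka.server.share.fetch.InFlightBatch;
    import org.apache.kafka.server.share.fetch.RecordState;

    class ArchiveCallerSketch {
        boolean archiveIfInState(InFlightBatch inFlightBatch, RecordState initialState) {
            if (inFlightBatch.batchState() == initialState) {
                // Marks the batch ARCHIVED, clears the member id and cancels any timer task.
                inFlightBatch.archiveBatch();
                return true;
            }
            return false;
        }
    }
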
diff --git
a/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightState.java
b/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightState.java
index 97b2d55ed0b..d5831d74853 100644
---
a/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightState.java
+++
b/server/src/main/java/org/apache/kafka/server/share/fetch/InFlightState.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.server.share.fetch;
+import org.apache.kafka.common.Uuid;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -25,11 +27,19 @@ import java.util.Objects;
* The InFlightState is used to track the state and delivery count of a record
that has been
* fetched from the leader. The state of the record is used to determine if
the record should
* be re-deliver or if it can be acknowledged or archived.
+ * <p>
+ * This class is not thread-safe and callers should acquire locks if concurrent updates on the same state
+ * are expected.
*/
public class InFlightState {
private static final Logger log =
LoggerFactory.getLogger(InFlightState.class);
+ /**
+ * Empty member id used to indicate that a record is not acquired by any member.
+ */
+ private static final String EMPTY_MEMBER_ID = Uuid.ZERO_UUID.toString();
+
// The state of the fetch batch records.
private RecordState state;
// The number of times the records has been delivered to the client.
@@ -41,6 +51,9 @@ public class InFlightState {
private InFlightState rollbackState;
// The timer task for the acquisition lock timeout.
private AcquisitionLockTimerTask acquisitionLockTimeoutTask;
+ // The boolean determines whether the record has reached the terminal ARCHIVED state, from which it
+ // cannot transition to any other state. This can happen because of LSO movement etc.
// Visible for testing.
public InFlightState(RecordState state, int deliveryCount, String
memberId) {
@@ -103,8 +116,10 @@ public class InFlightState {
* and clear the reference to it.
*/
public void cancelAndClearAcquisitionLockTimeoutTask() {
- acquisitionLockTimeoutTask.cancel();
- acquisitionLockTimeoutTask = null;
+ if (acquisitionLockTimeoutTask != null) {
+ acquisitionLockTimeoutTask.cancel();
+ acquisitionLockTimeoutTask = null;
+ }
}
/**
@@ -115,12 +130,9 @@ public class InFlightState {
* @return true if there is an ongoing state transition, false otherwise.
*/
public boolean hasOngoingStateTransition() {
- if (rollbackState == null) {
- // This case could occur when the batch/offset hasn't transitioned
even once or the state transitions have
- // been committed.
- return false;
- }
- return rollbackState.state != null;
+ // If batch/offset hasn't transitioned even once or the state
transitions have been
+ // committed then rollbackState should always be null.
+ return rollbackState != null;
}
/**
@@ -138,6 +150,17 @@ public class InFlightState {
*/
public InFlightState tryUpdateState(RecordState newState, DeliveryCountOps
ops, int maxDeliveryCount, String newMemberId) {
try {
+ // If the state transition is in progress, the state should not be
updated.
+ if (hasOngoingStateTransition()) {
+ // A misbehaving client can send multiple requests to update the same records, hence do not
+ // proceed if a transition is already in progress. Do not log an error here as it might not be
+ // an error but rather a concurrent update of the same state due to multiple requests. This
+ // ideally should not happen, hence log at info level; if it happens frequently then it might
+ // be an issue which needs to be investigated.
+ log.info("{} has ongoing state transition, cannot update to:
{}", this, newState);
+ return null;
+ }
+
if (newState == RecordState.AVAILABLE && ops !=
DeliveryCountOps.DECREASE && deliveryCount >= maxDeliveryCount) {
newState = RecordState.ARCHIVED;
}
@@ -149,7 +172,6 @@ public class InFlightState {
return this;
} catch (IllegalStateException e) {
log.error("Failed to update state of the records", e);
- rollbackState = null;
return null;
}
}
@@ -159,9 +181,11 @@ public class InFlightState {
* cancelling the acquisition lock timeout task.
* This method is used to archive the record when it is no longer needed.
*/
- public void archive(String newMemberId) {
+ public void archive() {
+ isTerminalState = true;
state = RecordState.ARCHIVED;
- memberId = newMemberId;
+ memberId = EMPTY_MEMBER_ID;
+ cancelAndClearAcquisitionLockTimeoutTask();
}
/**
@@ -178,8 +202,12 @@ public class InFlightState {
* helps update chaining.
*/
public InFlightState startStateTransition(RecordState newState,
DeliveryCountOps ops, int maxDeliveryCount, String newMemberId) {
- rollbackState = new InFlightState(state, deliveryCount, memberId,
acquisitionLockTimeoutTask);
- return tryUpdateState(newState, ops, maxDeliveryCount, newMemberId);
+ InFlightState currentState = new InFlightState(state, deliveryCount,
memberId, acquisitionLockTimeoutTask);
+ InFlightState updatedState = tryUpdateState(newState, ops,
maxDeliveryCount, newMemberId);
+ if (updatedState != null) {
+ rollbackState = currentState;
+ }
+ return updatedState;
}
/**
@@ -190,13 +218,22 @@ public class InFlightState {
* @param commit If true, commits the state transition, otherwise rolls
back.
*/
public void completeStateTransition(boolean commit) {
- if (commit) {
+ if (commit || isTerminalState) {
+ // Cancel the acquisition lock timeout task for the state since it
is acknowledged/released successfully.
+ cancelAndClearAcquisitionLockTimeoutTask();
rollbackState = null;
return;
}
- state = rollbackState.state;
- deliveryCount = rollbackState.deliveryCount;
- memberId = rollbackState.memberId;
+ // If the acquisition lock timeout task has expired then mark the record as AVAILABLE.
+ if (acquisitionLockTimeoutTask != null &&
acquisitionLockTimeoutTask.hasExpired()) {
+ state = RecordState.AVAILABLE;
+ memberId = EMPTY_MEMBER_ID;
+ cancelAndClearAcquisitionLockTimeoutTask();
+ } else {
+ state = rollbackState.state;
+ memberId = rollbackState.memberId;
+ }
+ deliveryCount = rollbackState.deliveryCount();
rollbackState = null;
}
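
From a caller's perspective, the new guard in tryUpdateState() makes a
duplicate or concurrent acknowledgement fail fast while a write state RPC is
still pending, rather than layering another transition on top. A small usage
sketch (member id and max delivery count are placeholder values; the
constructor is the one marked visible for testing):

    import org.apache.kafka.server.share.fetch.DeliveryCountOps;
    import org.apache.kafka.server.share.fetch.InFlightState;
    import org.apache.kafka.server.share.fetch.RecordState;

    class OngoingTransitionSketch {
        void duplicateAcknowledgementIsRejected() {
            InFlightState state = new InFlightState(RecordState.ACQUIRED, 1, "member-1");
            // The first acknowledgement starts a transition; its write state RPC is still pending.
            state.startStateTransition(RecordState.ACKNOWLEDGED, DeliveryCountOps.INCREASE, 5, "member-1");
            // A second request for the same record is rejected while the transition is ongoing:
            // tryUpdateState() returns null and the pending ACKNOWLEDGED transition is untouched.
            InFlightState result = state.tryUpdateState(RecordState.AVAILABLE, DeliveryCountOps.INCREASE, 5, "member-1");
            assert result == null && state.hasOngoingStateTransition();
        }
    }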