This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 15206d5083b KAFKA-18084 Added usage for rollback state while
SharePartition acquires records (#17965)
15206d5083b is described below
commit 15206d5083b18d7ff8afd360304dc17e90f7a35b
Author: Abhinav Dixit <[email protected]>
AuthorDate: Fri Dec 6 23:02:08 2024 +0530
KAFKA-18084 Added usage for rollback state while SharePartition acquires
records (#17965)
Reviewers: Andrew Schofield <[email protected]>, Apoorv Mittal
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../java/kafka/server/share/SharePartition.java | 60 ++++++++++--------
.../kafka/server/share/SharePartitionTest.java | 72 ++++++++++++++++++++++
.../server/ShareFetchAcknowledgeRequestTest.scala | 59 ++++++++++++------
3 files changed, 146 insertions(+), 45 deletions(-)
diff --git a/core/src/main/java/kafka/server/share/SharePartition.java
b/core/src/main/java/kafka/server/share/SharePartition.java
index 4c5e72d0432..00ec974a722 100644
--- a/core/src/main/java/kafka/server/share/SharePartition.java
+++ b/core/src/main/java/kafka/server/share/SharePartition.java
@@ -690,7 +690,7 @@ public class SharePartition {
// acquire subset of offsets from the in-flight batch
but only if the
// complete batch is available yet. Hence, do a
pre-check to avoid exploding
// the in-flight offset tracking unnecessarily.
- if (inFlightBatch.batchState() !=
RecordState.AVAILABLE) {
+ if (inFlightBatch.batchState() !=
RecordState.AVAILABLE || inFlightBatch.batchHasOngoingStateTransition()) {
log.trace("The batch is not available to acquire
in share partition: {}-{}, skipping: {}"
+ " skipping offset tracking for batch as
well.", groupId,
topicIdPartition, inFlightBatch);
@@ -709,7 +709,7 @@ public class SharePartition {
}
// The in-flight batch is a full match hence change the state
of the complete batch.
- if (inFlightBatch.batchState() != RecordState.AVAILABLE) {
+ if (inFlightBatch.batchState() != RecordState.AVAILABLE ||
inFlightBatch.batchHasOngoingStateTransition()) {
log.trace("The batch is not available to acquire in share
partition: {}-{}, skipping: {}",
groupId, topicIdPartition, inFlightBatch);
continue;
@@ -1280,10 +1280,9 @@ public class SharePartition {
break;
}
- if (offsetState.getValue().state != RecordState.AVAILABLE) {
- log.trace("The offset is not available skipping, offset:
{} batch: {}"
- + " for the share partition: {}-{}",
offsetState.getKey(), inFlightBatch,
- groupId, topicIdPartition);
+ if (offsetState.getValue().state != RecordState.AVAILABLE ||
offsetState.getValue().hasOngoingStateTransition()) {
+ log.trace("The offset {} is not available in share
partition: {}-{}, skipping: {}",
+ offsetState.getKey(), groupId, topicIdPartition,
inFlightBatch);
continue;
}
@@ -2066,8 +2065,8 @@ public class SharePartition {
stateBatches.add(new
PersisterStateBatch(inFlightBatch.firstOffset(), inFlightBatch.lastOffset(),
updateResult.state.id, (short)
updateResult.deliveryCount));
- // Update acquisition lock timeout task for the batch to null
since it is completed now.
- updateResult.updateAcquisitionLockTimeoutTask(null);
+ // Cancel the acquisition lock timeout task for the batch since it
is completed now.
+ updateResult.cancelAndClearAcquisitionLockTimeoutTask();
if (updateResult.state != RecordState.ARCHIVED) {
findNextFetchOffset.set(true);
}
@@ -2113,8 +2112,8 @@ public class SharePartition {
stateBatches.add(new PersisterStateBatch(offsetState.getKey(),
offsetState.getKey(),
updateResult.state.id, (short)
updateResult.deliveryCount));
- // Update acquisition lock timeout task for the offset to null
since it is completed now.
- updateResult.updateAcquisitionLockTimeoutTask(null);
+ // Cancel the acquisition lock timeout task for the offset since
it is completed now.
+ updateResult.cancelAndClearAcquisitionLockTimeoutTask();
if (updateResult.state != RecordState.ARCHIVED) {
findNextFetchOffset.set(true);
}
@@ -2251,10 +2250,7 @@ public class SharePartition {
// Visible for testing.
RecordState batchState() {
- if (batchState == null) {
- throw new IllegalStateException("The batch state is not
available as the offset state is maintained");
- }
- return batchState.state;
+ return inFlightState().state;
}
// Visible for testing.
@@ -2275,10 +2271,7 @@ public class SharePartition {
// Visible for testing.
AcquisitionLockTimerTask batchAcquisitionLockTimeoutTask() {
- if (batchState == null) {
- throw new IllegalStateException("The batch state is not
available as the offset state is maintained");
- }
- return batchState.acquisitionLockTimeoutTask;
+ return inFlightState().acquisitionLockTimeoutTask;
}
// Visible for testing.
@@ -2286,11 +2279,19 @@ public class SharePartition {
return offsetState;
}
- private void archiveBatch(String newMemberId) {
+ private InFlightState inFlightState() {
if (batchState == null) {
throw new IllegalStateException("The batch state is not
available as the offset state is maintained");
}
- batchState.archive(newMemberId);
+ return batchState;
+ }
+
+ private boolean batchHasOngoingStateTransition() {
+ return inFlightState().hasOngoingStateTransition();
+ }
+
+ private void archiveBatch(String newMemberId) {
+ inFlightState().archive(newMemberId);
}
private InFlightState tryUpdateBatchState(RecordState newState,
boolean incrementDeliveryCount, int maxDeliveryCount, String newMemberId) {
@@ -2335,10 +2336,7 @@ public class SharePartition {
}
private void updateAcquisitionLockTimeout(AcquisitionLockTimerTask
acquisitionLockTimeoutTask) {
- if (batchState == null) {
- throw new IllegalStateException("The batch state is not
available as the offset state is maintained");
- }
- batchState.acquisitionLockTimeoutTask = acquisitionLockTimeoutTask;
+ inFlightState().acquisitionLockTimeoutTask =
acquisitionLockTimeoutTask;
}
@Override
@@ -2397,7 +2395,10 @@ public class SharePartition {
return acquisitionLockTimeoutTask;
}
- void updateAcquisitionLockTimeoutTask(AcquisitionLockTimerTask
acquisitionLockTimeoutTask) {
+ void updateAcquisitionLockTimeoutTask(AcquisitionLockTimerTask
acquisitionLockTimeoutTask) throws IllegalArgumentException {
+ if (this.acquisitionLockTimeoutTask != null) {
+ throw new IllegalArgumentException("Existing acquisition lock
timeout exists, cannot override.");
+ }
this.acquisitionLockTimeoutTask = acquisitionLockTimeoutTask;
}
@@ -2406,6 +2407,15 @@ public class SharePartition {
acquisitionLockTimeoutTask = null;
}
+ private boolean hasOngoingStateTransition() {
+ if (rollbackState == null) {
+ // rollbackState is null when the batch/offset has never transitioned, or once its
+ // transitions have been committed; either way no state transition is in flight.
+ return false;
+ }
+ return rollbackState.state != null;
+ }
+
/**
* Try to update the state of the records. The state of the records
can only be updated if the
* new state is allowed to be transitioned from old state. The
delivery count is not incremented
diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java
b/core/src/test/java/kafka/server/share/SharePartitionTest.java
index fde84d82c4b..8d1bbc28232 100644
--- a/core/src/test/java/kafka/server/share/SharePartitionTest.java
+++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java
@@ -5421,6 +5421,78 @@ public class SharePartitionTest {
assertEquals(734, sharePartition.endOffset());
}
+ @Test
+ public void testAcquireWithWriteShareGroupStateDelay() {
+ Persister persister = Mockito.mock(Persister.class);
+ mockPersisterReadStateMethod(persister);
+ SharePartition sharePartition = SharePartitionBuilder.builder()
+ .withPersister(persister)
+ .withState(SharePartitionState.ACTIVE)
+ .build();
+
+ // Mock a successful persister writeState result (Errors.NONE for partition 0). It is only delivered once the test completes the future below, which simulates a delayed write state RPC.
+ WriteShareGroupStateResult writeShareGroupStateResult =
Mockito.mock(WriteShareGroupStateResult.class);
+
Mockito.when(writeShareGroupStateResult.topicsData()).thenReturn(Collections.singletonList(
+ new TopicData<>(TOPIC_ID_PARTITION.topicId(),
Collections.singletonList(
+ PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(),
Errors.NONE.message())))));
+
+ CompletableFuture<WriteShareGroupStateResult> future = new
CompletableFuture<>();
+ // persister.writeState returns a future that stays incomplete until the test completes it, so the transitions of acknowledged offsets are not committed yet.
+ Mockito.when(persister.writeState(Mockito.any())).thenReturn(future);
+
+ sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new
FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(5, 0),
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
+ OptionalInt.empty(), false));
+
+ sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new
FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(5, 5),
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
+ OptionalInt.empty(), false));
+
+ List<ShareAcknowledgementBatch> acknowledgementBatches = new
ArrayList<>();
+ acknowledgementBatches.add(new ShareAcknowledgementBatch(2, 3,
Collections.singletonList((byte) 2)));
+ acknowledgementBatches.add(new ShareAcknowledgementBatch(5, 9,
Collections.singletonList((byte) 2)));
+ // Acknowledge 2-3, 5-9 offsets with RELEASE acknowledge type.
+ sharePartition.acknowledge(MEMBER_ID, acknowledgementBatches);
+
+ assertEquals(2, sharePartition.cachedState().size());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(0L).offsetState().get(0L).state());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(0L).offsetState().get(1L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(0L).offsetState().get(2L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(0L).offsetState().get(3L).state());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(0L).offsetState().get(4L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(5L).batchState());
+
+ // Even though offsets 2-3 and 5-9 are in AVAILABLE state, they won't be acquired because they are still in transition from ACQUIRED
+ // to AVAILABLE: the write state RPC has not completed yet, so the transition has not been committed.
+ sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new
FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(15, 0),
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
+ OptionalInt.empty(), false));
+
+ assertEquals(3, sharePartition.cachedState().size());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(0L).offsetState().get(0L).state());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(0L).offsetState().get(1L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(0L).offsetState().get(2L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(0L).offsetState().get(3L).state());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(0L).offsetState().get(4L).state());
+ assertEquals(RecordState.AVAILABLE,
sharePartition.cachedState().get(5L).batchState());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(10L).batchState());
+
+ // Complete the persister.writeState RPC now. This commits all the acknowledged batches, hence their
+ // rollback state becomes null and they can be acquired again.
+ future.complete(writeShareGroupStateResult);
+ sharePartition.acquire(MEMBER_ID, MAX_FETCH_RECORDS, new
FetchPartitionData(Errors.NONE, 20, 0, memoryRecords(15, 0),
+ Optional.empty(), OptionalLong.empty(), Optional.empty(),
+ OptionalInt.empty(), false));
+ assertEquals(3, sharePartition.cachedState().size());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(0L).offsetState().get(0L).state());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(0L).offsetState().get(1L).state());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(0L).offsetState().get(2L).state());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(0L).offsetState().get(3L).state());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(0L).offsetState().get(4L).state());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(5L).batchState());
+ assertEquals(RecordState.ACQUIRED,
sharePartition.cachedState().get(10L).batchState());
+ }
+
private List<AcquiredRecords> fetchAcquiredRecords(ShareAcquiredRecords
shareAcquiredRecords, int expectedOffsetCount) {
assertNotNull(shareAcquiredRecords);
assertEquals(expectedOffsetCount, shareAcquiredRecords.count());
diff --git
a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
index d2679cd62ea..73f9fce42e6 100644
---
a/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
+++
b/core/src/test/scala/unit/kafka/server/ShareFetchAcknowledgeRequestTest.scala
@@ -835,38 +835,57 @@ class ShareFetchAcknowledgeRequestTest(cluster:
ClusterInstance) extends GroupCo
.setAcknowledgeErrorCode(Errors.NONE.code())
.setAcquiredRecords(expectedAcquiredRecords(Collections.singletonList(0),
Collections.singletonList(9), Collections.singletonList(1)))
- var fetchPartitionData =
shareFetchResponseData.responses().get(0).partitions().get(0)
+ val fetchPartitionData =
shareFetchResponseData.responses().get(0).partitions().get(0)
compareFetchResponsePartitions(expectedFetchPartitionData,
fetchPartitionData)
// Producing 10 more records to the topic created above
produceData(topicIdPartition, 10)
- // Send a third Share Fetch request with piggybacked acknowledgements
- shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
- metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
- acknowledgementsMapForFetch = Map(topicIdPartition -> List(new
ShareFetchRequestData.AcknowledgementBatch()
- .setFirstOffset(0)
- .setLastOffset(9)
- .setAcknowledgeTypes(Collections.singletonList(2.toByte))).asJava) //
Release the records
- shareFetchRequest = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
- shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
-
- shareFetchResponseData = shareFetchResponse.data()
- assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
- assertEquals(1, shareFetchResponseData.responses().size())
- assertEquals(topicId, shareFetchResponseData.responses().get(0).topicId())
- assertEquals(1,
shareFetchResponseData.responses().get(0).partitions().size())
-
expectedFetchPartitionData = new ShareFetchResponseData.PartitionData()
.setPartitionIndex(partition)
.setErrorCode(Errors.NONE.code())
.setAcknowledgeErrorCode(Errors.NONE.code())
.setAcquiredRecords(expectedAcquiredRecords(List(0L, 10L).asJava,
List(9L, 19L).asJava, List(2, 1).asJava))
+
+ val acquiredRecords : util.List[AcquiredRecords] = new
util.ArrayList[AcquiredRecords]()
+ var releaseAcknowledgementSent = false
+
+ TestUtils.waitUntilTrue(() => {
+ shareSessionEpoch = ShareRequestMetadata.nextEpoch(shareSessionEpoch)
+ metadata = new ShareRequestMetadata(memberId, shareSessionEpoch)
+ if (releaseAcknowledgementSent) {
+ // For fourth share fetch request onwards
+ acknowledgementsMapForFetch = Map.empty
+ } else {
+ // Send a third Share Fetch request with piggybacked acknowledgements
+ acknowledgementsMapForFetch = Map(topicIdPartition -> List(new
ShareFetchRequestData.AcknowledgementBatch()
+ .setFirstOffset(0)
+ .setLastOffset(9)
+ .setAcknowledgeTypes(Collections.singletonList(2.toByte))).asJava)
// Release the records
+ releaseAcknowledgementSent = true
+ }
+ shareFetchRequest = createShareFetchRequest(groupId, metadata,
MAX_PARTITION_BYTES, send, Seq.empty, acknowledgementsMapForFetch)
+ shareFetchResponse =
connectAndReceive[ShareFetchResponse](shareFetchRequest)
+
+ shareFetchResponseData = shareFetchResponse.data()
+ assertEquals(Errors.NONE.code, shareFetchResponseData.errorCode)
+ assertEquals(1, shareFetchResponseData.responses().size())
+ assertEquals(topicId,
shareFetchResponseData.responses().get(0).topicId())
+ val responseSize =
shareFetchResponseData.responses().get(0).partitions().size()
+ if (responseSize > 0) {
+
acquiredRecords.addAll(shareFetchResponseData.responses().get(0).partitions().get(0).acquiredRecords())
+ }
+ // There should be 2 acquired record batches finally -
+ // 1. batch containing 0-9 offsets which were initially acknowledged as
RELEASED.
+ // 2. batch containing 10-19 offsets which were produced in the second
produceData function call.
+ acquiredRecords.size() == 2
+
+ }, "Share fetch request failed", 5000)
+
// All the records from offsets 0 to 19 will be fetched. Records from 0 to
9 will have delivery count as 2 because
// they are re delivered, and records from 10 to 19 will have delivery
count as 1 because they are newly acquired
-
- fetchPartitionData =
shareFetchResponseData.responses().get(0).partitions().get(0)
- compareFetchResponsePartitions(expectedFetchPartitionData,
fetchPartitionData)
+
assertTrue(expectedFetchPartitionData.acquiredRecords().containsAll(acquiredRecords)
&&
+
acquiredRecords.containsAll(expectedFetchPartitionData.acquiredRecords()))
}
@ClusterTests(