This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 13b56272741 KAFKA-19337: Write state writes snapshot for higher state epoch. (#19843)
13b56272741 is described below
commit 13b5627274170b5d49418e32334caee0d7e091a7
Author: Sushant Mahajan <[email protected]>
AuthorDate: Thu May 29 18:15:54 2025 +0530
KAFKA-19337: Write state writes snapshot for higher state epoch. (#19843)
- Due to the condition on the number of updates per snapshot in
`generateShareStateRecord`, share updates get written for write state
requests even if they carry the highest state epoch seen so far.
- A share update record cannot carry a state epoch, so the higher state
epoch in such a request is lost.
- This PR remedies the issue and adds a test as proof of the fix.
Reviewers: Andrew Schofield <[email protected]>
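For readers skimming the patch below, here is a minimal, illustrative sketch of the snapshot-vs-update decision that this fix changes. The method name and parameters are hypothetical stand-ins for the shard's internal state, not the coordinator's actual signature.

    // Hypothetical sketch of the decision made in generateShareStateRecord;
    // parameters stand in for the shard's internal bookkeeping.
    static boolean shouldWriteSnapshot(boolean keyKnown,
                                       int updatesSinceLastSnapshot,
                                       int updatesPerSnapshotLimit,
                                       int requestStateEpoch,
                                       int storedStateEpoch) {
        if (!keyKnown) {
            // First write for this share partition: always a ShareSnapshot.
            return true;
        }
        // Before this fix only the update-count condition was checked, so a write
        // request carrying a higher state epoch could be answered with a ShareUpdate,
        // which cannot hold a state epoch, and the new epoch was lost.
        return updatesSinceLastSnapshot >= updatesPerSnapshotLimit
            || requestStateEpoch > storedStateEpoch;
    }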
---
.../share/ShareCoordinatorRecordHelpers.java | 2 +-
.../coordinator/share/ShareCoordinatorShard.java | 5 +-
.../share/ShareCoordinatorRecordHelpersTest.java | 2 +-
.../share/ShareCoordinatorShardTest.java | 120 ++++++++++++++++++++-
4 files changed, 124 insertions(+), 5 deletions(-)
diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpers.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpers.java
index e3247a8b926..49daa9a43bc 100644
--- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpers.java
+++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpers.java
@@ -51,7 +51,7 @@ public class ShareCoordinatorRecordHelpers {
);
}
- public static CoordinatorRecord newShareSnapshotUpdateRecord(String groupId, Uuid topicId, int partitionId, ShareGroupOffset offsetData) {
+ public static CoordinatorRecord newShareUpdateRecord(String groupId, Uuid topicId, int partitionId, ShareGroupOffset offsetData) {
return CoordinatorRecord.record(
new ShareUpdateKey()
.setGroupId(groupId)
diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
index b61671ed991..928192edc4f 100644
--- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
+++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
@@ -657,6 +657,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
SharePartitionKey key
) {
long timestamp = time.milliseconds();
+ int updatesPerSnapshotLimit = config.shareCoordinatorSnapshotUpdateRecordsPerSnapshot();
if (!shareStateMap.containsKey(key)) {
// Since this is the first time we are getting a write request for key, we should be creating a share snapshot record.
// The incoming partition data could have overlapping state batches, we must merge them
@@ -671,7 +672,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
.setCreateTimestamp(timestamp)
.setWriteTimestamp(timestamp)
.build());
- } else if (snapshotUpdateCount.getOrDefault(key, 0) >= config.shareCoordinatorSnapshotUpdateRecordsPerSnapshot()) {
+ } else if (snapshotUpdateCount.getOrDefault(key, 0) >= updatesPerSnapshotLimit || partitionData.stateEpoch() > shareStateMap.get(key).stateEpoch()) {
ShareGroupOffset currentState = shareStateMap.get(key); // shareStateMap will have the entry as containsKey is true
int newLeaderEpoch = partitionData.leaderEpoch() == -1 ? currentState.leaderEpoch() : partitionData.leaderEpoch();
int newStateEpoch = partitionData.stateEpoch() == -1 ? currentState.stateEpoch() : partitionData.stateEpoch();
@@ -697,7 +698,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
// Share snapshot is present and number of share snapshot update records < snapshotUpdateRecordsPerSnapshot
// so create a share update record.
// The incoming partition data could have overlapping state batches, we must merge them.
- return ShareCoordinatorRecordHelpers.newShareSnapshotUpdateRecord(
+ return ShareCoordinatorRecordHelpers.newShareUpdateRecord(
key.groupId(), key.topicId(), partitionData.partition(),
new ShareGroupOffset.Builder()
.setSnapshotEpoch(currentState.snapshotEpoch()) // Use same snapshotEpoch as last share snapshot.
diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpersTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpersTest.java
index c0e7795d25e..630318399d8 100644
--- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpersTest.java
+++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpersTest.java
@@ -84,7 +84,7 @@ public class ShareCoordinatorRecordHelpersTest {
Uuid topicId = Uuid.randomUuid();
int partitionId = 1;
PersisterStateBatch batch = new PersisterStateBatch(1L, 10L, (byte) 0, (short) 1);
- CoordinatorRecord record = ShareCoordinatorRecordHelpers.newShareSnapshotUpdateRecord(
+ CoordinatorRecord record = ShareCoordinatorRecordHelpers.newShareUpdateRecord(
groupId,
topicId,
partitionId,
diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
index 5f91b018bee..0f6dde9259d 100644
--- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
+++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
@@ -269,6 +269,124 @@ class ShareCoordinatorShardTest {
verify(shard.getMetricsShard()).record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME);
}
+ @Test
+ public void testWriteStateSequentialRequestsWithHigherStateEpochCreateShareSnapshots() {
+ // Makes 3 requests. First 2 with same state epoch, and 3rd with incremented state epoch.
+ // The test config defines number of updates/snapshot as 50. So, this test proves that
+ // a higher state epoch in a request forces snapshot creation, even if number of share updates have not breached the updates/snapshots limit.
+
+ ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build();
+ int stateEpoch = 0;
+ SharePartitionKey shareCoordinatorKey = SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, PARTITION);
+
+ WriteShareGroupStateRequestData request = new WriteShareGroupStateRequestData()
+ .setGroupId(GROUP_ID)
+ .setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData()
+ .setPartition(PARTITION)
+ .setStartOffset(0)
+ .setStateEpoch(stateEpoch)
+ .setLeaderEpoch(0)
+ .setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch()
+ .setFirstOffset(0)
+ .setLastOffset(10)
+ .setDeliveryCount((short) 1)
+ .setDeliveryState((byte) 0)))))));
+
+ CoordinatorResult<WriteShareGroupStateResponseData, CoordinatorRecord> result = shard.writeState(request);
+
+ shard.replay(0L, 0L, (short) 0, result.records().get(0));
+
+ WriteShareGroupStateResponseData expectedData = WriteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
+ List<CoordinatorRecord> expectedRecords = List.of(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+ GROUP_ID, TOPIC_ID, PARTITION, ShareGroupOffset.fromRequest(request.topics().get(0).partitions().get(0), TIME.milliseconds())
+ ));
+
+ assertEquals(0, shard.getShareStateMapValue(shareCoordinatorKey).snapshotEpoch());
+ assertEquals(expectedData, result.response());
+ assertEquals(expectedRecords, result.records());
+
+ assertEquals(groupOffset(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+ GROUP_ID, TOPIC_ID, PARTITION, ShareGroupOffset.fromRequest(request.topics().get(0).partitions().get(0), TIME.milliseconds())
+ ).value().message()), shard.getShareStateMapValue(shareCoordinatorKey));
+ assertEquals(0, shard.getLeaderMapValue(shareCoordinatorKey));
+ verify(shard.getMetricsShard()).record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME);
+
+ // State epoch stays same so share update.
+ request = new WriteShareGroupStateRequestData()
+ .setGroupId(GROUP_ID)
+ .setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData()
+ .setPartition(PARTITION)
+ .setStartOffset(0)
+ .setStateEpoch(stateEpoch)
+ .setLeaderEpoch(0)
+ .setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch()
+ .setFirstOffset(0)
+ .setLastOffset(10)
+ .setDeliveryCount((short) 2)
+ .setDeliveryState((byte) 0)))))));
+
+ result = shard.writeState(request);
+
+ shard.replay(0L, 0L, (short) 0, result.records().get(0));
+
+ expectedData = WriteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
+ expectedRecords = List.of(ShareCoordinatorRecordHelpers.newShareUpdateRecord(
+ GROUP_ID, TOPIC_ID, PARTITION, ShareGroupOffset.fromRequest(request.topics().get(0).partitions().get(0), TIME.milliseconds())
+ ));
+
+ // Snapshot epoch did not increase
+ assertEquals(0, shard.getShareStateMapValue(shareCoordinatorKey).snapshotEpoch());
+ assertEquals(expectedData, result.response());
+ assertEquals(expectedRecords, result.records());
+
+ assertEquals(groupOffset(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+ GROUP_ID, TOPIC_ID, PARTITION, ShareGroupOffset.fromRequest(request.topics().get(0).partitions().get(0), TIME.milliseconds())
+ ).value().message()), shard.getShareStateMapValue(shareCoordinatorKey));
+ assertEquals(0, shard.getLeaderMapValue(shareCoordinatorKey));
+ verify(shard.getMetricsShard(), times(2)).record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME);
+
+ // State epoch incremented so share snapshot.
+ request = new WriteShareGroupStateRequestData()
+ .setGroupId(GROUP_ID)
+ .setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData()
+ .setTopicId(TOPIC_ID)
+ .setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData()
+ .setPartition(PARTITION)
+ .setStartOffset(0)
+ .setStateEpoch(stateEpoch + 1) // incremented
+ .setLeaderEpoch(0)
+ .setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch()
+ .setFirstOffset(0)
+ .setLastOffset(10)
+ .setDeliveryCount((short) 2)
+ .setDeliveryState((byte) 0)))))));
+
+ result = shard.writeState(request);
+
+ shard.replay(0L, 0L, (short) 0, result.records().get(0));
+
+ expectedData = WriteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
+ expectedRecords = List.of(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+ GROUP_ID, TOPIC_ID, PARTITION, ShareGroupOffset.fromRequest(request.topics().get(0).partitions().get(0), 1, TIME.milliseconds())
+ ));
+
+ // Snapshot epoch increased.
+ assertEquals(1, shard.getShareStateMapValue(shareCoordinatorKey).snapshotEpoch());
+ assertEquals(expectedData, result.response());
+ assertEquals(expectedRecords, result.records());
+
+ assertEquals(groupOffset(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+ GROUP_ID, TOPIC_ID, PARTITION, ShareGroupOffset.fromRequest(request.topics().get(0).partitions().get(0), 1, TIME.milliseconds())
+ ).value().message()), shard.getShareStateMapValue(shareCoordinatorKey));
+ assertEquals(0, shard.getLeaderMapValue(shareCoordinatorKey));
+ verify(shard.getMetricsShard(), times(3)).record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME);
+ }
+
@Test
public void testSubsequentWriteStateSnapshotEpochUpdatesSuccessfully() {
ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build();
@@ -328,7 +446,7 @@ class ShareCoordinatorShardTest {
expectedData = WriteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
// The snapshot epoch here will be 1 since this is a snapshot update record,
// and it refers to parent share snapshot.
- expectedRecords = List.of(ShareCoordinatorRecordHelpers.newShareSnapshotUpdateRecord(
+ expectedRecords = List.of(ShareCoordinatorRecordHelpers.newShareUpdateRecord(
GROUP_ID, TOPIC_ID, PARTITION, ShareGroupOffset.fromRequest(request2.topics().get(0).partitions().get(0), TIME.milliseconds())
));