This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 13b56272741 KAFKA-19337: Write state writes snapshot for higher state epoch. (#19843)
13b56272741 is described below

commit 13b5627274170b5d49418e32334caee0d7e091a7
Author: Sushant Mahajan <[email protected]>
AuthorDate: Thu May 29 18:15:54 2025 +0530

    KAFKA-19337: Write state writes snapshot for higher state epoch. (#19843)
    
    - Due to the condition on the number of updates per snapshot in
    `generateShareStateRecord`, a share update gets written for write state
    requests even if they carry the highest state epoch seen so far
    (see the sketch below).
    - A share update record cannot carry the state epoch, so the new,
    higher state epoch gets missed.
    - This PR remedies the issue and adds a test as proof of the fix.
    
    Reviewers: Andrew Schofield <[email protected]>
---
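In essence, the patch adds one more condition to the snapshot-vs-update decision in
generateShareStateRecord. A minimal sketch of that decision follows; the class, method and
parameter names are placeholders chosen for illustration, not the actual ShareCoordinatorShard
API, and only the behaviour is taken from the diff below.

    // Hypothetical helper illustrating when the share coordinator writes a full
    // ShareSnapshot record instead of a lighter ShareUpdate record.
    final class SnapshotDecisionSketch {

        // Returns true when a full snapshot should be written for this write-state request.
        static boolean writeSnapshot(boolean firstWriteForKey,
                                     int updatesSinceLastSnapshot,
                                     int updatesPerSnapshotLimit,
                                     int requestStateEpoch,
                                     int currentStateEpoch) {
            if (firstWriteForKey) {
                // First write for this share partition: always start with a snapshot.
                return true;
            }
            // Before this fix only the count check existed, so a request carrying a higher
            // state epoch still produced a ShareUpdate, which cannot record the state epoch.
            return updatesSinceLastSnapshot >= updatesPerSnapshotLimit
                || requestStateEpoch > currentStateEpoch;   // condition added by this patch
        }
    }

With only the old count check, the third request in the new test below would have produced a
ShareUpdate; with the added epoch comparison it produces a ShareSnapshot and the snapshot
epoch advances.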
 .../share/ShareCoordinatorRecordHelpers.java       |   2 +-
 .../coordinator/share/ShareCoordinatorShard.java   |   5 +-
 .../share/ShareCoordinatorRecordHelpersTest.java   |   2 +-
 .../share/ShareCoordinatorShardTest.java           | 120 ++++++++++++++++++++-
 4 files changed, 124 insertions(+), 5 deletions(-)

diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpers.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpers.java
index e3247a8b926..49daa9a43bc 100644
--- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpers.java
+++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpers.java
@@ -51,7 +51,7 @@ public class ShareCoordinatorRecordHelpers {
         );
     }
 
-    public static CoordinatorRecord newShareSnapshotUpdateRecord(String groupId, Uuid topicId, int partitionId, ShareGroupOffset offsetData) {
+    public static CoordinatorRecord newShareUpdateRecord(String groupId, Uuid topicId, int partitionId, ShareGroupOffset offsetData) {
         return CoordinatorRecord.record(
             new ShareUpdateKey()
                 .setGroupId(groupId)
diff --git a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
index b61671ed991..928192edc4f 100644
--- a/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
+++ b/share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java
@@ -657,6 +657,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
         SharePartitionKey key
     ) {
         long timestamp = time.milliseconds();
+        int updatesPerSnapshotLimit = config.shareCoordinatorSnapshotUpdateRecordsPerSnapshot();
         if (!shareStateMap.containsKey(key)) {
             // Since this is the first time we are getting a write request for key, we should be creating a share snapshot record.
             // The incoming partition data could have overlapping state batches, we must merge them
@@ -671,7 +672,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
                     .setCreateTimestamp(timestamp)
                     .setWriteTimestamp(timestamp)
                     .build());
-        } else if (snapshotUpdateCount.getOrDefault(key, 0) >= config.shareCoordinatorSnapshotUpdateRecordsPerSnapshot()) {
+        } else if (snapshotUpdateCount.getOrDefault(key, 0) >= updatesPerSnapshotLimit || partitionData.stateEpoch() > shareStateMap.get(key).stateEpoch()) {
             ShareGroupOffset currentState = shareStateMap.get(key); // shareStateMap will have the entry as containsKey is true
             int newLeaderEpoch = partitionData.leaderEpoch() == -1 ? currentState.leaderEpoch() : partitionData.leaderEpoch();
             int newStateEpoch = partitionData.stateEpoch() == -1 ? currentState.stateEpoch() : partitionData.stateEpoch();
@@ -697,7 +698,7 @@ public class ShareCoordinatorShard implements CoordinatorShard<CoordinatorRecord
             // Share snapshot is present and number of share snapshot update records < snapshotUpdateRecordsPerSnapshot
             // so create a share update record.
             // The incoming partition data could have overlapping state batches, we must merge them.
-            return ShareCoordinatorRecordHelpers.newShareSnapshotUpdateRecord(
+            return ShareCoordinatorRecordHelpers.newShareUpdateRecord(
                 key.groupId(), key.topicId(), partitionData.partition(),
                 new ShareGroupOffset.Builder()
                     .setSnapshotEpoch(currentState.snapshotEpoch()) // Use same snapshotEpoch as last share snapshot.
diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpersTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpersTest.java
index c0e7795d25e..630318399d8 100644
--- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpersTest.java
+++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorRecordHelpersTest.java
@@ -84,7 +84,7 @@ public class ShareCoordinatorRecordHelpersTest {
         Uuid topicId = Uuid.randomUuid();
         int partitionId = 1;
         PersisterStateBatch batch = new PersisterStateBatch(1L, 10L, (byte) 0, (short) 1);
-        CoordinatorRecord record = ShareCoordinatorRecordHelpers.newShareSnapshotUpdateRecord(
+        CoordinatorRecord record = ShareCoordinatorRecordHelpers.newShareUpdateRecord(
             groupId,
             topicId,
             partitionId,
diff --git a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
index 5f91b018bee..0f6dde9259d 100644
--- a/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
+++ b/share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java
@@ -269,6 +269,124 @@ class ShareCoordinatorShardTest {
         verify(shard.getMetricsShard()).record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME);
     }
 
+    @Test
+    public void testWriteStateSequentialRequestsWithHigherStateEpochCreateShareSnapshots() {
+        // Makes 3 requests. First 2 with same state epoch, and 3rd with incremented state epoch.
+        // The test config defines number of updates/snapshot as 50. So, this test proves that
+        // a higher state epoch in a request forces snapshot creation, even if number of share updates
+        // have not breached the updates/snapshots limit.
+
+        ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build();
+        int stateEpoch = 0;
+        SharePartitionKey shareCoordinatorKey = SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, PARTITION);
+
+        WriteShareGroupStateRequestData request = new WriteShareGroupStateRequestData()
+            .setGroupId(GROUP_ID)
+            .setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData()
+                .setTopicId(TOPIC_ID)
+                .setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData()
+                    .setPartition(PARTITION)
+                    .setStartOffset(0)
+                    .setStateEpoch(stateEpoch)
+                    .setLeaderEpoch(0)
+                    .setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch()
+                        .setFirstOffset(0)
+                        .setLastOffset(10)
+                        .setDeliveryCount((short) 1)
+                        .setDeliveryState((byte) 0)))))));
+
+        CoordinatorResult<WriteShareGroupStateResponseData, CoordinatorRecord> result = shard.writeState(request);
+
+        shard.replay(0L, 0L, (short) 0, result.records().get(0));
+
+        WriteShareGroupStateResponseData expectedData = WriteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
+        List<CoordinatorRecord> expectedRecords = List.of(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+            GROUP_ID, TOPIC_ID, PARTITION, ShareGroupOffset.fromRequest(request.topics().get(0).partitions().get(0), TIME.milliseconds())
+        ));
+
+        assertEquals(0, shard.getShareStateMapValue(shareCoordinatorKey).snapshotEpoch());
+        assertEquals(expectedData, result.response());
+        assertEquals(expectedRecords, result.records());
+
+        assertEquals(groupOffset(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+            GROUP_ID, TOPIC_ID, PARTITION, ShareGroupOffset.fromRequest(request.topics().get(0).partitions().get(0), TIME.milliseconds())
+        ).value().message()), shard.getShareStateMapValue(shareCoordinatorKey));
+        assertEquals(0, shard.getLeaderMapValue(shareCoordinatorKey));
+        verify(shard.getMetricsShard()).record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME);
+
+        // State epoch stays same so share update.
+        request = new WriteShareGroupStateRequestData()
+            .setGroupId(GROUP_ID)
+            .setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData()
+                .setTopicId(TOPIC_ID)
+                .setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData()
+                    .setPartition(PARTITION)
+                    .setStartOffset(0)
+                    .setStateEpoch(stateEpoch)
+                    .setLeaderEpoch(0)
+                    .setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch()
+                        .setFirstOffset(0)
+                        .setLastOffset(10)
+                        .setDeliveryCount((short) 2)
+                        .setDeliveryState((byte) 0)))))));
+
+        result = shard.writeState(request);
+
+        shard.replay(0L, 0L, (short) 0, result.records().get(0));
+
+        expectedData = WriteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
+        expectedRecords = List.of(ShareCoordinatorRecordHelpers.newShareUpdateRecord(
+            GROUP_ID, TOPIC_ID, PARTITION, ShareGroupOffset.fromRequest(request.topics().get(0).partitions().get(0), TIME.milliseconds())
+        ));
+
+        // Snapshot epoch did not increase
+        assertEquals(0, shard.getShareStateMapValue(shareCoordinatorKey).snapshotEpoch());
+        assertEquals(expectedData, result.response());
+        assertEquals(expectedRecords, result.records());
+
+        assertEquals(groupOffset(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+            GROUP_ID, TOPIC_ID, PARTITION, ShareGroupOffset.fromRequest(request.topics().get(0).partitions().get(0), TIME.milliseconds())
+        ).value().message()), shard.getShareStateMapValue(shareCoordinatorKey));
+        assertEquals(0, shard.getLeaderMapValue(shareCoordinatorKey));
+        verify(shard.getMetricsShard(), times(2)).record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME);
+
+        // State epoch incremented so share snapshot.
+        request = new WriteShareGroupStateRequestData()
+            .setGroupId(GROUP_ID)
+            .setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData()
+                .setTopicId(TOPIC_ID)
+                .setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData()
+                    .setPartition(PARTITION)
+                    .setStartOffset(0)
+                    .setStateEpoch(stateEpoch + 1)   // incremented
+                    .setLeaderEpoch(0)
+                    .setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch()
+                        .setFirstOffset(0)
+                        .setLastOffset(10)
+                        .setDeliveryCount((short) 2)
+                        .setDeliveryState((byte) 0)))))));
+
+        result = shard.writeState(request);
+
+        shard.replay(0L, 0L, (short) 0, result.records().get(0));
+
+        expectedData = WriteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
+        expectedRecords = List.of(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+            GROUP_ID, TOPIC_ID, PARTITION, ShareGroupOffset.fromRequest(request.topics().get(0).partitions().get(0), 1, TIME.milliseconds())
+        ));
+
+        // Snapshot epoch increased.
+        assertEquals(1, shard.getShareStateMapValue(shareCoordinatorKey).snapshotEpoch());
+        assertEquals(expectedData, result.response());
+        assertEquals(expectedRecords, result.records());
+
+        assertEquals(groupOffset(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+            GROUP_ID, TOPIC_ID, PARTITION, ShareGroupOffset.fromRequest(request.topics().get(0).partitions().get(0), 1, TIME.milliseconds())
+        ).value().message()), shard.getShareStateMapValue(shareCoordinatorKey));
+        assertEquals(0, shard.getLeaderMapValue(shareCoordinatorKey));
+        verify(shard.getMetricsShard(), times(3)).record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME);
+    }
+
     @Test
     public void testSubsequentWriteStateSnapshotEpochUpdatesSuccessfully() {
         ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build();
@@ -328,7 +446,7 @@ class ShareCoordinatorShardTest {
         expectedData = WriteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
         // The snapshot epoch here will be 1 since this is a snapshot update record,
         // and it refers to parent share snapshot.
-        expectedRecords = List.of(ShareCoordinatorRecordHelpers.newShareSnapshotUpdateRecord(
+        expectedRecords = List.of(ShareCoordinatorRecordHelpers.newShareUpdateRecord(
             GROUP_ID, TOPIC_ID, PARTITION, ShareGroupOffset.fromRequest(request2.topics().get(0).partitions().get(0), TIME.milliseconds())
         ));
 
