AndrewJSchofield commented on code in PR #19843:
URL: https://github.com/apache/kafka/pull/19843#discussion_r2113546752


##########
share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java:
##########
@@ -269,6 +269,124 @@ public void testWriteStateSuccess() {
         verify(shard.getMetricsShard()).record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME);
     }
 
+    @Test
+    public void testWriteStateSequentialRequestsWithHigherStateEpochCreateShareSnapshots() {
+        // Makes 3 requests: the first 2 with the same state epoch, and the 3rd with an incremented state epoch.
+        // The test config defines the number of updates per snapshot as 50, so this test proves that
+        // a higher state epoch in a request forces snapshot creation even if the number of share updates
+        // has not breached the updates-per-snapshot limit.
+
+        ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build();
+        int stateEpoch = 0;
+        SharePartitionKey shareCoordinatorKey = SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, PARTITION);
+
+        WriteShareGroupStateRequestData request = new WriteShareGroupStateRequestData()
+            .setGroupId(GROUP_ID)
+            .setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData()
+                .setTopicId(TOPIC_ID)
+                .setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData()
+                    .setPartition(PARTITION)
+                    .setStartOffset(0)
+                    .setStateEpoch(stateEpoch)
+                    .setLeaderEpoch(0)
+                    .setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch()
+                        .setFirstOffset(0)
+                        .setLastOffset(10)
+                        .setDeliveryCount((short) 1)
+                        .setDeliveryState((byte) 0)))))));
+
+        CoordinatorResult<WriteShareGroupStateResponseData, CoordinatorRecord> result = shard.writeState(request);
+
+        shard.replay(0L, 0L, (short) 0, result.records().get(0));
+
+        WriteShareGroupStateResponseData expectedData = WriteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
+        List<CoordinatorRecord> expectedRecords = List.of(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+            GROUP_ID, TOPIC_ID, PARTITION, ShareGroupOffset.fromRequest(request.topics().get(0).partitions().get(0), TIME.milliseconds())
+        ));
+
+        assertEquals(0, shard.getShareStateMapValue(shareCoordinatorKey).snapshotEpoch());
+        assertEquals(expectedData, result.response());
+        assertEquals(expectedRecords, result.records());
+
+        assertEquals(groupOffset(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+            GROUP_ID, TOPIC_ID, PARTITION, ShareGroupOffset.fromRequest(request.topics().get(0).partitions().get(0), TIME.milliseconds())
+        ).value().message()), shard.getShareStateMapValue(shareCoordinatorKey));
+        assertEquals(0, shard.getLeaderMapValue(shareCoordinatorKey));
+        verify(shard.getMetricsShard()).record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME);
+
+        // The state epoch stays the same, so this request should produce a share update.
+        request = new WriteShareGroupStateRequestData()
+            .setGroupId(GROUP_ID)
+            .setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData()
+                .setTopicId(TOPIC_ID)
+                .setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData()
+                    .setPartition(PARTITION)
+                    .setStartOffset(0)
+                    .setStateEpoch(stateEpoch)
+                    .setLeaderEpoch(0)
+                    .setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch()
+                        .setFirstOffset(0)
+                        .setLastOffset(10)
+                        .setDeliveryCount((short) 2)
+                        .setDeliveryState((byte) 0)))))));
+
+        result = shard.writeState(request);
+
+        shard.replay(0L, 0L, (short) 0, result.records().get(0));
+
+        expectedData = WriteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION);
+        expectedRecords = List.of(ShareCoordinatorRecordHelpers.newShareSnapshotUpdateRecord(

Review Comment:
   It would be better if the method `ShareCoordinatorRecordHelpers.newShareSnapshotUpdateRecord` was actually called `newShareUpdateRecord`. There are only 4 instances so I think it's worth making this change too.



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -671,7 +671,8 @@ private CoordinatorRecord generateShareStateRecord(
                     .setCreateTimestamp(timestamp)
                     .setWriteTimestamp(timestamp)
                     .build());
-        } else if (snapshotUpdateCount.getOrDefault(key, 0) >= config.shareCoordinatorSnapshotUpdateRecordsPerSnapshot()) {
+        } else if (snapshotUpdateCount.getOrDefault(key, 0) >= config.shareCoordinatorSnapshotUpdateRecordsPerSnapshot() ||
+            partitionData.stateEpoch() > shareStateMap.get(key).stateEpoch()) {

Review Comment:
   nit: The indentation of the continuation of the if predicate makes it align 
with the following code, and it would be clearer if they were not aligned.
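   For illustration only, a minimal, self-contained sketch of the layout point (the class and method names below are invented, not Kafka code): when the wrapped condition shares the branch body's indent the two run together, and one extra level of indentation on the continuation keeps them visually separate.
   ```java
   // Hypothetical example; shouldSnapshot*, updateCount, etc. are invented for illustration.
   public class IndentationExample {

       // Continuation aligned with the branch body below, which is harder to scan.
       static boolean shouldSnapshotAligned(int updateCount, int limit, int requestEpoch, int currentEpoch) {
           if (updateCount >= limit ||
               requestEpoch > currentEpoch) {
               return true;
           }
           return false;
       }

       // Continuation indented one level past the body, so condition and body read as distinct blocks.
       static boolean shouldSnapshotIndented(int updateCount, int limit, int requestEpoch, int currentEpoch) {
           if (updateCount >= limit ||
                   requestEpoch > currentEpoch) {
               return true;
           }
           return false;
       }

       public static void main(String[] args) {
           // Both variants behave identically; only the layout differs.
           System.out.println(shouldSnapshotAligned(10, 50, 1, 0));   // true
           System.out.println(shouldSnapshotIndented(10, 50, 0, 0));  // false
       }
   }
   ```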



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
