AndrewJSchofield commented on code in PR #19843: URL: https://github.com/apache/kafka/pull/19843#discussion_r2113546752
########## share-coordinator/src/test/java/org/apache/kafka/coordinator/share/ShareCoordinatorShardTest.java: ########## @@ -269,6 +269,124 @@ public void testWriteStateSuccess() { verify(shard.getMetricsShard()).record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME); } + @Test + public void testWriteStateSequentialRequestsWithHigherStateEpochCreateShareSnapshots() { + // Makes 3 requests. First 2 with same state epoch, and 3rd with incremented state epoch. + // The test config defines number of updates/snapshot as 50. So, this test proves that + // a higher state epoch in a request forces snapshot creation, even if number of share updates + // have not breached the updates/snapshots limit. + + ShareCoordinatorShard shard = new ShareCoordinatorShardBuilder().build(); + int stateEpoch = 0; + SharePartitionKey shareCoordinatorKey = SharePartitionKey.getInstance(GROUP_ID, TOPIC_ID, PARTITION); + + WriteShareGroupStateRequestData request = new WriteShareGroupStateRequestData() + .setGroupId(GROUP_ID) + .setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData() + .setTopicId(TOPIC_ID) + .setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData() + .setPartition(PARTITION) + .setStartOffset(0) + .setStateEpoch(stateEpoch) + .setLeaderEpoch(0) + .setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch() + .setFirstOffset(0) + .setLastOffset(10) + .setDeliveryCount((short) 1) + .setDeliveryState((byte) 0))))))); + + CoordinatorResult<WriteShareGroupStateResponseData, CoordinatorRecord> result = shard.writeState(request); + + shard.replay(0L, 0L, (short) 0, result.records().get(0)); + + WriteShareGroupStateResponseData expectedData = WriteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION); + List<CoordinatorRecord> expectedRecords = List.of(ShareCoordinatorRecordHelpers.newShareSnapshotRecord( + GROUP_ID, TOPIC_ID, PARTITION, ShareGroupOffset.fromRequest(request.topics().get(0).partitions().get(0), 
TIME.milliseconds()) + )); + + assertEquals(0, shard.getShareStateMapValue(shareCoordinatorKey).snapshotEpoch()); + assertEquals(expectedData, result.response()); + assertEquals(expectedRecords, result.records()); + + assertEquals(groupOffset(ShareCoordinatorRecordHelpers.newShareSnapshotRecord( + GROUP_ID, TOPIC_ID, PARTITION, ShareGroupOffset.fromRequest(request.topics().get(0).partitions().get(0), TIME.milliseconds()) + ).value().message()), shard.getShareStateMapValue(shareCoordinatorKey)); + assertEquals(0, shard.getLeaderMapValue(shareCoordinatorKey)); + verify(shard.getMetricsShard()).record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME); + + // State epoch stays same so share update. + request = new WriteShareGroupStateRequestData() + .setGroupId(GROUP_ID) + .setTopics(List.of(new WriteShareGroupStateRequestData.WriteStateData() + .setTopicId(TOPIC_ID) + .setPartitions(List.of(new WriteShareGroupStateRequestData.PartitionData() + .setPartition(PARTITION) + .setStartOffset(0) + .setStateEpoch(stateEpoch) + .setLeaderEpoch(0) + .setStateBatches(List.of(new WriteShareGroupStateRequestData.StateBatch() + .setFirstOffset(0) + .setLastOffset(10) + .setDeliveryCount((short) 2) + .setDeliveryState((byte) 0))))))); + + result = shard.writeState(request); + + shard.replay(0L, 0L, (short) 0, result.records().get(0)); + + expectedData = WriteShareGroupStateResponse.toResponseData(TOPIC_ID, PARTITION); + expectedRecords = List.of(ShareCoordinatorRecordHelpers.newShareSnapshotUpdateRecord( Review Comment: It would be better if the method `ShareCoordinatorRecordHelpers.newShareSnapshotUpdateRecord` was actually called `newShareUpdateRecord`. There are only 4 instances so I think it's worth making this change too. 
########## share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java: ########## @@ -671,7 +671,8 @@ private CoordinatorRecord generateShareStateRecord( .setCreateTimestamp(timestamp) .setWriteTimestamp(timestamp) .build()); - } else if (snapshotUpdateCount.getOrDefault(key, 0) >= config.shareCoordinatorSnapshotUpdateRecordsPerSnapshot()) { + } else if (snapshotUpdateCount.getOrDefault(key, 0) >= config.shareCoordinatorSnapshotUpdateRecordsPerSnapshot() || + partitionData.stateEpoch() > shareStateMap.get(key).stateEpoch()) { Review Comment: nit: The continuation line of the if predicate is indented so that it aligns with the code that follows; it would be clearer if the two were not aligned. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org