AndrewJSchofield commented on code in PR #20797:
URL: https://github.com/apache/kafka/pull/20797#discussion_r2484764622


##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -2504,7 +2504,7 @@ CompletableFuture<Void> 
writeShareGroupState(List<PersisterStateBatch> stateBatc
                 .setGroupId(this.groupId)
                 .setTopicsData(List.of(new 
TopicData<>(topicIdPartition.topicId(),
                     List.of(PartitionFactory.newPartitionStateBatchData(
-                        topicIdPartition.partition(), stateEpoch, 
startOffset(), leaderEpoch, stateBatches))))
+                        topicIdPartition.partition(), stateEpoch, 
startOffset(), inFlightTerminalRecords(), leaderEpoch, stateBatches))))

Review Comment:
   I know that `inFlightTerminalRecords` was the previous name before the KIP 
was approved, but it's changed now. Please can we expunge the old name for the 
sake of the future maintainers.



##########
clients/src/main/resources/common/message/WriteShareGroupStateResponse.json:
##########
@@ -17,7 +17,7 @@
   "apiKey": 85,
   "type": "response",
   "name": "WriteShareGroupStateResponse",
-  "validVersions": "0",

Review Comment:
   Again, a version comment such as:
   ```
     // Version 0 is the initial version (KIP-932).
     //
     // Version 1 introduces DeliveryCompleteCount in the request (KIP-1226).
   ```



##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java:
##########
@@ -149,11 +160,14 @@ public static ShareGroupOffset 
fromRequest(InitializeShareGroupStateRequestData.
     }
 
     public static ShareGroupOffset 
fromRequest(InitializeShareGroupStateRequestData.PartitionData data, int 
snapshotEpoch, long timestamp) {
+        // Since initialization changes the start offset, and hence the in 
flight state is forgotten, the end offset is set
+        // to be the same as the start offset, and the in flight record count 
is set to 0.

Review Comment:
   This comment seems inaccurate. I don't understand how the in-flight record 
count is set to zero here. I do think that the delivery complete count is 
unknown at this point and is set to the uninitialised value.



##########
share-coordinator/src/main/resources/common/message/ShareUpdateValue.json:
##########
@@ -26,6 +26,8 @@
       "about": "The leader epoch of the share-partition." },
     { "name": "StartOffset", "type": "int64", "versions": "0+",
       "about": "The share-partition start offset, or -1 if the start offset is 
not being updated." },
+    { "name": "DeliveryCompleteCount", "type": "int32", "versions": "0+", 
"taggedVersions": "0+", "tag": 0, "default": "-1",

Review Comment:
   And here.



##########
clients/src/main/resources/common/message/WriteShareGroupStateRequest.json:
##########
@@ -18,7 +18,7 @@
   "type": "request",
   "listeners": ["broker"],
   "name": "WriteShareGroupStateRequest",
-  "validVersions": "0",

Review Comment:
   Please can we have a comment to describe the versions. Here's my suggestion:
   ```
     // Version 0 is the initial version (KIP-932).
     //
     // Version 1 introduces DeliveryCompleteCount (KIP-1226).
   
   ```



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -13851,7 +13853,7 @@ class KafkaApisTest extends Logging {
   def getWriteShareGroupStateResponse(requestData: 
WriteShareGroupStateRequestData, configOverrides: Map[String, String] = 
Map.empty,
                                       verifyNoErr: Boolean = true, authorizer: 
Authorizer = null,
                                       writeStateResult: 
util.List[WriteShareGroupStateResponseData.WriteStateResult]): 
WriteShareGroupStateResponse = {
-    val requestChannelRequest = buildRequest(new 
WriteShareGroupStateRequest.Builder(requestData).build())
+    val requestChannelRequest = buildRequest(new 
WriteShareGroupStateRequest.Builder(requestData).build(0))

Review Comment:
   Why are you hardcoding version 0 here?



##########
share-coordinator/src/main/resources/common/message/ShareSnapshotValue.json:
##########
@@ -28,6 +28,8 @@
       "about": "The leader epoch of the share-partition." },
     { "name": "StartOffset", "type": "int64", "versions": "0+",
       "about": "The share-partition start offset." },
+    { "name": "DeliveryCompleteCount", "type": "int32", "versions": "0+", 
"taggedVersions": "0+", "tag": 0, "default": "-1",

Review Comment:
   If you look at `ConsumerGroupMetadataValue`, you'll see an example of how a 
new tagged field was commented. I suggest a similar thing here such as
   ```
     // DeliveryCompleteCount was added in Apache Kafka 4.2 (KIP-1226).
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to