AndrewJSchofield commented on code in PR #20797:
URL: https://github.com/apache/kafka/pull/20797#discussion_r2484764622
##########
core/src/main/java/kafka/server/share/SharePartition.java:
##########
@@ -2504,7 +2504,7 @@ CompletableFuture<Void>
writeShareGroupState(List<PersisterStateBatch> stateBatc
.setGroupId(this.groupId)
.setTopicsData(List.of(new
TopicData<>(topicIdPartition.topicId(),
List.of(PartitionFactory.newPartitionStateBatchData(
- topicIdPartition.partition(), stateEpoch,
startOffset(), leaderEpoch, stateBatches))))
+ topicIdPartition.partition(), stateEpoch,
startOffset(), inFlightTerminalRecords(), leaderEpoch, stateBatches))))
Review Comment:
I know that `inFlightTerminalRecords` was the previous name before the KIP
was approved, but it's changed now. Please can we expunge the old name for the
sake of the future maintainers.
##########
clients/src/main/resources/common/message/WriteShareGroupStateResponse.json:
##########
@@ -17,7 +17,7 @@
"apiKey": 85,
"type": "response",
"name": "WriteShareGroupStateResponse",
- "validVersions": "0",
Review Comment:
Again, a version comment such as:
```
// Version 0 is the initial version (KIP-932).
//
// Version 1 introduces DeliveryCompleteCount in the request (KIP-1226).
```
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareGroupOffset.java:
##########
@@ -149,11 +160,14 @@ public static ShareGroupOffset
fromRequest(InitializeShareGroupStateRequestData.
}
public static ShareGroupOffset
fromRequest(InitializeShareGroupStateRequestData.PartitionData data, int
snapshotEpoch, long timestamp) {
+ // Since initialization changes the start offset, and hence the in
flight state is forgotten, the end offset is set
+ // to be the same as the start offset, and the in flight record count
is set to 0.
Review Comment:
This comment seems inaccurate. I don't understand how the in-flight record
count is set to zero here. I do think that the delivery complete count is
unknown at this point and is set to the uninitialised value.
##########
share-coordinator/src/main/resources/common/message/ShareUpdateValue.json:
##########
@@ -26,6 +26,8 @@
"about": "The leader epoch of the share-partition." },
{ "name": "StartOffset", "type": "int64", "versions": "0+",
"about": "The share-partition start offset, or -1 if the start offset is
not being updated." },
+ { "name": "DeliveryCompleteCount", "type": "int32", "versions": "0+",
"taggedVersions": "0+", "tag": 0, "default": "-1",
Review Comment:
And here.
##########
clients/src/main/resources/common/message/WriteShareGroupStateRequest.json:
##########
@@ -18,7 +18,7 @@
"type": "request",
"listeners": ["broker"],
"name": "WriteShareGroupStateRequest",
- "validVersions": "0",
Review Comment:
Please can we have a comment to describe the versions. Here's my suggestion:
```
// Version 0 is the initial version (KIP-932).
//
// Version 1 introduces DeliveryCompleteCount (KIP-1226).
```
##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -13851,7 +13853,7 @@ class KafkaApisTest extends Logging {
def getWriteShareGroupStateResponse(requestData:
WriteShareGroupStateRequestData, configOverrides: Map[String, String] =
Map.empty,
verifyNoErr: Boolean = true, authorizer:
Authorizer = null,
writeStateResult:
util.List[WriteShareGroupStateResponseData.WriteStateResult]):
WriteShareGroupStateResponse = {
- val requestChannelRequest = buildRequest(new
WriteShareGroupStateRequest.Builder(requestData).build())
+ val requestChannelRequest = buildRequest(new
WriteShareGroupStateRequest.Builder(requestData).build(0))
Review Comment:
Why are you hardcoding version 0 here?
##########
share-coordinator/src/main/resources/common/message/ShareSnapshotValue.json:
##########
@@ -28,6 +28,8 @@
"about": "The leader epoch of the share-partition." },
{ "name": "StartOffset", "type": "int64", "versions": "0+",
"about": "The share-partition start offset." },
+ { "name": "DeliveryCompleteCount", "type": "int32", "versions": "0+",
"taggedVersions": "0+", "tag": 0, "default": "-1",
Review Comment:
If you look at `ConsumerGroupMetadataValue`, you'll see an example of how a
new tagged field was commented. I suggest a similar thing here such as
```
// DeliveryCompleteCount was added in Apache Kafka 4.2 (KIP-1226).
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]