AndrewJSchofield commented on code in PR #20826:
URL: https://github.com/apache/kafka/pull/20826#discussion_r2494197260
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -4011,7 +4050,9 @@ class KafkaApis(val requestChannel: RequestChannel,
erroneous += tp -> ShareAcknowledgeResponse.partitionResponse(tp, Errors.INVALID_REQUEST)
erroneousTopicIdPartitions.add(tp)
isErroneous = true
- } else if (batch.acknowledgeTypes.stream().anyMatch(ackType => ackType < 0 || ackType > 3)) {
+ } else if (batch.acknowledgeTypes.stream().anyMatch(ackType => ackType < 0 ||
Review Comment:
I think this would be cleaner like this:
```
val maxAcknowledgeType = if (version >= 2) 4 else 3
```
and then
```
} else if (batch.acknowledgeTypes.stream().anyMatch(ackType =>
ackType < 0 || ackType > maxAcknowledgeType)) {
```
and an additional test:
```
} else if (batch.acknowledgeTypes.stream().anyMatch(ackType =>
ackType == 4) && !isRenewAck) {
```
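Put together, the two checks could look roughly like the self-contained sketch below. The object `AckValidation`, the method `validate`, and the `Either` result are hypothetical and not part of `KafkaApis`; `isRenewAck` is taken from the snippet above and is assumed to be a flag derived from the request. It also assumes acknowledge type 4 is the renew type accepted from v2 onwards.

```scala
object AckValidation {
  // Assumption: acknowledge type 4 is the renew type, accepted only from v2.
  val RenewAckType: Byte = 4

  // Hypothetical helper: Left(reason) marks the batch as invalid.
  def validate(ackTypes: Seq[Byte], version: Short, isRenewAck: Boolean): Either[String, Unit] = {
    val maxAcknowledgeType = if (version >= 2) 4 else 3
    if (ackTypes.exists(t => t < 0 || t > maxAcknowledgeType))
      Left("acknowledge type out of range")
    else if (ackTypes.contains(RenewAckType) && !isRenewAck)
      Left("renew acknowledgement not permitted for this request")
    else
      Right(())
  }
}
```

Keeping the range check and the renew check as separate branches means each condition stays readable on its own.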
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -3421,9 +3456,11 @@ class KafkaApis(val requestChannel: RequestChannel,
sharePartitionManagerInstance: SharePartitionManager,
authorizedTopics: Set[String],
groupId: String,
- memberId: String): CompletableFuture[Map[TopicIdPartition, ShareAcknowledgeResponseData.PartitionData]] = {
+ memberId: String,
+ version: Short,
Review Comment:
I would tend to define this in a more abstract way, such as
`supportsRenewAcknowledgements: Boolean`. It is a coincidence that both
ShareFetch and ShareAcknowledge v2 support renew acks. The RPCs will diverge,
and I think it would be better not to push the actual versions down into the
validation code.
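As a rough illustration of that idea, the version could be turned into a capability flag at the request-handling boundary, so the validation only ever sees a Boolean. The object `RenewAckSupport` and its method names are hypothetical, and the v2 thresholds are assumed from the discussion above.

```scala
object RenewAckSupport {
  // Assumption: both RPCs accept renew acks from v2 today. The rules are kept
  // per RPC so each can change independently when the versions diverge.
  def shareFetchSupportsRenew(version: Short): Boolean = version >= 2
  def shareAcknowledgeSupportsRenew(version: Short): Boolean = version >= 2

  // The validation depends on the capability, not on any RPC version.
  def maxAcknowledgeType(supportsRenewAcknowledgements: Boolean): Int =
    if (supportsRenewAcknowledgements) 4 else 3
}
```

Each handler would compute its own flag once and pass it down, leaving the validation code unchanged if either RPC later bumps its version.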
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.