apoorvmittal10 commented on code in PR #20819:
URL: https://github.com/apache/kafka/pull/20819#discussion_r2494527524
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -3033,6 +3033,9 @@ class KafkaApis(val requestChannel: RequestChannel,
     } else if (!authHelper.authorize(request.context, READ, GROUP, shareGroupHeartbeatRequest.data.groupId)) {
       requestHelper.sendMaybeThrottle(request, shareGroupHeartbeatRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
       CompletableFuture.completedFuture[Unit](())
+    } else if (!isMemberIdValid(shareGroupHeartbeatRequest.data.memberId)) {
+      requestHelper.sendMaybeThrottle(request, shareGroupHeartbeatRequest.getErrorResponse(Errors.INVALID_REQUEST.exception))
+      CompletableFuture.completedFuture[Unit](())
Review Comment:
Just thinking: should we include an error message in the response that says
what went wrong? `INVALID_REQUEST` on its own is quite generic. WDYT?
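One way to picture the suggestion is a validator that returns a reason rather than a bare boolean, so the caller has something to put in the response. This is only a sketch: `MemberIdValidation`, `validate`, and the message texts are hypothetical and not part of the PR. It also assumes that `Uuid.ZERO_UUID.toString` is the unpadded URL-safe Base64 encoding of 16 zero bytes, and it adds a null check that the Scala method in the diff does not have.

```java
import java.util.Base64;
import java.util.Optional;

// Hypothetical helper, not part of the PR: same checks as isMemberIdValid,
// but each failure carries a human-readable reason.
final class MemberIdValidation {
    // Length limit taken from the diff under review.
    static final int MAX_LENGTH = 36;

    // Assumption: this matches Uuid.ZERO_UUID.toString (16 zero bytes, Base64, no padding).
    static final String ZERO_UUID_STRING =
        Base64.getUrlEncoder().withoutPadding().encodeToString(new byte[16]);

    // Returns an empty Optional when the id is acceptable, otherwise the reason it is not.
    static Optional<String> validate(String memberId) {
        if (memberId == null || memberId.isEmpty()) {
            return Optional.of("MemberId must not be empty.");
        }
        if (memberId.length() > MAX_LENGTH) {
            return Optional.of("MemberId must not be longer than " + MAX_LENGTH + " characters.");
        }
        if (memberId.equals(ZERO_UUID_STRING)) {
            return Optional.of("MemberId must not be the zero UUID.");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(validate(""));          // rejected, with a reason
        System.out.println(validate("member-1"));  // accepted
    }
}
```

In `KafkaApis`, the reason could then be attached to the error, for example through `Errors.INVALID_REQUEST.exception(message)`, assuming the message is propagated into the heartbeat response.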
##########
core/src/main/java/kafka/server/share/SharePartitionManager.java:
##########
@@ -357,11 +356,10 @@ public CompletableFuture<Map<TopicIdPartition, ShareAcknowledgeResponseData.Part
         String memberId
     ) {
         log.trace("Release session request for groupId: {}, memberId: {}", groupId, memberId);
-        Uuid memberIdUuid = Uuid.fromString(memberId);
         List<TopicIdPartition> topicIdPartitions = cachedTopicIdPartitionsInShareSession(
-            groupId, memberIdUuid);
+            groupId, memberId);
Review Comment:
We should also correct the Javadoc for `memberId`, since it is no longer
converted to a `Uuid` here.
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -4137,6 +4148,11 @@ class KafkaApis(val requestChannel: RequestChannel,
     config.shareGroupConfig.isShareGroupEnabled || shareVersion().supportsShareGroups
   }
+  // Visible for testing.
+  def isMemberIdValid(memberId: String): Boolean = {
+    memberId.nonEmpty && memberId.length <= 36 && !memberId.equals(Uuid.ZERO_UUID.toString)
Review Comment:
Can `memberId` be `" "`, i.e. only whitespace? Would that be considered valid?
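To make the question concrete: a non-empty check alone accepts a whitespace-only id, because `" "` has length 1. The sketch below is in Java for illustration (the method under review is Scala, where `nonEmpty` behaves like `!isEmpty`). `BlankMemberIdCheck` and both method names are hypothetical, and whether blank ids should actually be rejected is the open question here, not something the PR states.

```java
// Hypothetical illustration, not part of the PR.
final class BlankMemberIdCheck {
    // Same shape as the length checks in the diff: non-empty and at most 36 characters.
    static boolean passesLengthChecks(String memberId) {
        return !memberId.isEmpty() && memberId.length() <= 36;
    }

    // Possible stricter variant: also rejects ids that contain only whitespace.
    // String.isBlank requires Java 11 or later.
    static boolean passesLengthChecksRejectingBlank(String memberId) {
        return !memberId.isBlank() && memberId.length() <= 36;
    }

    public static void main(String[] args) {
        String spaces = " ";
        System.out.println(passesLengthChecks(spaces));
        System.out.println(passesLengthChecksRejectingBlank(spaces));
    }
}
```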
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -4137,6 +4148,11 @@ class KafkaApis(val requestChannel: RequestChannel,
     config.shareGroupConfig.isShareGroupEnabled || shareVersion().supportsShareGroups
   }
+  // Visible for testing.
+  def isMemberIdValid(memberId: String): Boolean = {
+    memberId.nonEmpty && memberId.length <= 36 && !memberId.equals(Uuid.ZERO_UUID.toString)
Review Comment:
For my understanding: `Uuid.ZERO_UUID.toString` produces a string of 22 `A`
characters (`AAAAA....`). With this check, do we want to prevent `AAAAA....`
from being used as a member id?
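For reference, the all-`A` string can be reproduced without Kafka on the classpath. The sketch assumes Kafka's `Uuid.toString` is the unpadded URL-safe Base64 encoding of the 16 UUID bytes, most significant long first; `ZeroUuidEncoding` and `encode` are hypothetical names used only for this illustration.

```java
import java.nio.ByteBuffer;
import java.util.Base64;

// Hypothetical illustration, not part of the PR.
final class ZeroUuidEncoding {
    // Encodes two longs the way Kafka's Uuid.toString is assumed to:
    // 16 big-endian bytes, URL-safe Base64, no padding.
    static String encode(long mostSignificantBits, long leastSignificantBits) {
        byte[] bytes = ByteBuffer.allocate(16)
            .putLong(mostSignificantBits)
            .putLong(leastSignificantBits)
            .array();
        return Base64.getUrlEncoder().withoutPadding().encodeToString(bytes);
    }

    public static void main(String[] args) {
        String zero = encode(0L, 0L);
        System.out.println(zero + " (length " + zero.length() + ")");
    }
}
```

Since 16 bytes always encode to 22 Base64 characters, any Uuid-derived member id fits comfortably under the 36-character limit in the check.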
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -4137,6 +4148,11 @@ class KafkaApis(val requestChannel: RequestChannel,
     config.shareGroupConfig.isShareGroupEnabled || shareVersion().supportsShareGroups
   }
+  // Visible for testing.
+  def isMemberIdValid(memberId: String): Boolean = {
+    memberId.nonEmpty && memberId.length <= 36 && !memberId.equals(Uuid.ZERO_UUID.toString)
Review Comment:
Since we are checking for `ZERO_UUID`, should we also check that the member id
is not any of the other reserved Uuids? See
https://github.com/apache/kafka/blob/c205e6910ff4c539cb3c03286ccbc0a6f1e74a20/clients/src/main/java/org/apache/kafka/common/Uuid.java#L52
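A rough sketch of what such a check could look like, comparing against the string forms of every reserved value rather than only the zero one. It rests on two assumptions that should be verified against the linked `Uuid.java`: that the reserved set consists of the all-zero Uuid and the Uuid with bits `(0L, 1L)`, and that `Uuid.toString` is the unpadded URL-safe Base64 encoding of the 16 bytes. `ReservedMemberIds` and its members are hypothetical names.

```java
import java.nio.ByteBuffer;
import java.util.Base64;
import java.util.Set;

// Hypothetical illustration, not part of the PR.
final class ReservedMemberIds {
    // Assumed string form of a Kafka Uuid: 16 big-endian bytes, URL-safe Base64, no padding.
    static String encode(long mostSignificantBits, long leastSignificantBits) {
        byte[] bytes = ByteBuffer.allocate(16)
            .putLong(mostSignificantBits)
            .putLong(leastSignificantBits)
            .array();
        return Base64.getUrlEncoder().withoutPadding().encodeToString(bytes);
    }

    // Assumption: the reserved Uuids are (0, 0) and (0, 1).
    static final Set<String> RESERVED = Set.of(encode(0L, 0L), encode(0L, 1L));

    // True when the member id equals the string form of any reserved Uuid.
    static boolean isReserved(String memberId) {
        return RESERVED.contains(memberId);
    }

    public static void main(String[] args) {
        System.out.println(isReserved(encode(0L, 0L)));
        System.out.println(isReserved(encode(0L, 1L)));
        System.out.println(isReserved("member-1"));
    }
}
```

In the actual broker code the same effect could presumably be had by comparing against the string forms of the entries in `Uuid.RESERVED`, without re-deriving the encoding.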