lucasbru commented on code in PR #17695:
URL: https://github.com/apache/kafka/pull/17695#discussion_r1831143285
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -3879,103 +3878,83 @@ class KafkaApis(val requestChannel: RequestChannel,
config.groupCoordinatorRebalanceProtocols.contains(Group.GroupType.STREAMS)
}
- def handleStreamsGroupInitialize(request: RequestChannel.Request):
CompletableFuture[Unit] = {
- // TODO: The unit tests for this method are insufficient. Once we merge
initialize with group heartbeat, we have to extend the tests to cover ACLs and
internal topic creation
- val streamsGroupInitializeRequest =
request.body[StreamsGroupInitializeRequest]
+ def handleStreamsGroupHeartbeat(request: RequestChannel.Request):
CompletableFuture[Unit] = {
+ val streamsGroupHeartbeatRequest =
request.body[StreamsGroupHeartbeatRequest]
if (!isStreamsGroupProtocolEnabled()) {
// The API is not supported by the "old" group coordinator (the
default). If the
// new one is not enabled, we fail directly here.
- requestHelper.sendMaybeThrottle(request,
streamsGroupInitializeRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+ requestHelper.sendMaybeThrottle(request,
streamsGroupHeartbeatRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
CompletableFuture.completedFuture[Unit](())
- } else if (!authHelper.authorize(request.context, READ, GROUP,
streamsGroupInitializeRequest.data.groupId)) {
- requestHelper.sendMaybeThrottle(request,
streamsGroupInitializeRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+ } else if (!authHelper.authorize(request.context, READ, GROUP,
streamsGroupHeartbeatRequest.data.groupId)) {
+ requestHelper.sendMaybeThrottle(request,
streamsGroupHeartbeatRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
CompletableFuture.completedFuture[Unit](())
} else {
val requestContext = request.context
- val internalTopics: Map[String,
StreamsGroupInitializeRequestData.TopicInfo] = {
-
streamsGroupInitializeRequest.data().topology().asScala.flatMap(subtopology =>
- subtopology.repartitionSourceTopics().iterator().asScala ++
subtopology.stateChangelogTopics().iterator().asScala
- ).map(x => x.name() -> x).toMap
- }
-
- val prohibitedInternalTopics =
internalTopics.keys.filter(Topic.isInternal)
- if (prohibitedInternalTopics.nonEmpty) {
- val errorResponse = new StreamsGroupInitializeResponseData()
- errorResponse.setErrorCode(Errors.STREAMS_INVALID_TOPOLOGY.code)
- errorResponse.setErrorMessage(f"Use of Kafka internal topics
${prohibitedInternalTopics.mkString(",")} as Kafka Streams internal topics is
prohibited.")
- requestHelper.sendMaybeThrottle(request, new
StreamsGroupInitializeResponse(errorResponse))
- return CompletableFuture.completedFuture[Unit](())
- }
+ if (streamsGroupHeartbeatRequest.data().topology() != null) {
+ val requiredTopics: Seq[String] =
+
streamsGroupHeartbeatRequest.data().topology().iterator().asScala.flatMap(subtopology
=>
+ (subtopology.sourceTopics().iterator().asScala:Iterator[String])
+ ++
(subtopology.repartitionSinkTopics().iterator().asScala:Iterator[String])
+ ++
(subtopology.repartitionSourceTopics().iterator().asScala.map(_.name()):Iterator[String])
+ ++
(subtopology.stateChangelogTopics().iterator().asScala.map(_.name()):Iterator[String])
+ ).toSeq
+
+ // Checking early for valid topology names, since we don't want to
pass those to `authHelper`.
Review Comment:
You are right. Valid & not internal kafka topics. But it is still curious
that part of the validation is happening earlier than all the other validation.
I think the comment is worth it. Fixed it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]