lucasbru commented on code in PR #17695:
URL: https://github.com/apache/kafka/pull/17695#discussion_r1831143285


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -3879,103 +3878,83 @@ class KafkaApis(val requestChannel: RequestChannel,
     config.groupCoordinatorRebalanceProtocols.contains(Group.GroupType.STREAMS)
   }
 
-  def handleStreamsGroupInitialize(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
-    // TODO: The unit tests for this method are insufficient. Once we merge 
initialize with group heartbeat, we have to extend the tests to cover ACLs and 
internal topic creation
-    val streamsGroupInitializeRequest = 
request.body[StreamsGroupInitializeRequest]
+  def handleStreamsGroupHeartbeat(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
+    val streamsGroupHeartbeatRequest = 
request.body[StreamsGroupHeartbeatRequest]
 
     if (!isStreamsGroupProtocolEnabled()) {
       // The API is not supported by the "old" group coordinator (the 
default). If the
       // new one is not enabled, we fail directly here.
-      requestHelper.sendMaybeThrottle(request, 
streamsGroupInitializeRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      requestHelper.sendMaybeThrottle(request, 
streamsGroupHeartbeatRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
       CompletableFuture.completedFuture[Unit](())
-    } else if (!authHelper.authorize(request.context, READ, GROUP, 
streamsGroupInitializeRequest.data.groupId)) {
-      requestHelper.sendMaybeThrottle(request, 
streamsGroupInitializeRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+    } else if (!authHelper.authorize(request.context, READ, GROUP, 
streamsGroupHeartbeatRequest.data.groupId)) {
+      requestHelper.sendMaybeThrottle(request, 
streamsGroupHeartbeatRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
       CompletableFuture.completedFuture[Unit](())
     } else {
       val requestContext = request.context
 
-      val internalTopics: Map[String, 
StreamsGroupInitializeRequestData.TopicInfo] = {
-        
streamsGroupInitializeRequest.data().topology().asScala.flatMap(subtopology =>
-          subtopology.repartitionSourceTopics().iterator().asScala ++ 
subtopology.stateChangelogTopics().iterator().asScala
-        ).map(x => x.name() -> x).toMap
-      }
-
-      val prohibitedInternalTopics = 
internalTopics.keys.filter(Topic.isInternal)
-      if (prohibitedInternalTopics.nonEmpty) {
-        val errorResponse = new StreamsGroupInitializeResponseData()
-        errorResponse.setErrorCode(Errors.STREAMS_INVALID_TOPOLOGY.code)
-        errorResponse.setErrorMessage(f"Use of Kafka internal topics 
${prohibitedInternalTopics.mkString(",")} as Kafka Streams internal topics is 
prohibited.")
-        requestHelper.sendMaybeThrottle(request, new 
StreamsGroupInitializeResponse(errorResponse))
-        return CompletableFuture.completedFuture[Unit](())
-      }
+      if (streamsGroupHeartbeatRequest.data().topology() != null) {
+        val requiredTopics: Seq[String] =
+          
streamsGroupHeartbeatRequest.data().topology().iterator().asScala.flatMap(subtopology
 =>
+            (subtopology.sourceTopics().iterator().asScala:Iterator[String])
+              ++ 
(subtopology.repartitionSinkTopics().iterator().asScala:Iterator[String])
+              ++ 
(subtopology.repartitionSourceTopics().iterator().asScala.map(_.name()):Iterator[String])
+              ++ 
(subtopology.stateChangelogTopics().iterator().asScala.map(_.name()):Iterator[String])
+          ).toSeq
+
+        // Checking early for valid topology names, since we don't want to 
pass those to `authHelper`.

Review Comment:
   You are right. Valid & not internal kafka topics. But it is still curious 
that part of the validation is happening earlier than all the other validation. 
I think the comment is worth it. Fixed it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to