cadonna commented on code in PR #17695:
URL: https://github.com/apache/kafka/pull/17695#discussion_r1830947659


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java:
##########
@@ -627,6 +626,99 @@ private List<TaskIds> convertTaskIdCollection(final 
Set<StreamsAssignmentInterfa
                 .collect(Collectors.toList());
         }
 
+        private List<StreamsGroupHeartbeatRequestData.Subtopology> 
getTopologyFromStreams() {
+            final Map<String, StreamsAssignmentInterface.Subtopology> 
subTopologyMap = streamsInterface.subtopologyMap();
+            final List<StreamsGroupHeartbeatRequestData.Subtopology> 
subtopologies = new ArrayList<>(subTopologyMap.size());
+            for (final Map.Entry<String, 
StreamsAssignmentInterface.Subtopology> subtopology : 
subTopologyMap.entrySet()) {
+                
subtopologies.add(getSubtopologyFromStreams(subtopology.getKey(), 
subtopology.getValue()));
+            }
+            return subtopologies;
+        }
+
+        private static StreamsGroupHeartbeatRequestData.Subtopology 
getSubtopologyFromStreams(final String subtopologyName,
+                                                                               
               final StreamsAssignmentInterface.Subtopology subtopology) {
+            final StreamsGroupHeartbeatRequestData.Subtopology subtopologyData 
= new StreamsGroupHeartbeatRequestData.Subtopology();
+            subtopologyData.setSubtopologyId(subtopologyName);
+            ArrayList<String> sortedSourceTopics = new 
ArrayList<>(subtopology.sourceTopics);
+            Collections.sort(sortedSourceTopics);
+            subtopologyData.setSourceTopics(sortedSourceTopics);
+            ArrayList<String> sortedSinkTopics = new 
ArrayList<>(subtopology.repartitionSinkTopics);
+            Collections.sort(sortedSinkTopics);
+            subtopologyData.setRepartitionSinkTopics(sortedSinkTopics);
+            
subtopologyData.setRepartitionSourceTopics(getRepartitionTopicsInfoFromStreams(subtopology));
+            
subtopologyData.setStateChangelogTopics(getChangelogTopicsInfoFromStreams(subtopology));
+            subtopologyData.setCopartitionGroups(
+                getCopartitionGroupsFromStreams(subtopology.copartitionGroups, 
subtopologyData));
+            return subtopologyData;
+        }
+
+        private static List<CopartitionGroup> getCopartitionGroupsFromStreams(
+            final Collection<Set<String>> copartitionGroups,
+            final StreamsGroupHeartbeatRequestData.Subtopology 
subtopologyData) {

Review Comment:
   nit:
   ```suggestion
           private static List<CopartitionGroup> 
getCopartitionGroupsFromStreams(final Collection<Set<String>> copartitionGroups,
                                                                                
 final StreamsGroupHeartbeatRequestData.Subtopology subtopologyData) {
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -2288,18 +2259,40 @@ private 
CoordinatorResult<StreamsGroupHeartbeatResponseData, CoordinatorRecord>
             records
         );
 
-        if (bumpGroupEpoch || group.hasMetadataExpired(currentTimeMs)) {
-            // The subscription metadata is updated when the refresh deadline 
has been reached.
-            subscriptionMetadata = group.computeSubscriptionMetadata(
+        // 2. Initialize/Update the group topology.

Review Comment:
   If those different steps are so important, why not extract them into methods 
and give them meaningful names?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java:
##########
@@ -627,6 +626,99 @@ private List<TaskIds> convertTaskIdCollection(final 
Set<StreamsAssignmentInterfa
                 .collect(Collectors.toList());
         }
 
+        private List<StreamsGroupHeartbeatRequestData.Subtopology> 
getTopologyFromStreams() {
+            final Map<String, StreamsAssignmentInterface.Subtopology> 
subTopologyMap = streamsInterface.subtopologyMap();
+            final List<StreamsGroupHeartbeatRequestData.Subtopology> 
subtopologies = new ArrayList<>(subTopologyMap.size());
+            for (final Map.Entry<String, 
StreamsAssignmentInterface.Subtopology> subtopology : 
subTopologyMap.entrySet()) {
+                
subtopologies.add(getSubtopologyFromStreams(subtopology.getKey(), 
subtopology.getValue()));
+            }
+            return subtopologies;
+        }
+
+        private static StreamsGroupHeartbeatRequestData.Subtopology 
getSubtopologyFromStreams(final String subtopologyName,
+                                                                               
               final StreamsAssignmentInterface.Subtopology subtopology) {
+            final StreamsGroupHeartbeatRequestData.Subtopology subtopologyData 
= new StreamsGroupHeartbeatRequestData.Subtopology();
+            subtopologyData.setSubtopologyId(subtopologyName);
+            ArrayList<String> sortedSourceTopics = new 
ArrayList<>(subtopology.sourceTopics);
+            Collections.sort(sortedSourceTopics);
+            subtopologyData.setSourceTopics(sortedSourceTopics);
+            ArrayList<String> sortedSinkTopics = new 
ArrayList<>(subtopology.repartitionSinkTopics);
+            Collections.sort(sortedSinkTopics);
+            subtopologyData.setRepartitionSinkTopics(sortedSinkTopics);
+            
subtopologyData.setRepartitionSourceTopics(getRepartitionTopicsInfoFromStreams(subtopology));
+            
subtopologyData.setStateChangelogTopics(getChangelogTopicsInfoFromStreams(subtopology));
+            subtopologyData.setCopartitionGroups(
+                getCopartitionGroupsFromStreams(subtopology.copartitionGroups, 
subtopologyData));
+            return subtopologyData;
+        }
+
+        private static List<CopartitionGroup> getCopartitionGroupsFromStreams(
+            final Collection<Set<String>> copartitionGroups,
+            final StreamsGroupHeartbeatRequestData.Subtopology 
subtopologyData) {
+
+            final Map<String, Short> sourceTopicsMap =
+                IntStream.range(0, subtopologyData.sourceTopics().size())
+                    .boxed()
+                    
.collect(Collectors.toMap(subtopologyData.sourceTopics()::get, 
Integer::shortValue));
+
+            final Map<String, Short> repartitionSourceTopics =
+                IntStream.range(0, 
subtopologyData.repartitionSourceTopics().size())
+                    .boxed()
+                    .collect(
+                        Collectors.toMap(x -> 
subtopologyData.repartitionSourceTopics().get(x).name(),
+                            Integer::shortValue));
+
+            return copartitionGroups.stream()
+                .map(x -> getCopartitionGroupFromStreams(x, sourceTopicsMap, 
repartitionSourceTopics))
+                .collect(Collectors.toList());
+        }
+
+        private static CopartitionGroup getCopartitionGroupFromStreams(
+            final Set<String> topicNames,
+            final Map<String, Short> sourceTopicsMap,
+            final Map<String, Short> repartitionSourceTopics) {

Review Comment:
   Same nit as above



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManagerTest.java:
##########
@@ -213,6 +242,39 @@ void testFullStaticInformationWhenJoining() {
         assertEquals(1, request.data().clientTags().size());
         assertEquals("clientTag1", request.data().clientTags().get(0).key());
         assertEquals("value2", request.data().clientTags().get(0).value());
+        assertEquals(streamsAssignmentInterface.topologyId(), 
request.data().topologyId());
+        assertNotNull(request.data().topology());
+        final List<StreamsGroupHeartbeatRequestData.Subtopology> subtopologies 
= request.data().topology();
+        assertEquals(1, subtopologies.size());
+        final StreamsGroupHeartbeatRequestData.Subtopology subtopology = 
subtopologies.get(0);
+        assertEquals(subtopologyName1, subtopology.subtopologyId());
+        assertEquals(Arrays.asList("sourceTopic1", "sourceTopic2"), 
subtopology.sourceTopics());
+        assertEquals(Arrays.asList("repartitionSinkTopic1", 
"repartitionSinkTopic2", "repartitionSinkTopic3"), 
subtopology.repartitionSinkTopics());
+        assertEquals(repartitionSourceTopics.size(), 
subtopology.repartitionSourceTopics().size());
+        subtopology.repartitionSourceTopics().forEach(topicInfo -> {
+            final StreamsAssignmentInterface.TopicInfo repartitionTopic = 
repartitionSourceTopics.get(topicInfo.name());
+            assertEquals(repartitionTopic.numPartitions.get(), 
topicInfo.partitions());
+            assertEquals(repartitionTopic.replicationFactor.get(), 
topicInfo.replicationFactor());
+        });
+        assertEquals(changelogTopics.size(), 
subtopology.stateChangelogTopics().size());
+        subtopology.stateChangelogTopics().forEach(topicInfo -> {
+            assertTrue(changelogTopics.containsKey(topicInfo.name()));
+            assertEquals(0, topicInfo.partitions());
+            final StreamsAssignmentInterface.TopicInfo changelogTopic = 
changelogTopics.get(topicInfo.name());
+            assertEquals(changelogTopic.replicationFactor.get(), 
topicInfo.replicationFactor());
+        });
+

Review Comment:
   nit:
   ```suggestion
   ```



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupHeartbeatRequestManager.java:
##########
@@ -627,6 +626,99 @@ private List<TaskIds> convertTaskIdCollection(final 
Set<StreamsAssignmentInterfa
                 .collect(Collectors.toList());
         }
 
+        private List<StreamsGroupHeartbeatRequestData.Subtopology> 
getTopologyFromStreams() {
+            final Map<String, StreamsAssignmentInterface.Subtopology> 
subTopologyMap = streamsInterface.subtopologyMap();
+            final List<StreamsGroupHeartbeatRequestData.Subtopology> 
subtopologies = new ArrayList<>(subTopologyMap.size());
+            for (final Map.Entry<String, 
StreamsAssignmentInterface.Subtopology> subtopology : 
subTopologyMap.entrySet()) {
+                
subtopologies.add(getSubtopologyFromStreams(subtopology.getKey(), 
subtopology.getValue()));

Review Comment:
   Why do you not also sort this list?
   As far as I can see, the consumer coordinator iterates over this list and 
writes its content into the coordinator record.
   
   In general, why do we not sort all lists in the consumer coordinator? As far 
as I remember, the KIP does not specify that the lists need to be sorted. 
Thus, the sorting is rather an implementation detail.  



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -3879,103 +3878,83 @@ class KafkaApis(val requestChannel: RequestChannel,
     config.groupCoordinatorRebalanceProtocols.contains(Group.GroupType.STREAMS)
   }
 
-  def handleStreamsGroupInitialize(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
-    // TODO: The unit tests for this method are insufficient. Once we merge 
initialize with group heartbeat, we have to extend the tests to cover ACLs and 
internal topic creation
-    val streamsGroupInitializeRequest = 
request.body[StreamsGroupInitializeRequest]
+  def handleStreamsGroupHeartbeat(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
+    val streamsGroupHeartbeatRequest = 
request.body[StreamsGroupHeartbeatRequest]
 
     if (!isStreamsGroupProtocolEnabled()) {
       // The API is not supported by the "old" group coordinator (the 
default). If the
       // new one is not enabled, we fail directly here.
-      requestHelper.sendMaybeThrottle(request, 
streamsGroupInitializeRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      requestHelper.sendMaybeThrottle(request, 
streamsGroupHeartbeatRequest.getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
       CompletableFuture.completedFuture[Unit](())
-    } else if (!authHelper.authorize(request.context, READ, GROUP, 
streamsGroupInitializeRequest.data.groupId)) {
-      requestHelper.sendMaybeThrottle(request, 
streamsGroupInitializeRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+    } else if (!authHelper.authorize(request.context, READ, GROUP, 
streamsGroupHeartbeatRequest.data.groupId)) {
+      requestHelper.sendMaybeThrottle(request, 
streamsGroupHeartbeatRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
       CompletableFuture.completedFuture[Unit](())
     } else {
       val requestContext = request.context
 
-      val internalTopics: Map[String, 
StreamsGroupInitializeRequestData.TopicInfo] = {
-        
streamsGroupInitializeRequest.data().topology().asScala.flatMap(subtopology =>
-          subtopology.repartitionSourceTopics().iterator().asScala ++ 
subtopology.stateChangelogTopics().iterator().asScala
-        ).map(x => x.name() -> x).toMap
-      }
-
-      val prohibitedInternalTopics = 
internalTopics.keys.filter(Topic.isInternal)
-      if (prohibitedInternalTopics.nonEmpty) {
-        val errorResponse = new StreamsGroupInitializeResponseData()
-        errorResponse.setErrorCode(Errors.STREAMS_INVALID_TOPOLOGY.code)
-        errorResponse.setErrorMessage(f"Use of Kafka internal topics 
${prohibitedInternalTopics.mkString(",")} as Kafka Streams internal topics is 
prohibited.")
-        requestHelper.sendMaybeThrottle(request, new 
StreamsGroupInitializeResponse(errorResponse))
-        return CompletableFuture.completedFuture[Unit](())
-      }
+      if (streamsGroupHeartbeatRequest.data().topology() != null) {
+        val requiredTopics: Seq[String] =
+          
streamsGroupHeartbeatRequest.data().topology().iterator().asScala.flatMap(subtopology
 =>
+            (subtopology.sourceTopics().iterator().asScala:Iterator[String])
+              ++ 
(subtopology.repartitionSinkTopics().iterator().asScala:Iterator[String])
+              ++ 
(subtopology.repartitionSourceTopics().iterator().asScala.map(_.name()):Iterator[String])
+              ++ 
(subtopology.stateChangelogTopics().iterator().asScala.map(_.name()):Iterator[String])
+          ).toSeq
+
+        // Checking early for valid topology names, since we don't want to 
pass those to `authHelper`.

Review Comment:
   I do not understand this comment. Should it say "valid topic names"? Even 
then, why "valid"? Here you only check whether internal topics are used; the 
validity check happens on line 3914.
   IMO, the comment is not needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to