lucasbru commented on code in PR #17391:
URL: https://github.com/apache/kafka/pull/17391#discussion_r1792097050


##########
group-coordinator/src/main/resources/common/message/StreamsGroupTopologyValue.json:
##########
@@ -29,14 +29,25 @@
           "about": "String to uniquely identify the subtopology. Deterministically generated from the topology." },
         { "name": "SourceTopics", "type": "[]string", "versions": "0+",
           "about": "The topics the topology reads from." },
-        { "name": "SourceTopicRegex", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
-          "about": "The regular expressions identifying topics the topology reads from. null if not provided." },
+        { "name": "SourceTopicRegex", "type": "[]string", "versions": "0+",
+          "about": "Regular expressions identifying topics the sub-topology reads from." },
         { "name": "StateChangelogTopics", "type": "[]TopicInfo", "versions": "0+",
-          "about": "The set of state changelog topics associated with this subtopology. " },
+          "about": "The set of state changelog topics associated with this sub-topology." },
         { "name": "RepartitionSinkTopics", "type": "[]string", "versions": "0+",
           "about": "The repartition topics the subtopology writes to." },
         { "name": "RepartitionSourceTopics", "type": "[]TopicInfo", "versions": "0+",
-          "about": "The set of source topics that are internally created repartition topics. " }
+          "about": "The set of source topics that are internally created repartition topics." },
+        { "name": "CopartitionGroups", "type": "[]CopartitionGroup", "versions": "0+",
+          "about": "A subset of source topics that must be copartitioned.",
+          "fields": [
+            { "name": "SourceTopics", "type": "[]int32", "versions": "0+",
+              "about": "The topics the topology reads from. Index into the array on the subtopology level." },
+            { "name": "SourceTopicRegex", "type": "[]int32", "versions": "0+",
+              "about": "Regular expressions identifying topics the subtopology reads from. Index into the array on the subtopology level." },
+            { "name": "RepartitionSourceTopics", "type": "[]int32", "versions": "0+",
+              "about": "The set of source topics that are internally created repartition topics. Index into the array on the subtopology level." }

Review Comment:
   Done



##########
clients/src/main/resources/common/message/StreamsGroupInitializeRequest.json:
##########
@@ -32,14 +32,25 @@
           "about": "String to uniquely identify the subtopology. Deterministically generated from the topology." },
         { "name": "SourceTopics", "type": "[]string", "versions": "0+",
           "about": "The topics the topology reads from." },
-        { "name": "SourceTopicRegex", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
-          "about": "The regular expressions identifying topics the subtopology reads from." },
+        { "name": "SourceTopicRegex", "type": "[]string", "versions": "0+",
+          "about": "Regular expressions identifying topics the subtopology reads from." },
         { "name": "StateChangelogTopics", "type": "[]TopicInfo", "versions": "0+",
           "about": "The set of state changelog topics associated with this subtopology. Created automatically." },
         { "name": "RepartitionSinkTopics", "type": "[]string", "versions": "0+",
           "about": "The repartition topics the subtopology writes to." },
         { "name": "RepartitionSourceTopics", "type": "[]TopicInfo", "versions": "0+",
-          "about": "The set of source topics that are internally created repartition topics. Created automatically." }
+          "about": "The set of source topics that are internally created repartition topics. Created automatically." },
+        { "name": "CopartitionGroups", "type": "[]CopartitionGroup", "versions": "0+",
+          "about": "A subset of source topics that must be copartitioned.",
+          "fields": [
+            { "name": "SourceTopics", "type": "[]int32", "versions": "0+",
+              "about": "The topics the topology reads from. Index into the array on the subtopology level." },
+            { "name": "SourceTopicRegex", "type": "[]int32", "versions": "0+",
+              "about": "Regular expressions identifying topics the subtopology reads from. Index into the array on the subtopology level." },
+            { "name": "RepartitionSourceTopics", "type": "[]int32", "versions": "0+",
+              "about": "The set of source topics that are internally created repartition topics. Index into the array on the subtopology level." }

Review Comment:
   Done



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManager.java:
##########
@@ -95,18 +100,65 @@ private static StreamsGroupInitializeRequestData.Subtopology getSubtopologyFromS
         final StreamsGroupInitializeRequestData.Subtopology subtopologyData = new StreamsGroupInitializeRequestData.Subtopology();
         subtopologyData.setSubtopologyId(subtopologyName);
         subtopologyData.setSourceTopics(new ArrayList<>(subtopology.sourceTopics));
+        Collections.sort(subtopologyData.sourceTopics());
         subtopologyData.setRepartitionSinkTopics(new ArrayList<>(subtopology.sinkTopics));
+        Collections.sort(subtopologyData.repartitionSinkTopics());

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to