lucasbru commented on code in PR #17391:
URL: https://github.com/apache/kafka/pull/17391#discussion_r1792097050
##########
group-coordinator/src/main/resources/common/message/StreamsGroupTopologyValue.json:
##########
@@ -29,14 +29,25 @@
"about": "String to uniquely identify the subtopology.
Deterministically generated from the topology." },
{ "name": "SourceTopics", "type": "[]string", "versions": "0+",
"about": "The topics the topology reads from." },
- { "name": "SourceTopicRegex", "type": "string", "versions": "0+",
"nullableVersions": "0+", "default": "null",
- "about": "The regular expressions identifying topics the topology
reads from. null if not provided." },
+ { "name": "SourceTopicRegex", "type": "[]string", "versions": "0+",
+ "about": "Regular expressions identifying topics the sub-topology
reads from." },
{ "name": "StateChangelogTopics", "type": "[]TopicInfo", "versions":
"0+",
- "about": "The set of state changelog topics associated with this
subtopology. " },
+ "about": "The set of state changelog topics associated with this
sub-topology." },
{ "name": "RepartitionSinkTopics", "type": "[]string", "versions":
"0+",
"about": "The repartition topics the subtopology writes to." },
{ "name": "RepartitionSourceTopics", "type": "[]TopicInfo",
"versions": "0+",
- "about": "The set of source topics that are internally created
repartition topics. " }
+ "about": "The set of source topics that are internally created
repartition topics." },
+ { "name": "CopartitionGroups", "type": "[]CopartitionGroup",
"versions": "0+",
+ "about": "A subset of source topics that must be copartitioned.",
+ "fields": [
+ { "name": "SourceTopics", "type": "[]int32", "versions": "0+",
+ "about": "The topics the topology reads from. Index into the
array on the subtopology level." },
+ { "name": "SourceTopicRegex", "type": "[]int32", "versions": "0+",
+ "about": "Regular expressions identifying topics the subtopology
reads from. Index into the array on the subtopology level." },
+ { "name": "RepartitionSourceTopics", "type": "[]int32",
"versions": "0+",
+ "about": "The set of source topics that are internally created
repartition topics. Index into the array on the subtopology level." }
Review Comment:
Done
##########
clients/src/main/resources/common/message/StreamsGroupInitializeRequest.json:
##########
@@ -32,14 +32,25 @@
"about": "String to uniquely identify the subtopology.
Deterministically generated from the topology." },
{ "name": "SourceTopics", "type": "[]string", "versions": "0+",
"about": "The topics the topology reads from." },
- { "name": "SourceTopicRegex", "type": "string", "versions": "0+",
"nullableVersions": "0+", "default": "null",
- "about": "The regular expressions identifying topics the subtopology
reads from." },
+ { "name": "SourceTopicRegex", "type": "[]string", "versions": "0+",
+ "about": "Regular expressions identifying topics the subtopology
reads from." },
{ "name": "StateChangelogTopics", "type": "[]TopicInfo", "versions":
"0+",
"about": "The set of state changelog topics associated with this
subtopology. Created automatically." },
{ "name": "RepartitionSinkTopics", "type": "[]string", "versions":
"0+",
"about": "The repartition topics the subtopology writes to." },
{ "name": "RepartitionSourceTopics", "type": "[]TopicInfo",
"versions": "0+",
- "about": "The set of source topics that are internally created
repartition topics. Created automatically." }
+ "about": "The set of source topics that are internally created
repartition topics. Created automatically." },
+ { "name": "CopartitionGroups", "type": "[]CopartitionGroup",
"versions": "0+",
+ "about": "A subset of source topics that must be copartitioned.",
+ "fields": [
+ { "name": "SourceTopics", "type": "[]int32", "versions": "0+",
+ "about": "The topics the topology reads from. Index into the
array on the subtopology level." },
+ { "name": "SourceTopicRegex", "type": "[]int32", "versions": "0+",
+ "about": "Regular expressions identifying topics the subtopology
reads from. Index into the array on the subtopology level." },
+ { "name": "RepartitionSourceTopics", "type": "[]int32",
"versions": "0+",
+ "about": "The set of source topics that are internally created
repartition topics. Index into the array on the subtopology level." }
Review Comment:
Done
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsGroupInitializeRequestManager.java:
##########
@@ -95,18 +100,65 @@ private static
StreamsGroupInitializeRequestData.Subtopology getSubtopologyFromS
final StreamsGroupInitializeRequestData.Subtopology subtopologyData =
new StreamsGroupInitializeRequestData.Subtopology();
subtopologyData.setSubtopologyId(subtopologyName);
subtopologyData.setSourceTopics(new
ArrayList<>(subtopology.sourceTopics));
+ Collections.sort(subtopologyData.sourceTopics());
subtopologyData.setRepartitionSinkTopics(new
ArrayList<>(subtopology.sinkTopics));
+ Collections.sort(subtopologyData.repartitionSinkTopics());
Review Comment:
Done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org