cadonna commented on code in PR #18395:
URL: https://github.com/apache/kafka/pull/18395#discussion_r1907228239


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopics.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.utils.LogContext;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo;
+
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Responsible for configuring the number of partitions in repartitioning 
topics. It computes a fix-point iteration, deriving the number of
+ * partitions for each repartition topic based on the number of partitions of 
the source topics of the topology, if the number of
+ * partitions is not explicitly set in the topology.
+ */
+public class RepartitionTopics {
+
+    private final Logger log;
+    private final Collection<Subtopology> subtopologies;
+    private final Function<String, OptionalInt> topicPartitionCountProvider;
+
+    /**
+     * The constructor for the class.
+     *
+     * @param logContext                   The context for emitting log 
messages.
+     * @param subtopologies                The subtopologies for the requested 
topology.
+     * @param topicPartitionCountProvider  Returns the number of partitions 
for a given topic, representing the current state of the
+     *                                     broker.
+     */
+    public RepartitionTopics(final LogContext logContext,
+                             final Collection<Subtopology> subtopologies,
+                             final Function<String, OptionalInt> 
topicPartitionCountProvider) {
+        this.log = logContext.logger(getClass());
+        this.subtopologies = subtopologies;
+        this.topicPartitionCountProvider = topicPartitionCountProvider;
+    }
+
+    /**
+     * Returns the set the number of partitions for each repartition topic.

Review Comment:
   ```suggestion
        * Returns the set of the number of partitions for each repartition 
topic.
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopicsTest.java:
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
+import org.apache.kafka.common.utils.LogContext;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicConfig;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class RepartitionTopicsTest {
+
+    private static final LogContext LOG_CONTEXT = new LogContext();
+    private static final String SOURCE_TOPIC_NAME1 = "source1";
+    private static final String SOURCE_TOPIC_NAME2 = "source2";
+    private static final String SINK_TOPIC_NAME1 = "sink1";
+    private static final String SINK_TOPIC_NAME2 = "sink2";
+    private static final String REPARTITION_TOPIC_NAME1 = "repartition1";
+    private static final String REPARTITION_TOPIC_NAME2 = "repartition2";
+    private static final String REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT = 
"repartitionWithoutPartitionCount";
+    private static final TopicConfig TOPIC_CONFIG1 = new 
TopicConfig().setKey("config1").setValue("val1");
+    private static final TopicConfig TOPIC_CONFIG2 = new 
TopicConfig().setKey("config2").setValue("val2");
+    private static final TopicConfig TOPIC_CONFIG5 = new 
TopicConfig().setKey("config5").setValue("val5");
+    private static final TopicInfo REPARTITION_TOPIC_INFO1 = new TopicInfo()
+        .setName(REPARTITION_TOPIC_NAME1)
+        .setPartitions(4)
+        .setTopicConfigs(List.of(TOPIC_CONFIG1));
+    private static final Subtopology SUBTOPOLOGY2 = new Subtopology()
+        .setSubtopologyId("SUBTOPOLOGY2")
+        .setSourceTopics(Collections.singletonList(REPARTITION_TOPIC_NAME1))
+        .setRepartitionSinkTopics(Collections.singletonList(SINK_TOPIC_NAME1))
+        .setRepartitionSourceTopics(List.of(REPARTITION_TOPIC_INFO1))
+        .setStateChangelogTopics(Collections.emptyList());
+    private static final TopicInfo REPARTITION_TOPIC_INFO2 = new TopicInfo()
+        .setName(REPARTITION_TOPIC_NAME2)
+        .setPartitions(2)
+        .setTopicConfigs(List.of(TOPIC_CONFIG2));
+    private static final Subtopology SUBTOPOLOGY1 = new Subtopology()
+        .setSubtopologyId("SUBTOPOLOGY1")
+        .setSourceTopics(List.of(SOURCE_TOPIC_NAME1, SOURCE_TOPIC_NAME2))
+        
.setRepartitionSinkTopics(Collections.singletonList(REPARTITION_TOPIC_NAME1))
+        .setRepartitionSourceTopics(List.of(
+            REPARTITION_TOPIC_INFO1,
+            REPARTITION_TOPIC_INFO2
+        ))
+        .setStateChangelogTopics(Collections.emptyList());
+    private static final TopicInfo 
REPARTITION_TOPIC_INFO_WITHOUT_PARTITION_COUNT = new TopicInfo()
+        .setName(REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT)
+        .setTopicConfigs(List.of(TOPIC_CONFIG5));
+    private static final Subtopology SUBTOPOLOGY_WITHOUT_PARTITION_COUNT = new 
Subtopology()
+        .setSubtopologyId("SUBTOPOLOGY_WITHOUT_PARTITION_COUNT")
+        .setSourceTopics(List.of(REPARTITION_TOPIC_NAME1, 
REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT))
+        .setRepartitionSinkTopics(Collections.singletonList(SINK_TOPIC_NAME2))
+        .setRepartitionSourceTopics(List.of(
+            REPARTITION_TOPIC_INFO1,
+            REPARTITION_TOPIC_INFO_WITHOUT_PARTITION_COUNT
+        ))
+        .setStateChangelogTopics(Collections.emptyList());
+    private static final Set<String> TOPICS = Set.of(
+        SOURCE_TOPIC_NAME1,
+        SOURCE_TOPIC_NAME2,
+        SINK_TOPIC_NAME1,
+        SINK_TOPIC_NAME2,
+        REPARTITION_TOPIC_NAME1,
+        REPARTITION_TOPIC_NAME2
+    );
+
+    @Test
+    public void shouldSetupRepartitionTopics() {
+        List<Subtopology> subtopologyToSubtopology = List.of(SUBTOPOLOGY1, 
SUBTOPOLOGY2);
+        Function<String, OptionalInt> topicPartitionCountProvider = s -> 
TOPICS.contains(s) ? OptionalInt.of(3) : OptionalInt.empty();
+        final RepartitionTopics repartitionTopics = new RepartitionTopics(
+            LOG_CONTEXT,
+            subtopologyToSubtopology,
+            topicPartitionCountProvider
+        );
+
+        Map<String, Integer> setup = repartitionTopics.setup();
+
+        assertEquals(
+            mkMap(
+                mkEntry(REPARTITION_TOPIC_NAME1, 
REPARTITION_TOPIC_INFO1.partitions()),
+                mkEntry(REPARTITION_TOPIC_NAME2, 
REPARTITION_TOPIC_INFO2.partitions())
+            ),
+            setup
+        );
+    }
+
+    @Test
+    public void 
shouldThrowStreamsMissingSourceTopicsExceptionIfMissingSourceTopics() {
+        List<Subtopology> subtopologyToSubtopology = List.of(SUBTOPOLOGY1, 
SUBTOPOLOGY2);
+        Function<String, OptionalInt> topicPartitionCountProvider =
+            s -> Objects.equals(s, SOURCE_TOPIC_NAME1) ? OptionalInt.empty() :
+                (TOPICS.contains(s) ? OptionalInt.of(3) : OptionalInt.empty());
+        final RepartitionTopics repartitionTopics = new RepartitionTopics(
+            LOG_CONTEXT,
+            subtopologyToSubtopology,
+            topicPartitionCountProvider
+        );
+
+        final TopicConfigurationException exception = 
assertThrows(TopicConfigurationException.class,
+            repartitionTopics::setup);
+
+        assertNotNull(exception);

Review Comment:
   Isn't this implied by `assertThrows()`?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopics.java:
##########
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.utils.LogContext;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo;
+
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Responsible for configuring the number of partitions in repartitioning 
topics. It computes a fix-point iteration, deriving the number of
+ * partitions for each repartition topic based on the number of partitions of 
the source topics of the topology, if the number of
+ * partitions is not explicitly set in the topology.
+ */
+public class RepartitionTopics {
+
+    private final Logger log;
+    private final Collection<Subtopology> subtopologies;
+    private final Function<String, OptionalInt> topicPartitionCountProvider;
+
+    /**
+     * The constructor for the class.
+     *
+     * @param logContext                   The context for emitting log 
messages.
+     * @param subtopologies                The subtopologies for the requested 
topology.
+     * @param topicPartitionCountProvider  Returns the number of partitions 
for a given topic, representing the current state of the
+     *                                     broker.
+     */
+    public RepartitionTopics(final LogContext logContext,
+                             final Collection<Subtopology> subtopologies,
+                             final Function<String, OptionalInt> 
topicPartitionCountProvider) {
+        this.log = logContext.logger(getClass());
+        this.subtopologies = subtopologies;
+        this.topicPartitionCountProvider = topicPartitionCountProvider;
+    }
+
+    /**
+     * Returns the set the number of partitions for each repartition topic.
+     *
+     * @return the map of repartition topics for the requested topology to 
their required number of partitions.
+     */
+    public Map<String, Integer> setup() {
+        final Set<String> missingSourceTopicsForTopology = new HashSet<>();
+
+        for (final Subtopology subtopology : subtopologies) {
+            final Set<String> missingSourceTopicsForSubtopology = 
computeMissingExternalSourceTopics(subtopology);
+            
missingSourceTopicsForTopology.addAll(missingSourceTopicsForSubtopology);
+        }
+
+        if (missingSourceTopicsForTopology.isEmpty()) {
+            return computeRepartitionSourceTopicPartitionCount();
+        } else {
+            throw 
TopicConfigurationException.missingSourceTopics(String.format("Missing source 
topics: %s",
+                String.join(", ", missingSourceTopicsForTopology)));
+        }
+    }
+
+    private Set<String> computeMissingExternalSourceTopics(final Subtopology 
subtopology) {
+        final Set<String> missingExternalSourceTopics = new 
HashSet<>(subtopology.sourceTopics());
+        for (final TopicInfo topicInfo : 
subtopology.repartitionSourceTopics()) {
+            missingExternalSourceTopics.remove(topicInfo.name());
+        }
+        missingExternalSourceTopics.removeIf(x -> 
topicPartitionCountProvider.apply(x).isPresent());
+        return missingExternalSourceTopics;
+    }
+
+    /**
+     * Computes the number of partitions and returns it for each repartition 
topic.
+     */
+    private Map<String, Integer> computeRepartitionSourceTopicPartitionCount() 
{
+        boolean partitionCountNeeded;
+        Map<String, Integer> repartitionSourceTopicPartitionCounts = new 
HashMap<>();
+        Map<String, Set<String>> repartitionSinkTopics =
+            subtopologies.stream().collect(
+                Collectors.toMap(
+                    Subtopology::subtopologyId,
+                    x -> new HashSet<>(x.repartitionSinkTopics())
+                )
+            );
+
+        for (final Subtopology subtopology : subtopologies) {
+            for (final TopicInfo repartitionSourceTopic : 
subtopology.repartitionSourceTopics()) {
+                if (repartitionSourceTopic.partitions() != 0) {
+                    
repartitionSourceTopicPartitionCounts.put(repartitionSourceTopic.name(), 
repartitionSourceTopic.partitions());
+                }
+            }
+        }
+
+        do {
+            partitionCountNeeded = false;
+            // avoid infinitely looping without making any progress on unknown 
repartitions
+            boolean progressMadeThisIteration = false;
+
+            for (final Subtopology subtopology : subtopologies) {
+                for (final TopicInfo repartitionSourceTopic : 
subtopology.repartitionSourceTopics()) {
+                    final Integer repartitionSourceTopicPartitionCount =
+                        
repartitionSourceTopicPartitionCounts.get(repartitionSourceTopic.name());
+
+                    if (repartitionSourceTopicPartitionCount == null) {
+                        final Integer numPartitions = computePartitionCount(
+                            repartitionSourceTopicPartitionCounts,
+                            repartitionSourceTopic.name(),
+                            repartitionSinkTopics
+                        );
+
+                        if (numPartitions == null) {
+                            partitionCountNeeded = true;
+                            log.trace("Unable to determine number of 
partitions for {}, another iteration is needed",
+                                repartitionSourceTopic);
+                        } else {
+                            log.trace("Determined number of partitions for {} 
to be {}",
+                                repartitionSourceTopic,
+                                numPartitions);
+                            
repartitionSourceTopicPartitionCounts.put(repartitionSourceTopic.name(), 
numPartitions);
+                            progressMadeThisIteration = true;
+                        }
+                    }
+                }
+            }
+            if (!progressMadeThisIteration && partitionCountNeeded) {
+                throw TopicConfigurationException.missingSourceTopics("Failed 
to compute number of partitions for all " +
+                    "repartition topics, make sure all user input topics are 
created and all pattern subscriptions " +
+                    "match at least one topic in the cluster");
+            }
+        } while (partitionCountNeeded);
+
+        return repartitionSourceTopicPartitionCounts;
+    }
+
+    private Integer computePartitionCount(final Map<String, Integer> 
repartitionSourceTopicPartitionCounts,
+                                          final String repartitionSourceTopic,
+                                          Map<String, Set<String>> 
repartitionSinkTopics) {

Review Comment:
   ```suggestion
                                             final Map<String, Set<String>> 
repartitionSinkTopics) {
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopicsTest.java:
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
+import org.apache.kafka.common.utils.LogContext;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicConfig;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class RepartitionTopicsTest {
+
+    private static final LogContext LOG_CONTEXT = new LogContext();
+    private static final String SOURCE_TOPIC_NAME1 = "source1";
+    private static final String SOURCE_TOPIC_NAME2 = "source2";
+    private static final String SINK_TOPIC_NAME1 = "sink1";
+    private static final String SINK_TOPIC_NAME2 = "sink2";
+    private static final String REPARTITION_TOPIC_NAME1 = "repartition1";
+    private static final String REPARTITION_TOPIC_NAME2 = "repartition2";
+    private static final String REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT = 
"repartitionWithoutPartitionCount";
+    private static final TopicConfig TOPIC_CONFIG1 = new 
TopicConfig().setKey("config1").setValue("val1");
+    private static final TopicConfig TOPIC_CONFIG2 = new 
TopicConfig().setKey("config2").setValue("val2");
+    private static final TopicConfig TOPIC_CONFIG5 = new 
TopicConfig().setKey("config5").setValue("val5");
+    private static final TopicInfo REPARTITION_TOPIC_INFO1 = new TopicInfo()
+        .setName(REPARTITION_TOPIC_NAME1)
+        .setPartitions(4)
+        .setTopicConfigs(List.of(TOPIC_CONFIG1));
+    private static final Subtopology SUBTOPOLOGY2 = new Subtopology()
+        .setSubtopologyId("SUBTOPOLOGY2")
+        .setSourceTopics(Collections.singletonList(REPARTITION_TOPIC_NAME1))
+        .setRepartitionSinkTopics(Collections.singletonList(SINK_TOPIC_NAME1))
+        .setRepartitionSourceTopics(List.of(REPARTITION_TOPIC_INFO1))
+        .setStateChangelogTopics(Collections.emptyList());
+    private static final TopicInfo REPARTITION_TOPIC_INFO2 = new TopicInfo()
+        .setName(REPARTITION_TOPIC_NAME2)
+        .setPartitions(2)
+        .setTopicConfigs(List.of(TOPIC_CONFIG2));
+    private static final Subtopology SUBTOPOLOGY1 = new Subtopology()
+        .setSubtopologyId("SUBTOPOLOGY1")
+        .setSourceTopics(List.of(SOURCE_TOPIC_NAME1, SOURCE_TOPIC_NAME2))
+        
.setRepartitionSinkTopics(Collections.singletonList(REPARTITION_TOPIC_NAME1))
+        .setRepartitionSourceTopics(List.of(
+            REPARTITION_TOPIC_INFO1,
+            REPARTITION_TOPIC_INFO2
+        ))

Review Comment:
   Should those two repartition topics not also be part of the source topics on 
line 70?
   It does not change the test but I was confused.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopicsTest.java:
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
+import org.apache.kafka.common.utils.LogContext;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicConfig;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class RepartitionTopicsTest {
+
+    private static final LogContext LOG_CONTEXT = new LogContext();
+    private static final String SOURCE_TOPIC_NAME1 = "source1";
+    private static final String SOURCE_TOPIC_NAME2 = "source2";
+    private static final String SINK_TOPIC_NAME1 = "sink1";
+    private static final String SINK_TOPIC_NAME2 = "sink2";
+    private static final String REPARTITION_TOPIC_NAME1 = "repartition1";
+    private static final String REPARTITION_TOPIC_NAME2 = "repartition2";
+    private static final String REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT = 
"repartitionWithoutPartitionCount";
+    private static final TopicConfig TOPIC_CONFIG1 = new 
TopicConfig().setKey("config1").setValue("val1");
+    private static final TopicConfig TOPIC_CONFIG2 = new 
TopicConfig().setKey("config2").setValue("val2");
+    private static final TopicConfig TOPIC_CONFIG5 = new 
TopicConfig().setKey("config5").setValue("val5");
+    private static final TopicInfo REPARTITION_TOPIC_INFO1 = new TopicInfo()
+        .setName(REPARTITION_TOPIC_NAME1)
+        .setPartitions(4)
+        .setTopicConfigs(List.of(TOPIC_CONFIG1));
+    private static final Subtopology SUBTOPOLOGY2 = new Subtopology()
+        .setSubtopologyId("SUBTOPOLOGY2")
+        .setSourceTopics(Collections.singletonList(REPARTITION_TOPIC_NAME1))
+        .setRepartitionSinkTopics(Collections.singletonList(SINK_TOPIC_NAME1))
+        .setRepartitionSourceTopics(List.of(REPARTITION_TOPIC_INFO1))
+        .setStateChangelogTopics(Collections.emptyList());
+    private static final TopicInfo REPARTITION_TOPIC_INFO2 = new TopicInfo()
+        .setName(REPARTITION_TOPIC_NAME2)
+        .setPartitions(2)
+        .setTopicConfigs(List.of(TOPIC_CONFIG2));
+    private static final Subtopology SUBTOPOLOGY1 = new Subtopology()
+        .setSubtopologyId("SUBTOPOLOGY1")
+        .setSourceTopics(List.of(SOURCE_TOPIC_NAME1, SOURCE_TOPIC_NAME2))
+        
.setRepartitionSinkTopics(Collections.singletonList(REPARTITION_TOPIC_NAME1))
+        .setRepartitionSourceTopics(List.of(
+            REPARTITION_TOPIC_INFO1,
+            REPARTITION_TOPIC_INFO2
+        ))
+        .setStateChangelogTopics(Collections.emptyList());
+    private static final TopicInfo 
REPARTITION_TOPIC_INFO_WITHOUT_PARTITION_COUNT = new TopicInfo()
+        .setName(REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT)
+        .setTopicConfigs(List.of(TOPIC_CONFIG5));
+    private static final Subtopology SUBTOPOLOGY_WITHOUT_PARTITION_COUNT = new 
Subtopology()
+        .setSubtopologyId("SUBTOPOLOGY_WITHOUT_PARTITION_COUNT")
+        .setSourceTopics(List.of(REPARTITION_TOPIC_NAME1, 
REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT))
+        .setRepartitionSinkTopics(Collections.singletonList(SINK_TOPIC_NAME2))
+        .setRepartitionSourceTopics(List.of(
+            REPARTITION_TOPIC_INFO1,
+            REPARTITION_TOPIC_INFO_WITHOUT_PARTITION_COUNT
+        ))
+        .setStateChangelogTopics(Collections.emptyList());
+    private static final Set<String> TOPICS = Set.of(
+        SOURCE_TOPIC_NAME1,
+        SOURCE_TOPIC_NAME2,
+        SINK_TOPIC_NAME1,
+        SINK_TOPIC_NAME2,
+        REPARTITION_TOPIC_NAME1,
+        REPARTITION_TOPIC_NAME2
+    );
+
+    @Test
+    public void shouldSetupRepartitionTopics() {
+        List<Subtopology> subtopologyToSubtopology = List.of(SUBTOPOLOGY1, 
SUBTOPOLOGY2);
+        Function<String, OptionalInt> topicPartitionCountProvider = s -> 
TOPICS.contains(s) ? OptionalInt.of(3) : OptionalInt.empty();
+        final RepartitionTopics repartitionTopics = new RepartitionTopics(
+            LOG_CONTEXT,
+            subtopologyToSubtopology,
+            topicPartitionCountProvider
+        );
+
+        Map<String, Integer> setup = repartitionTopics.setup();
+
+        assertEquals(
+            mkMap(
+                mkEntry(REPARTITION_TOPIC_NAME1, 
REPARTITION_TOPIC_INFO1.partitions()),
+                mkEntry(REPARTITION_TOPIC_NAME2, 
REPARTITION_TOPIC_INFO2.partitions())
+            ),
+            setup
+        );
+    }
+
+    @Test
+    public void 
shouldThrowStreamsMissingSourceTopicsExceptionIfMissingSourceTopics() {
+        List<Subtopology> subtopologyToSubtopology = List.of(SUBTOPOLOGY1, 
SUBTOPOLOGY2);
+        Function<String, OptionalInt> topicPartitionCountProvider =
+            s -> Objects.equals(s, SOURCE_TOPIC_NAME1) ? OptionalInt.empty() :
+                (TOPICS.contains(s) ? OptionalInt.of(3) : OptionalInt.empty());
+        final RepartitionTopics repartitionTopics = new RepartitionTopics(
+            LOG_CONTEXT,
+            subtopologyToSubtopology,
+            topicPartitionCountProvider
+        );
+
+        final TopicConfigurationException exception = 
assertThrows(TopicConfigurationException.class,
+            repartitionTopics::setup);
+
+        assertNotNull(exception);
+        assertEquals(Status.MISSING_SOURCE_TOPICS, exception.status());
+        assertEquals("Missing source topics: source1", exception.getMessage());
+    }
+
+    @Test
+    public void 
shouldThrowStreamsMissingSourceTopicsExceptionIfPartitionCountCannotBeComputedForAllRepartitionTopics()
 {
+        List<Subtopology> subtopologyToSubtopology = List.of(SUBTOPOLOGY1, 
SUBTOPOLOGY_WITHOUT_PARTITION_COUNT);
+        Function<String, OptionalInt> topicPartitionCountProvider = s -> 
TOPICS.contains(s) ? OptionalInt.of(3) : OptionalInt.empty();
+        final RepartitionTopics repartitionTopics = new RepartitionTopics(
+            LOG_CONTEXT,
+            subtopologyToSubtopology,
+            topicPartitionCountProvider
+        );
+
+        TopicConfigurationException exception = 
assertThrows(TopicConfigurationException.class, repartitionTopics::setup);
+
+        assertEquals(Status.MISSING_SOURCE_TOPICS, exception.status());
+        assertEquals(
+            "Failed to compute number of partitions for all repartition 
topics, make sure all user input topics are created "
+                + "and all pattern subscriptions match at least one topic in 
the cluster",
+            exception.getMessage()
+        );
+    }
+
+    @Test
+    public void 
shouldSetRepartitionTopicPartitionCountFromUpstreamExternalSourceTopic() {
+        final Subtopology subtopology = new Subtopology()
+            .setSubtopologyId("SUBTOPOLOGY0")
+            .setSourceTopics(List.of(SOURCE_TOPIC_NAME1, 
REPARTITION_TOPIC_NAME2))
+            .setRepartitionSinkTopics(List.of(REPARTITION_TOPIC_NAME1, 
REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT))
+            .setRepartitionSourceTopics(List.of(
+                REPARTITION_TOPIC_INFO1,
+                REPARTITION_TOPIC_INFO2,
+                REPARTITION_TOPIC_INFO_WITHOUT_PARTITION_COUNT

Review Comment:
   ```suggestion
                   REPARTITION_TOPIC_INFO2
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopicsTest.java:
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
+import org.apache.kafka.common.utils.LogContext;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicConfig;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class RepartitionTopicsTest {
+
+    private static final LogContext LOG_CONTEXT = new LogContext();
+    private static final String SOURCE_TOPIC_NAME1 = "source1";
+    private static final String SOURCE_TOPIC_NAME2 = "source2";
+    private static final String SINK_TOPIC_NAME1 = "sink1";
+    private static final String SINK_TOPIC_NAME2 = "sink2";
+    private static final String REPARTITION_TOPIC_NAME1 = "repartition1";
+    private static final String REPARTITION_TOPIC_NAME2 = "repartition2";
+    private static final String REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT = 
"repartitionWithoutPartitionCount";
+    private static final TopicConfig TOPIC_CONFIG1 = new 
TopicConfig().setKey("config1").setValue("val1");
+    private static final TopicConfig TOPIC_CONFIG2 = new 
TopicConfig().setKey("config2").setValue("val2");
+    private static final TopicConfig TOPIC_CONFIG5 = new 
TopicConfig().setKey("config5").setValue("val5");
+    private static final TopicInfo REPARTITION_TOPIC_INFO1 = new TopicInfo()
+        .setName(REPARTITION_TOPIC_NAME1)
+        .setPartitions(4)
+        .setTopicConfigs(List.of(TOPIC_CONFIG1));
+    private static final Subtopology SUBTOPOLOGY2 = new Subtopology()
+        .setSubtopologyId("SUBTOPOLOGY2")
+        .setSourceTopics(Collections.singletonList(REPARTITION_TOPIC_NAME1))
+        .setRepartitionSinkTopics(Collections.singletonList(SINK_TOPIC_NAME1))
+        .setRepartitionSourceTopics(List.of(REPARTITION_TOPIC_INFO1))
+        .setStateChangelogTopics(Collections.emptyList());
+    private static final TopicInfo REPARTITION_TOPIC_INFO2 = new TopicInfo()
+        .setName(REPARTITION_TOPIC_NAME2)
+        .setPartitions(2)
+        .setTopicConfigs(List.of(TOPIC_CONFIG2));
+    private static final Subtopology SUBTOPOLOGY1 = new Subtopology()
+        .setSubtopologyId("SUBTOPOLOGY1")
+        .setSourceTopics(List.of(SOURCE_TOPIC_NAME1, SOURCE_TOPIC_NAME2))
+        
.setRepartitionSinkTopics(Collections.singletonList(REPARTITION_TOPIC_NAME1))
+        .setRepartitionSourceTopics(List.of(
+            REPARTITION_TOPIC_INFO1,
+            REPARTITION_TOPIC_INFO2
+        ))
+        .setStateChangelogTopics(Collections.emptyList());
+    private static final TopicInfo 
REPARTITION_TOPIC_INFO_WITHOUT_PARTITION_COUNT = new TopicInfo()
+        .setName(REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT)
+        .setTopicConfigs(List.of(TOPIC_CONFIG5));
+    private static final Subtopology SUBTOPOLOGY_WITHOUT_PARTITION_COUNT = new 
Subtopology()
+        .setSubtopologyId("SUBTOPOLOGY_WITHOUT_PARTITION_COUNT")
+        .setSourceTopics(List.of(REPARTITION_TOPIC_NAME1, 
REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT))
+        .setRepartitionSinkTopics(Collections.singletonList(SINK_TOPIC_NAME2))
+        .setRepartitionSourceTopics(List.of(
+            REPARTITION_TOPIC_INFO1,
+            REPARTITION_TOPIC_INFO_WITHOUT_PARTITION_COUNT
+        ))
+        .setStateChangelogTopics(Collections.emptyList());
+    private static final Set<String> TOPICS = Set.of(
+        SOURCE_TOPIC_NAME1,
+        SOURCE_TOPIC_NAME2,
+        SINK_TOPIC_NAME1,
+        SINK_TOPIC_NAME2,
+        REPARTITION_TOPIC_NAME1,
+        REPARTITION_TOPIC_NAME2
+    );
+
+    @Test
+    public void shouldSetupRepartitionTopics() {
+        List<Subtopology> subtopologyToSubtopology = List.of(SUBTOPOLOGY1, 
SUBTOPOLOGY2);
+        Function<String, OptionalInt> topicPartitionCountProvider = s -> 
TOPICS.contains(s) ? OptionalInt.of(3) : OptionalInt.empty();
+        final RepartitionTopics repartitionTopics = new RepartitionTopics(
+            LOG_CONTEXT,
+            subtopologyToSubtopology,
+            topicPartitionCountProvider
+        );
+
+        Map<String, Integer> setup = repartitionTopics.setup();
+
+        assertEquals(
+            mkMap(
+                mkEntry(REPARTITION_TOPIC_NAME1, 
REPARTITION_TOPIC_INFO1.partitions()),
+                mkEntry(REPARTITION_TOPIC_NAME2, 
REPARTITION_TOPIC_INFO2.partitions())
+            ),
+            setup
+        );
+    }
+
+    @Test
+    public void 
shouldThrowStreamsMissingSourceTopicsExceptionIfMissingSourceTopics() {
+        List<Subtopology> subtopologyToSubtopology = List.of(SUBTOPOLOGY1, 
SUBTOPOLOGY2);
+        Function<String, OptionalInt> topicPartitionCountProvider =
+            s -> Objects.equals(s, SOURCE_TOPIC_NAME1) ? OptionalInt.empty() :
+                (TOPICS.contains(s) ? OptionalInt.of(3) : OptionalInt.empty());
+        final RepartitionTopics repartitionTopics = new RepartitionTopics(
+            LOG_CONTEXT,
+            subtopologyToSubtopology,
+            topicPartitionCountProvider
+        );
+
+        final TopicConfigurationException exception = 
assertThrows(TopicConfigurationException.class,
+            repartitionTopics::setup);
+
+        assertNotNull(exception);
+        assertEquals(Status.MISSING_SOURCE_TOPICS, exception.status());
+        assertEquals("Missing source topics: source1", exception.getMessage());
+    }
+
+    @Test
+    public void 
shouldThrowStreamsMissingSourceTopicsExceptionIfPartitionCountCannotBeComputedForAllRepartitionTopics()
 {
+        List<Subtopology> subtopologyToSubtopology = List.of(SUBTOPOLOGY1, 
SUBTOPOLOGY_WITHOUT_PARTITION_COUNT);
+        Function<String, OptionalInt> topicPartitionCountProvider = s -> 
TOPICS.contains(s) ? OptionalInt.of(3) : OptionalInt.empty();
+        final RepartitionTopics repartitionTopics = new RepartitionTopics(
+            LOG_CONTEXT,
+            subtopologyToSubtopology,
+            topicPartitionCountProvider
+        );
+
+        TopicConfigurationException exception = 
assertThrows(TopicConfigurationException.class, repartitionTopics::setup);
+
+        assertEquals(Status.MISSING_SOURCE_TOPICS, exception.status());
+        assertEquals(
+            "Failed to compute number of partitions for all repartition 
topics, make sure all user input topics are created "
+                + "and all pattern subscriptions match at least one topic in 
the cluster",
+            exception.getMessage()
+        );
+    }
+
+    @Test
+    public void 
shouldSetRepartitionTopicPartitionCountFromUpstreamExternalSourceTopic() {
+        final Subtopology subtopology = new Subtopology()
+            .setSubtopologyId("SUBTOPOLOGY0")
+            .setSourceTopics(List.of(SOURCE_TOPIC_NAME1, 
REPARTITION_TOPIC_NAME2))
+            .setRepartitionSinkTopics(List.of(REPARTITION_TOPIC_NAME1, 
REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT))
+            .setRepartitionSourceTopics(List.of(
+                REPARTITION_TOPIC_INFO1,
+                REPARTITION_TOPIC_INFO2,
+                REPARTITION_TOPIC_INFO_WITHOUT_PARTITION_COUNT
+            ))
+            .setStateChangelogTopics(Collections.emptyList());
+        List<Subtopology> subtopologyToSubtopology = List.of(
+            subtopology,
+            SUBTOPOLOGY_WITHOUT_PARTITION_COUNT
+        );
+        Function<String, OptionalInt> topicPartitionCountProvider = s -> 
TOPICS.contains(s) ? OptionalInt.of(3) : OptionalInt.empty();
+        final RepartitionTopics repartitionTopics = new RepartitionTopics(
+            LOG_CONTEXT,
+            subtopologyToSubtopology,
+            topicPartitionCountProvider
+        );
+
+        Map<String, Integer> setup = repartitionTopics.setup();
+
+        assertEquals(mkMap(
+            mkEntry(REPARTITION_TOPIC_NAME1, 
REPARTITION_TOPIC_INFO1.partitions()),
+            mkEntry(REPARTITION_TOPIC_NAME2, 
REPARTITION_TOPIC_INFO2.partitions()),
+            mkEntry(REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT, 3)
+        ), setup);
+    }
+
+    @Test
+    public void 
shouldSetRepartitionTopicPartitionCountFromUpstreamInternalRepartitionSourceTopic()
 {
+        final Subtopology subtopology = new Subtopology()
+            .setSubtopologyId("SUBTOPOLOGY0")
+            .setSourceTopics(List.of(SOURCE_TOPIC_NAME1, 
REPARTITION_TOPIC_NAME1))
+            .setRepartitionSinkTopics(List.of(REPARTITION_TOPIC_NAME2, 
REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT))
+            .setRepartitionSourceTopics(List.of(
+                REPARTITION_TOPIC_INFO1,
+                REPARTITION_TOPIC_INFO2,
+                REPARTITION_TOPIC_INFO_WITHOUT_PARTITION_COUNT
+            ))
+            .setStateChangelogTopics(Collections.emptyList());
+        List<Subtopology> subtopologyToSubtopology = List.of(subtopology, 
SUBTOPOLOGY_WITHOUT_PARTITION_COUNT);
+        Function<String, OptionalInt> topicPartitionCountProvider = s -> 
TOPICS.contains(s) ? OptionalInt.of(3) : OptionalInt.empty();
+        final RepartitionTopics repartitionTopics = new RepartitionTopics(
+            LOG_CONTEXT,
+            subtopologyToSubtopology,
+            topicPartitionCountProvider
+        );
+
+        Map<String, Integer> setup = repartitionTopics.setup();
+
+        assertEquals(
+            mkMap(
+                mkEntry(REPARTITION_TOPIC_NAME1, 
REPARTITION_TOPIC_INFO1.partitions()),
+                mkEntry(REPARTITION_TOPIC_NAME2, 
REPARTITION_TOPIC_INFO2.partitions()),
+                mkEntry(REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT, 4)
+            ),
+            setup
+        );
+    }
+
+    @Test
+    public void 
shouldSetRepartitionTopicPartitionCountFromUpstreamSourceTopicMultipleSubtopologies()
 {

Review Comment:
   How is this different from 
`shouldSetRepartitionTopicPartitionCountFromUpstreamExternalSourceTopic()`?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopicsTest.java:
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
+import org.apache.kafka.common.utils.LogContext;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicConfig;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class RepartitionTopicsTest {
+
+    private static final LogContext LOG_CONTEXT = new LogContext();
+    private static final String SOURCE_TOPIC_NAME1 = "source1";
+    private static final String SOURCE_TOPIC_NAME2 = "source2";
+    private static final String SINK_TOPIC_NAME1 = "sink1";
+    private static final String SINK_TOPIC_NAME2 = "sink2";
+    private static final String REPARTITION_TOPIC_NAME1 = "repartition1";
+    private static final String REPARTITION_TOPIC_NAME2 = "repartition2";
+    private static final String REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT = 
"repartitionWithoutPartitionCount";
+    private static final TopicConfig TOPIC_CONFIG1 = new 
TopicConfig().setKey("config1").setValue("val1");
+    private static final TopicConfig TOPIC_CONFIG2 = new 
TopicConfig().setKey("config2").setValue("val2");
+    private static final TopicConfig TOPIC_CONFIG5 = new 
TopicConfig().setKey("config5").setValue("val5");
+    private static final TopicInfo REPARTITION_TOPIC_INFO1 = new TopicInfo()
+        .setName(REPARTITION_TOPIC_NAME1)
+        .setPartitions(4)
+        .setTopicConfigs(List.of(TOPIC_CONFIG1));
+    private static final Subtopology SUBTOPOLOGY2 = new Subtopology()
+        .setSubtopologyId("SUBTOPOLOGY2")
+        .setSourceTopics(Collections.singletonList(REPARTITION_TOPIC_NAME1))
+        .setRepartitionSinkTopics(Collections.singletonList(SINK_TOPIC_NAME1))
+        .setRepartitionSourceTopics(List.of(REPARTITION_TOPIC_INFO1))
+        .setStateChangelogTopics(Collections.emptyList());
+    private static final TopicInfo REPARTITION_TOPIC_INFO2 = new TopicInfo()
+        .setName(REPARTITION_TOPIC_NAME2)
+        .setPartitions(2)
+        .setTopicConfigs(List.of(TOPIC_CONFIG2));
+    private static final Subtopology SUBTOPOLOGY1 = new Subtopology()
+        .setSubtopologyId("SUBTOPOLOGY1")
+        .setSourceTopics(List.of(SOURCE_TOPIC_NAME1, SOURCE_TOPIC_NAME2))
+        
.setRepartitionSinkTopics(Collections.singletonList(REPARTITION_TOPIC_NAME1))
+        .setRepartitionSourceTopics(List.of(
+            REPARTITION_TOPIC_INFO1,
+            REPARTITION_TOPIC_INFO2
+        ))
+        .setStateChangelogTopics(Collections.emptyList());
+    private static final TopicInfo 
REPARTITION_TOPIC_INFO_WITHOUT_PARTITION_COUNT = new TopicInfo()
+        .setName(REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT)
+        .setTopicConfigs(List.of(TOPIC_CONFIG5));
+    private static final Subtopology SUBTOPOLOGY_WITHOUT_PARTITION_COUNT = new 
Subtopology()
+        .setSubtopologyId("SUBTOPOLOGY_WITHOUT_PARTITION_COUNT")
+        .setSourceTopics(List.of(REPARTITION_TOPIC_NAME1, 
REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT))
+        .setRepartitionSinkTopics(Collections.singletonList(SINK_TOPIC_NAME2))
+        .setRepartitionSourceTopics(List.of(
+            REPARTITION_TOPIC_INFO1,
+            REPARTITION_TOPIC_INFO_WITHOUT_PARTITION_COUNT
+        ))
+        .setStateChangelogTopics(Collections.emptyList());
+    private static final Set<String> TOPICS = Set.of(
+        SOURCE_TOPIC_NAME1,
+        SOURCE_TOPIC_NAME2,
+        SINK_TOPIC_NAME1,
+        SINK_TOPIC_NAME2,
+        REPARTITION_TOPIC_NAME1,
+        REPARTITION_TOPIC_NAME2
+    );
+
+    @Test
+    public void shouldSetupRepartitionTopics() {
+        List<Subtopology> subtopologyToSubtopology = List.of(SUBTOPOLOGY1, 
SUBTOPOLOGY2);
+        Function<String, OptionalInt> topicPartitionCountProvider = s -> 
TOPICS.contains(s) ? OptionalInt.of(3) : OptionalInt.empty();
+        final RepartitionTopics repartitionTopics = new RepartitionTopics(
+            LOG_CONTEXT,
+            subtopologyToSubtopology,
+            topicPartitionCountProvider
+        );
+
+        Map<String, Integer> setup = repartitionTopics.setup();
+
+        assertEquals(
+            mkMap(
+                mkEntry(REPARTITION_TOPIC_NAME1, 
REPARTITION_TOPIC_INFO1.partitions()),
+                mkEntry(REPARTITION_TOPIC_NAME2, 
REPARTITION_TOPIC_INFO2.partitions())
+            ),
+            setup
+        );
+    }
+
+    @Test
+    public void 
shouldThrowStreamsMissingSourceTopicsExceptionIfMissingSourceTopics() {
+        List<Subtopology> subtopologyToSubtopology = List.of(SUBTOPOLOGY1, 
SUBTOPOLOGY2);
+        Function<String, OptionalInt> topicPartitionCountProvider =
+            s -> Objects.equals(s, SOURCE_TOPIC_NAME1) ? OptionalInt.empty() :
+                (TOPICS.contains(s) ? OptionalInt.of(3) : OptionalInt.empty());
+        final RepartitionTopics repartitionTopics = new RepartitionTopics(
+            LOG_CONTEXT,
+            subtopologyToSubtopology,
+            topicPartitionCountProvider
+        );
+
+        final TopicConfigurationException exception = 
assertThrows(TopicConfigurationException.class,
+            repartitionTopics::setup);
+
+        assertNotNull(exception);
+        assertEquals(Status.MISSING_SOURCE_TOPICS, exception.status());
+        assertEquals("Missing source topics: source1", exception.getMessage());
+    }
+
+    @Test
+    public void 
shouldThrowStreamsMissingSourceTopicsExceptionIfPartitionCountCannotBeComputedForAllRepartitionTopics()
 {
+        List<Subtopology> subtopologyToSubtopology = List.of(SUBTOPOLOGY1, 
SUBTOPOLOGY_WITHOUT_PARTITION_COUNT);
+        Function<String, OptionalInt> topicPartitionCountProvider = s -> 
TOPICS.contains(s) ? OptionalInt.of(3) : OptionalInt.empty();
+        final RepartitionTopics repartitionTopics = new RepartitionTopics(
+            LOG_CONTEXT,
+            subtopologyToSubtopology,
+            topicPartitionCountProvider
+        );
+
+        TopicConfigurationException exception = 
assertThrows(TopicConfigurationException.class, repartitionTopics::setup);
+
+        assertEquals(Status.MISSING_SOURCE_TOPICS, exception.status());
+        assertEquals(
+            "Failed to compute number of partitions for all repartition 
topics, make sure all user input topics are created "
+                + "and all pattern subscriptions match at least one topic in 
the cluster",
+            exception.getMessage()
+        );
+    }
+
+    @Test
+    public void 
shouldSetRepartitionTopicPartitionCountFromUpstreamExternalSourceTopic() {
+        final Subtopology subtopology = new Subtopology()
+            .setSubtopologyId("SUBTOPOLOGY0")
+            .setSourceTopics(List.of(SOURCE_TOPIC_NAME1, 
REPARTITION_TOPIC_NAME2))
+            .setRepartitionSinkTopics(List.of(REPARTITION_TOPIC_NAME1, 
REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT))
+            .setRepartitionSourceTopics(List.of(
+                REPARTITION_TOPIC_INFO1,
+                REPARTITION_TOPIC_INFO2,
+                REPARTITION_TOPIC_INFO_WITHOUT_PARTITION_COUNT
+            ))
+            .setStateChangelogTopics(Collections.emptyList());
+        List<Subtopology> subtopologyToSubtopology = List.of(
+            subtopology,
+            SUBTOPOLOGY_WITHOUT_PARTITION_COUNT
+        );
+        Function<String, OptionalInt> topicPartitionCountProvider = s -> 
TOPICS.contains(s) ? OptionalInt.of(3) : OptionalInt.empty();

Review Comment:
   ```suggestion
           int expectedPartitionCount = 3
           Function<String, OptionalInt> topicPartitionCountProvider = s -> 
TOPICS.contains(s) ? OptionalInt.of(expectedPartitionCount) : 
OptionalInt.empty();
   ```
   and use `expectedPartitionCount` in the verification on line 187.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopicsTest.java:
##########
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
+import org.apache.kafka.common.utils.LogContext;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicConfig;
+import 
org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.function.Function;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class RepartitionTopicsTest {
+
+    private static final LogContext LOG_CONTEXT = new LogContext();
+    private static final String SOURCE_TOPIC_NAME1 = "source1";
+    private static final String SOURCE_TOPIC_NAME2 = "source2";
+    private static final String SINK_TOPIC_NAME1 = "sink1";
+    private static final String SINK_TOPIC_NAME2 = "sink2";
+    private static final String REPARTITION_TOPIC_NAME1 = "repartition1";
+    private static final String REPARTITION_TOPIC_NAME2 = "repartition2";
+    private static final String REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT = 
"repartitionWithoutPartitionCount";
+    private static final TopicConfig TOPIC_CONFIG1 = new 
TopicConfig().setKey("config1").setValue("val1");
+    private static final TopicConfig TOPIC_CONFIG2 = new 
TopicConfig().setKey("config2").setValue("val2");
+    private static final TopicConfig TOPIC_CONFIG5 = new 
TopicConfig().setKey("config5").setValue("val5");
+    private static final TopicInfo REPARTITION_TOPIC_INFO1 = new TopicInfo()
+        .setName(REPARTITION_TOPIC_NAME1)
+        .setPartitions(4)
+        .setTopicConfigs(List.of(TOPIC_CONFIG1));
+    private static final Subtopology SUBTOPOLOGY2 = new Subtopology()
+        .setSubtopologyId("SUBTOPOLOGY2")
+        .setSourceTopics(Collections.singletonList(REPARTITION_TOPIC_NAME1))
+        .setRepartitionSinkTopics(Collections.singletonList(SINK_TOPIC_NAME1))
+        .setRepartitionSourceTopics(List.of(REPARTITION_TOPIC_INFO1))
+        .setStateChangelogTopics(Collections.emptyList());
+    private static final TopicInfo REPARTITION_TOPIC_INFO2 = new TopicInfo()
+        .setName(REPARTITION_TOPIC_NAME2)
+        .setPartitions(2)
+        .setTopicConfigs(List.of(TOPIC_CONFIG2));
+    private static final Subtopology SUBTOPOLOGY1 = new Subtopology()
+        .setSubtopologyId("SUBTOPOLOGY1")
+        .setSourceTopics(List.of(SOURCE_TOPIC_NAME1, SOURCE_TOPIC_NAME2))
+        
.setRepartitionSinkTopics(Collections.singletonList(REPARTITION_TOPIC_NAME1))
+        .setRepartitionSourceTopics(List.of(
+            REPARTITION_TOPIC_INFO1,
+            REPARTITION_TOPIC_INFO2
+        ))
+        .setStateChangelogTopics(Collections.emptyList());
+    private static final TopicInfo 
REPARTITION_TOPIC_INFO_WITHOUT_PARTITION_COUNT = new TopicInfo()
+        .setName(REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT)
+        .setTopicConfigs(List.of(TOPIC_CONFIG5));
+    private static final Subtopology SUBTOPOLOGY_WITHOUT_PARTITION_COUNT = new 
Subtopology()
+        .setSubtopologyId("SUBTOPOLOGY_WITHOUT_PARTITION_COUNT")
+        .setSourceTopics(List.of(REPARTITION_TOPIC_NAME1, 
REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT))
+        .setRepartitionSinkTopics(Collections.singletonList(SINK_TOPIC_NAME2))
+        .setRepartitionSourceTopics(List.of(
+            REPARTITION_TOPIC_INFO1,
+            REPARTITION_TOPIC_INFO_WITHOUT_PARTITION_COUNT
+        ))
+        .setStateChangelogTopics(Collections.emptyList());
+    private static final Set<String> TOPICS = Set.of(
+        SOURCE_TOPIC_NAME1,
+        SOURCE_TOPIC_NAME2,
+        SINK_TOPIC_NAME1,
+        SINK_TOPIC_NAME2,
+        REPARTITION_TOPIC_NAME1,
+        REPARTITION_TOPIC_NAME2
+    );
+
+    @Test
+    public void shouldSetupRepartitionTopics() {
+        List<Subtopology> subtopologyToSubtopology = List.of(SUBTOPOLOGY1, 
SUBTOPOLOGY2);
+        Function<String, OptionalInt> topicPartitionCountProvider = s -> 
TOPICS.contains(s) ? OptionalInt.of(3) : OptionalInt.empty();
+        final RepartitionTopics repartitionTopics = new RepartitionTopics(
+            LOG_CONTEXT,
+            subtopologyToSubtopology,
+            topicPartitionCountProvider
+        );
+
+        Map<String, Integer> setup = repartitionTopics.setup();
+
+        assertEquals(
+            mkMap(
+                mkEntry(REPARTITION_TOPIC_NAME1, 
REPARTITION_TOPIC_INFO1.partitions()),
+                mkEntry(REPARTITION_TOPIC_NAME2, 
REPARTITION_TOPIC_INFO2.partitions())
+            ),
+            setup
+        );
+    }
+
+    @Test
+    public void 
shouldThrowStreamsMissingSourceTopicsExceptionIfMissingSourceTopics() {
+        List<Subtopology> subtopologyToSubtopology = List.of(SUBTOPOLOGY1, 
SUBTOPOLOGY2);
+        Function<String, OptionalInt> topicPartitionCountProvider =
+            s -> Objects.equals(s, SOURCE_TOPIC_NAME1) ? OptionalInt.empty() :
+                (TOPICS.contains(s) ? OptionalInt.of(3) : OptionalInt.empty());
+        final RepartitionTopics repartitionTopics = new RepartitionTopics(
+            LOG_CONTEXT,
+            subtopologyToSubtopology,
+            topicPartitionCountProvider
+        );
+
+        final TopicConfigurationException exception = 
assertThrows(TopicConfigurationException.class,
+            repartitionTopics::setup);
+
+        assertNotNull(exception);
+        assertEquals(Status.MISSING_SOURCE_TOPICS, exception.status());
+        assertEquals("Missing source topics: source1", exception.getMessage());
+    }
+
+    @Test
+    public void 
shouldThrowStreamsMissingSourceTopicsExceptionIfPartitionCountCannotBeComputedForAllRepartitionTopics()
 {
+        List<Subtopology> subtopologyToSubtopology = List.of(SUBTOPOLOGY1, 
SUBTOPOLOGY_WITHOUT_PARTITION_COUNT);
+        Function<String, OptionalInt> topicPartitionCountProvider = s -> 
TOPICS.contains(s) ? OptionalInt.of(3) : OptionalInt.empty();
+        final RepartitionTopics repartitionTopics = new RepartitionTopics(
+            LOG_CONTEXT,
+            subtopologyToSubtopology,
+            topicPartitionCountProvider
+        );
+
+        TopicConfigurationException exception = 
assertThrows(TopicConfigurationException.class, repartitionTopics::setup);
+
+        assertEquals(Status.MISSING_SOURCE_TOPICS, exception.status());
+        assertEquals(
+            "Failed to compute number of partitions for all repartition 
topics, make sure all user input topics are created "
+                + "and all pattern subscriptions match at least one topic in 
the cluster",
+            exception.getMessage()
+        );
+    }
+
+    @Test
+    public void 
shouldSetRepartitionTopicPartitionCountFromUpstreamExternalSourceTopic() {
+        final Subtopology subtopology = new Subtopology()
+            .setSubtopologyId("SUBTOPOLOGY0")
+            .setSourceTopics(List.of(SOURCE_TOPIC_NAME1, 
REPARTITION_TOPIC_NAME2))
+            .setRepartitionSinkTopics(List.of(REPARTITION_TOPIC_NAME1, 
REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT))
+            .setRepartitionSourceTopics(List.of(
+                REPARTITION_TOPIC_INFO1,
+                REPARTITION_TOPIC_INFO2,
+                REPARTITION_TOPIC_INFO_WITHOUT_PARTITION_COUNT
+            ))
+            .setStateChangelogTopics(Collections.emptyList());
+        List<Subtopology> subtopologyToSubtopology = List.of(
+            subtopology,
+            SUBTOPOLOGY_WITHOUT_PARTITION_COUNT
+        );
+        Function<String, OptionalInt> topicPartitionCountProvider = s -> 
TOPICS.contains(s) ? OptionalInt.of(3) : OptionalInt.empty();
+        final RepartitionTopics repartitionTopics = new RepartitionTopics(
+            LOG_CONTEXT,
+            subtopologyToSubtopology,
+            topicPartitionCountProvider
+        );
+
+        Map<String, Integer> setup = repartitionTopics.setup();
+
+        assertEquals(mkMap(
+            mkEntry(REPARTITION_TOPIC_NAME1, 
REPARTITION_TOPIC_INFO1.partitions()),
+            mkEntry(REPARTITION_TOPIC_NAME2, 
REPARTITION_TOPIC_INFO2.partitions()),
+            mkEntry(REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT, 3)
+        ), setup);
+    }
+
+    @Test
+    public void 
shouldSetRepartitionTopicPartitionCountFromUpstreamInternalRepartitionSourceTopic()
 {
+        final Subtopology subtopology = new Subtopology()
+            .setSubtopologyId("SUBTOPOLOGY0")
+            .setSourceTopics(List.of(SOURCE_TOPIC_NAME1, 
REPARTITION_TOPIC_NAME1))
+            .setRepartitionSinkTopics(List.of(REPARTITION_TOPIC_NAME2, 
REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT))
+            .setRepartitionSourceTopics(List.of(
+                REPARTITION_TOPIC_INFO1,
+                REPARTITION_TOPIC_INFO2,
+                REPARTITION_TOPIC_INFO_WITHOUT_PARTITION_COUNT

Review Comment:
   This test is a bit confusing. 
   `REPARTITION_TOPIC_INFO_WITHOUT_PARTITION_COUNT` and 
`REPARTITION_TOPIC_INFO2` in `RepartitionSourceTopics` have no reason, haven't 
they?
   Additionally, `REPARTITION_TOPIC_INFO1` makes the two subtopology be a 
cycle. 
   The test is correct also this way, but it makes the test hard to read and 
understand.
   I propose to something like this:
   ```java
       @Test
       public void 
shouldSetRepartitionTopicPartitionCountFromUpstreamInternalRepartitionSourceTopic()
 {
           final Subtopology subtopology = new Subtopology()
               .setSubtopologyId("SUBTOPOLOGY0")
               .setSourceTopics(List.of(SOURCE_TOPIC_NAME1, 
REPARTITION_TOPIC_NAME3))
               .setRepartitionSinkTopics(List.of(REPARTITION_TOPIC_NAME1, 
REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT))
               .setRepartitionSourceTopics(List.of(
                   REPARTITION_TOPIC_INFO3
               ))
               .setStateChangelogTopics(Collections.emptyList());
           List<Subtopology> subtopologyToSubtopology = List.of(subtopology, 
SUBTOPOLOGY_WITHOUT_PARTITION_COUNT);
           Function<String, OptionalInt> topicPartitionCountProvider = s -> 
TOPICS.contains(s) ? OptionalInt.of(2) : OptionalInt.empty();
           final RepartitionTopics repartitionTopics = new RepartitionTopics(
               LOG_CONTEXT,
               subtopologyToSubtopology,
               topicPartitionCountProvider
           );
   
           Map<String, Integer> setup = repartitionTopics.setup();
   
           assertEquals(
               mkMap(
                   mkEntry(REPARTITION_TOPIC_NAME1, 
REPARTITION_TOPIC_INFO1.partitions()),
                   mkEntry(REPARTITION_TOPIC_NAME3, 
REPARTITION_TOPIC_INFO3.partitions()),
                   mkEntry(REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT, 
REPARTITION_TOPIC_INFO3.partitions())
               ),
               setup
           );
       }
   
   ```
   
   where `REPARTITION_TOPIC_INFO3` is:
   ```java
       private static final TopicInfo REPARTITION_TOPIC_INFO3 = new TopicInfo()
           .setName(REPARTITION_TOPIC_NAME3)
           .setPartitions(3)
           .setTopicConfigs(List.of(TOPIC_CONFIG2));
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to