cadonna commented on code in PR #18395: URL: https://github.com/apache/kafka/pull/18395#discussion_r1907228239
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopics.java: ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams.topics; + +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo; + +import org.slf4j.Logger; + +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.OptionalInt; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * Responsible for configuring the number of partitions in repartitioning topics. It computes a fix-point iteration, deriving the number of + * partitions for each repartition topic based on the number of partitions of the source topics of the topology, if the number of + * partitions is not explicitly set in the topology. 
+ */ +public class RepartitionTopics { + + private final Logger log; + private final Collection<Subtopology> subtopologies; + private final Function<String, OptionalInt> topicPartitionCountProvider; + + /** + * The constructor for the class. + * + * @param logContext The context for emitting log messages. + * @param subtopologies The subtopologies for the requested topology. + * @param topicPartitionCountProvider Returns the number of partitions for a given topic, representing the current state of the + * broker. + */ + public RepartitionTopics(final LogContext logContext, + final Collection<Subtopology> subtopologies, + final Function<String, OptionalInt> topicPartitionCountProvider) { + this.log = logContext.logger(getClass()); + this.subtopologies = subtopologies; + this.topicPartitionCountProvider = topicPartitionCountProvider; + } + + /** + * Returns the set the number of partitions for each repartition topic. Review Comment: ```suggestion * Returns the set of the number of partitions for each repartition topic. ``` ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopicsTest.java: ########## @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.streams.topics; + +import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicConfig; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo; + +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.OptionalInt; +import java.util.Set; +import java.util.function.Function; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class RepartitionTopicsTest { + + private static final LogContext LOG_CONTEXT = new LogContext(); + private static final String SOURCE_TOPIC_NAME1 = "source1"; + private static final String SOURCE_TOPIC_NAME2 = "source2"; + private static final String SINK_TOPIC_NAME1 = "sink1"; + private static final String SINK_TOPIC_NAME2 = "sink2"; + private static final String REPARTITION_TOPIC_NAME1 = "repartition1"; + private static final String REPARTITION_TOPIC_NAME2 = "repartition2"; + private static final String REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT = "repartitionWithoutPartitionCount"; + private static final TopicConfig TOPIC_CONFIG1 = new TopicConfig().setKey("config1").setValue("val1"); + private static final TopicConfig TOPIC_CONFIG2 = new TopicConfig().setKey("config2").setValue("val2"); + private static final TopicConfig TOPIC_CONFIG5 = new TopicConfig().setKey("config5").setValue("val5"); + private static final TopicInfo REPARTITION_TOPIC_INFO1 = new 
TopicInfo() + .setName(REPARTITION_TOPIC_NAME1) + .setPartitions(4) + .setTopicConfigs(List.of(TOPIC_CONFIG1)); + private static final Subtopology SUBTOPOLOGY2 = new Subtopology() + .setSubtopologyId("SUBTOPOLOGY2") + .setSourceTopics(Collections.singletonList(REPARTITION_TOPIC_NAME1)) + .setRepartitionSinkTopics(Collections.singletonList(SINK_TOPIC_NAME1)) + .setRepartitionSourceTopics(List.of(REPARTITION_TOPIC_INFO1)) + .setStateChangelogTopics(Collections.emptyList()); + private static final TopicInfo REPARTITION_TOPIC_INFO2 = new TopicInfo() + .setName(REPARTITION_TOPIC_NAME2) + .setPartitions(2) + .setTopicConfigs(List.of(TOPIC_CONFIG2)); + private static final Subtopology SUBTOPOLOGY1 = new Subtopology() + .setSubtopologyId("SUBTOPOLOGY1") + .setSourceTopics(List.of(SOURCE_TOPIC_NAME1, SOURCE_TOPIC_NAME2)) + .setRepartitionSinkTopics(Collections.singletonList(REPARTITION_TOPIC_NAME1)) + .setRepartitionSourceTopics(List.of( + REPARTITION_TOPIC_INFO1, + REPARTITION_TOPIC_INFO2 + )) + .setStateChangelogTopics(Collections.emptyList()); + private static final TopicInfo REPARTITION_TOPIC_INFO_WITHOUT_PARTITION_COUNT = new TopicInfo() + .setName(REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT) + .setTopicConfigs(List.of(TOPIC_CONFIG5)); + private static final Subtopology SUBTOPOLOGY_WITHOUT_PARTITION_COUNT = new Subtopology() + .setSubtopologyId("SUBTOPOLOGY_WITHOUT_PARTITION_COUNT") + .setSourceTopics(List.of(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT)) + .setRepartitionSinkTopics(Collections.singletonList(SINK_TOPIC_NAME2)) + .setRepartitionSourceTopics(List.of( + REPARTITION_TOPIC_INFO1, + REPARTITION_TOPIC_INFO_WITHOUT_PARTITION_COUNT + )) + .setStateChangelogTopics(Collections.emptyList()); + private static final Set<String> TOPICS = Set.of( + SOURCE_TOPIC_NAME1, + SOURCE_TOPIC_NAME2, + SINK_TOPIC_NAME1, + SINK_TOPIC_NAME2, + REPARTITION_TOPIC_NAME1, + REPARTITION_TOPIC_NAME2 + ); + + @Test + public void shouldSetupRepartitionTopics() { + 
List<Subtopology> subtopologyToSubtopology = List.of(SUBTOPOLOGY1, SUBTOPOLOGY2); + Function<String, OptionalInt> topicPartitionCountProvider = s -> TOPICS.contains(s) ? OptionalInt.of(3) : OptionalInt.empty(); + final RepartitionTopics repartitionTopics = new RepartitionTopics( + LOG_CONTEXT, + subtopologyToSubtopology, + topicPartitionCountProvider + ); + + Map<String, Integer> setup = repartitionTopics.setup(); + + assertEquals( + mkMap( + mkEntry(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_INFO1.partitions()), + mkEntry(REPARTITION_TOPIC_NAME2, REPARTITION_TOPIC_INFO2.partitions()) + ), + setup + ); + } + + @Test + public void shouldThrowStreamsMissingSourceTopicsExceptionIfMissingSourceTopics() { + List<Subtopology> subtopologyToSubtopology = List.of(SUBTOPOLOGY1, SUBTOPOLOGY2); + Function<String, OptionalInt> topicPartitionCountProvider = + s -> Objects.equals(s, SOURCE_TOPIC_NAME1) ? OptionalInt.empty() : + (TOPICS.contains(s) ? OptionalInt.of(3) : OptionalInt.empty()); + final RepartitionTopics repartitionTopics = new RepartitionTopics( + LOG_CONTEXT, + subtopologyToSubtopology, + topicPartitionCountProvider + ); + + final TopicConfigurationException exception = assertThrows(TopicConfigurationException.class, + repartitionTopics::setup); + + assertNotNull(exception); Review Comment: Isn't this implied by `assertThrows()`? ########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopics.java: ########## @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams.topics; + +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo; + +import org.slf4j.Logger; + +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.OptionalInt; +import java.util.Set; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** + * Responsible for configuring the number of partitions in repartitioning topics. It computes a fix-point iteration, deriving the number of + * partitions for each repartition topic based on the number of partitions of the source topics of the topology, if the number of + * partitions is not explicitly set in the topology. + */ +public class RepartitionTopics { + + private final Logger log; + private final Collection<Subtopology> subtopologies; + private final Function<String, OptionalInt> topicPartitionCountProvider; + + /** + * The constructor for the class. + * + * @param logContext The context for emitting log messages. + * @param subtopologies The subtopologies for the requested topology. + * @param topicPartitionCountProvider Returns the number of partitions for a given topic, representing the current state of the + * broker. 
+ */ + public RepartitionTopics(final LogContext logContext, + final Collection<Subtopology> subtopologies, + final Function<String, OptionalInt> topicPartitionCountProvider) { + this.log = logContext.logger(getClass()); + this.subtopologies = subtopologies; + this.topicPartitionCountProvider = topicPartitionCountProvider; + } + + /** + * Returns the set the number of partitions for each repartition topic. + * + * @return the map of repartition topics for the requested topology to their required number of partitions. + */ + public Map<String, Integer> setup() { + final Set<String> missingSourceTopicsForTopology = new HashSet<>(); + + for (final Subtopology subtopology : subtopologies) { + final Set<String> missingSourceTopicsForSubtopology = computeMissingExternalSourceTopics(subtopology); + missingSourceTopicsForTopology.addAll(missingSourceTopicsForSubtopology); + } + + if (missingSourceTopicsForTopology.isEmpty()) { + return computeRepartitionSourceTopicPartitionCount(); + } else { + throw TopicConfigurationException.missingSourceTopics(String.format("Missing source topics: %s", + String.join(", ", missingSourceTopicsForTopology))); + } + } + + private Set<String> computeMissingExternalSourceTopics(final Subtopology subtopology) { + final Set<String> missingExternalSourceTopics = new HashSet<>(subtopology.sourceTopics()); + for (final TopicInfo topicInfo : subtopology.repartitionSourceTopics()) { + missingExternalSourceTopics.remove(topicInfo.name()); + } + missingExternalSourceTopics.removeIf(x -> topicPartitionCountProvider.apply(x).isPresent()); + return missingExternalSourceTopics; + } + + /** + * Computes the number of partitions and returns it for each repartition topic. 
+ */ + private Map<String, Integer> computeRepartitionSourceTopicPartitionCount() { + boolean partitionCountNeeded; + Map<String, Integer> repartitionSourceTopicPartitionCounts = new HashMap<>(); + Map<String, Set<String>> repartitionSinkTopics = + subtopologies.stream().collect( + Collectors.toMap( + Subtopology::subtopologyId, + x -> new HashSet<>(x.repartitionSinkTopics()) + ) + ); + + for (final Subtopology subtopology : subtopologies) { + for (final TopicInfo repartitionSourceTopic : subtopology.repartitionSourceTopics()) { + if (repartitionSourceTopic.partitions() != 0) { + repartitionSourceTopicPartitionCounts.put(repartitionSourceTopic.name(), repartitionSourceTopic.partitions()); + } + } + } + + do { + partitionCountNeeded = false; + // avoid infinitely looping without making any progress on unknown repartitions + boolean progressMadeThisIteration = false; + + for (final Subtopology subtopology : subtopologies) { + for (final TopicInfo repartitionSourceTopic : subtopology.repartitionSourceTopics()) { + final Integer repartitionSourceTopicPartitionCount = + repartitionSourceTopicPartitionCounts.get(repartitionSourceTopic.name()); + + if (repartitionSourceTopicPartitionCount == null) { + final Integer numPartitions = computePartitionCount( + repartitionSourceTopicPartitionCounts, + repartitionSourceTopic.name(), + repartitionSinkTopics + ); + + if (numPartitions == null) { + partitionCountNeeded = true; + log.trace("Unable to determine number of partitions for {}, another iteration is needed", + repartitionSourceTopic); + } else { + log.trace("Determined number of partitions for {} to be {}", + repartitionSourceTopic, + numPartitions); + repartitionSourceTopicPartitionCounts.put(repartitionSourceTopic.name(), numPartitions); + progressMadeThisIteration = true; + } + } + } + } + if (!progressMadeThisIteration && partitionCountNeeded) { + throw TopicConfigurationException.missingSourceTopics("Failed to compute number of partitions for all " + + "repartition 
topics, make sure all user input topics are created and all pattern subscriptions " + + "match at least one topic in the cluster"); + } + } while (partitionCountNeeded); + + return repartitionSourceTopicPartitionCounts; + } + + private Integer computePartitionCount(final Map<String, Integer> repartitionSourceTopicPartitionCounts, + final String repartitionSourceTopic, + Map<String, Set<String>> repartitionSinkTopics) { Review Comment: ```suggestion final Map<String, Set<String>> repartitionSinkTopics) { ``` ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopicsTest.java: ########## @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.streams.topics; + +import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicConfig; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo; + +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.OptionalInt; +import java.util.Set; +import java.util.function.Function; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class RepartitionTopicsTest { + + private static final LogContext LOG_CONTEXT = new LogContext(); + private static final String SOURCE_TOPIC_NAME1 = "source1"; + private static final String SOURCE_TOPIC_NAME2 = "source2"; + private static final String SINK_TOPIC_NAME1 = "sink1"; + private static final String SINK_TOPIC_NAME2 = "sink2"; + private static final String REPARTITION_TOPIC_NAME1 = "repartition1"; + private static final String REPARTITION_TOPIC_NAME2 = "repartition2"; + private static final String REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT = "repartitionWithoutPartitionCount"; + private static final TopicConfig TOPIC_CONFIG1 = new TopicConfig().setKey("config1").setValue("val1"); + private static final TopicConfig TOPIC_CONFIG2 = new TopicConfig().setKey("config2").setValue("val2"); + private static final TopicConfig TOPIC_CONFIG5 = new TopicConfig().setKey("config5").setValue("val5"); + private static final TopicInfo REPARTITION_TOPIC_INFO1 = new 
TopicInfo() + .setName(REPARTITION_TOPIC_NAME1) + .setPartitions(4) + .setTopicConfigs(List.of(TOPIC_CONFIG1)); + private static final Subtopology SUBTOPOLOGY2 = new Subtopology() + .setSubtopologyId("SUBTOPOLOGY2") + .setSourceTopics(Collections.singletonList(REPARTITION_TOPIC_NAME1)) + .setRepartitionSinkTopics(Collections.singletonList(SINK_TOPIC_NAME1)) + .setRepartitionSourceTopics(List.of(REPARTITION_TOPIC_INFO1)) + .setStateChangelogTopics(Collections.emptyList()); + private static final TopicInfo REPARTITION_TOPIC_INFO2 = new TopicInfo() + .setName(REPARTITION_TOPIC_NAME2) + .setPartitions(2) + .setTopicConfigs(List.of(TOPIC_CONFIG2)); + private static final Subtopology SUBTOPOLOGY1 = new Subtopology() + .setSubtopologyId("SUBTOPOLOGY1") + .setSourceTopics(List.of(SOURCE_TOPIC_NAME1, SOURCE_TOPIC_NAME2)) + .setRepartitionSinkTopics(Collections.singletonList(REPARTITION_TOPIC_NAME1)) + .setRepartitionSourceTopics(List.of( + REPARTITION_TOPIC_INFO1, + REPARTITION_TOPIC_INFO2 + )) Review Comment: Should those two repartition topics not also be part of the source topics on line 70? It does not change the test but I was confused. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopicsTest.java: ########## @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams.topics; + +import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicConfig; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo; + +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.OptionalInt; +import java.util.Set; +import java.util.function.Function; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class RepartitionTopicsTest { + + private static final LogContext LOG_CONTEXT = new LogContext(); + private static final String SOURCE_TOPIC_NAME1 = "source1"; + private static final String SOURCE_TOPIC_NAME2 = "source2"; + private static final String SINK_TOPIC_NAME1 = "sink1"; + private static final String SINK_TOPIC_NAME2 = "sink2"; + private static final String REPARTITION_TOPIC_NAME1 = "repartition1"; + private static final String REPARTITION_TOPIC_NAME2 = "repartition2"; + private static final String REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT = 
"repartitionWithoutPartitionCount"; + private static final TopicConfig TOPIC_CONFIG1 = new TopicConfig().setKey("config1").setValue("val1"); + private static final TopicConfig TOPIC_CONFIG2 = new TopicConfig().setKey("config2").setValue("val2"); + private static final TopicConfig TOPIC_CONFIG5 = new TopicConfig().setKey("config5").setValue("val5"); + private static final TopicInfo REPARTITION_TOPIC_INFO1 = new TopicInfo() + .setName(REPARTITION_TOPIC_NAME1) + .setPartitions(4) + .setTopicConfigs(List.of(TOPIC_CONFIG1)); + private static final Subtopology SUBTOPOLOGY2 = new Subtopology() + .setSubtopologyId("SUBTOPOLOGY2") + .setSourceTopics(Collections.singletonList(REPARTITION_TOPIC_NAME1)) + .setRepartitionSinkTopics(Collections.singletonList(SINK_TOPIC_NAME1)) + .setRepartitionSourceTopics(List.of(REPARTITION_TOPIC_INFO1)) + .setStateChangelogTopics(Collections.emptyList()); + private static final TopicInfo REPARTITION_TOPIC_INFO2 = new TopicInfo() + .setName(REPARTITION_TOPIC_NAME2) + .setPartitions(2) + .setTopicConfigs(List.of(TOPIC_CONFIG2)); + private static final Subtopology SUBTOPOLOGY1 = new Subtopology() + .setSubtopologyId("SUBTOPOLOGY1") + .setSourceTopics(List.of(SOURCE_TOPIC_NAME1, SOURCE_TOPIC_NAME2)) + .setRepartitionSinkTopics(Collections.singletonList(REPARTITION_TOPIC_NAME1)) + .setRepartitionSourceTopics(List.of( + REPARTITION_TOPIC_INFO1, + REPARTITION_TOPIC_INFO2 + )) + .setStateChangelogTopics(Collections.emptyList()); + private static final TopicInfo REPARTITION_TOPIC_INFO_WITHOUT_PARTITION_COUNT = new TopicInfo() + .setName(REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT) + .setTopicConfigs(List.of(TOPIC_CONFIG5)); + private static final Subtopology SUBTOPOLOGY_WITHOUT_PARTITION_COUNT = new Subtopology() + .setSubtopologyId("SUBTOPOLOGY_WITHOUT_PARTITION_COUNT") + .setSourceTopics(List.of(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT)) + .setRepartitionSinkTopics(Collections.singletonList(SINK_TOPIC_NAME2)) + 
.setRepartitionSourceTopics(List.of( + REPARTITION_TOPIC_INFO1, + REPARTITION_TOPIC_INFO_WITHOUT_PARTITION_COUNT + )) + .setStateChangelogTopics(Collections.emptyList()); + private static final Set<String> TOPICS = Set.of( + SOURCE_TOPIC_NAME1, + SOURCE_TOPIC_NAME2, + SINK_TOPIC_NAME1, + SINK_TOPIC_NAME2, + REPARTITION_TOPIC_NAME1, + REPARTITION_TOPIC_NAME2 + ); + + @Test + public void shouldSetupRepartitionTopics() { + List<Subtopology> subtopologyToSubtopology = List.of(SUBTOPOLOGY1, SUBTOPOLOGY2); + Function<String, OptionalInt> topicPartitionCountProvider = s -> TOPICS.contains(s) ? OptionalInt.of(3) : OptionalInt.empty(); + final RepartitionTopics repartitionTopics = new RepartitionTopics( + LOG_CONTEXT, + subtopologyToSubtopology, + topicPartitionCountProvider + ); + + Map<String, Integer> setup = repartitionTopics.setup(); + + assertEquals( + mkMap( + mkEntry(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_INFO1.partitions()), + mkEntry(REPARTITION_TOPIC_NAME2, REPARTITION_TOPIC_INFO2.partitions()) + ), + setup + ); + } + + @Test + public void shouldThrowStreamsMissingSourceTopicsExceptionIfMissingSourceTopics() { + List<Subtopology> subtopologyToSubtopology = List.of(SUBTOPOLOGY1, SUBTOPOLOGY2); + Function<String, OptionalInt> topicPartitionCountProvider = + s -> Objects.equals(s, SOURCE_TOPIC_NAME1) ? OptionalInt.empty() : + (TOPICS.contains(s) ? 
OptionalInt.of(3) : OptionalInt.empty()); + final RepartitionTopics repartitionTopics = new RepartitionTopics( + LOG_CONTEXT, + subtopologyToSubtopology, + topicPartitionCountProvider + ); + + final TopicConfigurationException exception = assertThrows(TopicConfigurationException.class, + repartitionTopics::setup); + + assertNotNull(exception); + assertEquals(Status.MISSING_SOURCE_TOPICS, exception.status()); + assertEquals("Missing source topics: source1", exception.getMessage()); + } + + @Test + public void shouldThrowStreamsMissingSourceTopicsExceptionIfPartitionCountCannotBeComputedForAllRepartitionTopics() { + List<Subtopology> subtopologyToSubtopology = List.of(SUBTOPOLOGY1, SUBTOPOLOGY_WITHOUT_PARTITION_COUNT); + Function<String, OptionalInt> topicPartitionCountProvider = s -> TOPICS.contains(s) ? OptionalInt.of(3) : OptionalInt.empty(); + final RepartitionTopics repartitionTopics = new RepartitionTopics( + LOG_CONTEXT, + subtopologyToSubtopology, + topicPartitionCountProvider + ); + + TopicConfigurationException exception = assertThrows(TopicConfigurationException.class, repartitionTopics::setup); + + assertEquals(Status.MISSING_SOURCE_TOPICS, exception.status()); + assertEquals( + "Failed to compute number of partitions for all repartition topics, make sure all user input topics are created " + + "and all pattern subscriptions match at least one topic in the cluster", + exception.getMessage() + ); + } + + @Test + public void shouldSetRepartitionTopicPartitionCountFromUpstreamExternalSourceTopic() { + final Subtopology subtopology = new Subtopology() + .setSubtopologyId("SUBTOPOLOGY0") + .setSourceTopics(List.of(SOURCE_TOPIC_NAME1, REPARTITION_TOPIC_NAME2)) + .setRepartitionSinkTopics(List.of(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT)) + .setRepartitionSourceTopics(List.of( + REPARTITION_TOPIC_INFO1, + REPARTITION_TOPIC_INFO2, + REPARTITION_TOPIC_INFO_WITHOUT_PARTITION_COUNT Review Comment: ```suggestion REPARTITION_TOPIC_INFO2 ``` 
########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopicsTest.java: ########## @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams.topics; + +import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicConfig; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo; + +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.OptionalInt; +import java.util.Set; +import java.util.function.Function; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class 
RepartitionTopicsTest { + + private static final LogContext LOG_CONTEXT = new LogContext(); + private static final String SOURCE_TOPIC_NAME1 = "source1"; + private static final String SOURCE_TOPIC_NAME2 = "source2"; + private static final String SINK_TOPIC_NAME1 = "sink1"; + private static final String SINK_TOPIC_NAME2 = "sink2"; + private static final String REPARTITION_TOPIC_NAME1 = "repartition1"; + private static final String REPARTITION_TOPIC_NAME2 = "repartition2"; + private static final String REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT = "repartitionWithoutPartitionCount"; + private static final TopicConfig TOPIC_CONFIG1 = new TopicConfig().setKey("config1").setValue("val1"); + private static final TopicConfig TOPIC_CONFIG2 = new TopicConfig().setKey("config2").setValue("val2"); + private static final TopicConfig TOPIC_CONFIG5 = new TopicConfig().setKey("config5").setValue("val5"); + private static final TopicInfo REPARTITION_TOPIC_INFO1 = new TopicInfo() + .setName(REPARTITION_TOPIC_NAME1) + .setPartitions(4) + .setTopicConfigs(List.of(TOPIC_CONFIG1)); + private static final Subtopology SUBTOPOLOGY2 = new Subtopology() + .setSubtopologyId("SUBTOPOLOGY2") + .setSourceTopics(Collections.singletonList(REPARTITION_TOPIC_NAME1)) + .setRepartitionSinkTopics(Collections.singletonList(SINK_TOPIC_NAME1)) + .setRepartitionSourceTopics(List.of(REPARTITION_TOPIC_INFO1)) + .setStateChangelogTopics(Collections.emptyList()); + private static final TopicInfo REPARTITION_TOPIC_INFO2 = new TopicInfo() + .setName(REPARTITION_TOPIC_NAME2) + .setPartitions(2) + .setTopicConfigs(List.of(TOPIC_CONFIG2)); + private static final Subtopology SUBTOPOLOGY1 = new Subtopology() + .setSubtopologyId("SUBTOPOLOGY1") + .setSourceTopics(List.of(SOURCE_TOPIC_NAME1, SOURCE_TOPIC_NAME2)) + .setRepartitionSinkTopics(Collections.singletonList(REPARTITION_TOPIC_NAME1)) + .setRepartitionSourceTopics(List.of( + REPARTITION_TOPIC_INFO1, + REPARTITION_TOPIC_INFO2 + )) + 
.setStateChangelogTopics(Collections.emptyList()); + private static final TopicInfo REPARTITION_TOPIC_INFO_WITHOUT_PARTITION_COUNT = new TopicInfo() + .setName(REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT) + .setTopicConfigs(List.of(TOPIC_CONFIG5)); + private static final Subtopology SUBTOPOLOGY_WITHOUT_PARTITION_COUNT = new Subtopology() + .setSubtopologyId("SUBTOPOLOGY_WITHOUT_PARTITION_COUNT") + .setSourceTopics(List.of(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT)) + .setRepartitionSinkTopics(Collections.singletonList(SINK_TOPIC_NAME2)) + .setRepartitionSourceTopics(List.of( + REPARTITION_TOPIC_INFO1, + REPARTITION_TOPIC_INFO_WITHOUT_PARTITION_COUNT + )) + .setStateChangelogTopics(Collections.emptyList()); + private static final Set<String> TOPICS = Set.of( + SOURCE_TOPIC_NAME1, + SOURCE_TOPIC_NAME2, + SINK_TOPIC_NAME1, + SINK_TOPIC_NAME2, + REPARTITION_TOPIC_NAME1, + REPARTITION_TOPIC_NAME2 + ); + + @Test + public void shouldSetupRepartitionTopics() { + List<Subtopology> subtopologyToSubtopology = List.of(SUBTOPOLOGY1, SUBTOPOLOGY2); + Function<String, OptionalInt> topicPartitionCountProvider = s -> TOPICS.contains(s) ? OptionalInt.of(3) : OptionalInt.empty(); + final RepartitionTopics repartitionTopics = new RepartitionTopics( + LOG_CONTEXT, + subtopologyToSubtopology, + topicPartitionCountProvider + ); + + Map<String, Integer> setup = repartitionTopics.setup(); + + assertEquals( + mkMap( + mkEntry(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_INFO1.partitions()), + mkEntry(REPARTITION_TOPIC_NAME2, REPARTITION_TOPIC_INFO2.partitions()) + ), + setup + ); + } + + @Test + public void shouldThrowStreamsMissingSourceTopicsExceptionIfMissingSourceTopics() { + List<Subtopology> subtopologyToSubtopology = List.of(SUBTOPOLOGY1, SUBTOPOLOGY2); + Function<String, OptionalInt> topicPartitionCountProvider = + s -> Objects.equals(s, SOURCE_TOPIC_NAME1) ? OptionalInt.empty() : + (TOPICS.contains(s) ? 
OptionalInt.of(3) : OptionalInt.empty()); + final RepartitionTopics repartitionTopics = new RepartitionTopics( + LOG_CONTEXT, + subtopologyToSubtopology, + topicPartitionCountProvider + ); + + final TopicConfigurationException exception = assertThrows(TopicConfigurationException.class, + repartitionTopics::setup); + + assertNotNull(exception); + assertEquals(Status.MISSING_SOURCE_TOPICS, exception.status()); + assertEquals("Missing source topics: source1", exception.getMessage()); + } + + @Test + public void shouldThrowStreamsMissingSourceTopicsExceptionIfPartitionCountCannotBeComputedForAllRepartitionTopics() { + List<Subtopology> subtopologyToSubtopology = List.of(SUBTOPOLOGY1, SUBTOPOLOGY_WITHOUT_PARTITION_COUNT); + Function<String, OptionalInt> topicPartitionCountProvider = s -> TOPICS.contains(s) ? OptionalInt.of(3) : OptionalInt.empty(); + final RepartitionTopics repartitionTopics = new RepartitionTopics( + LOG_CONTEXT, + subtopologyToSubtopology, + topicPartitionCountProvider + ); + + TopicConfigurationException exception = assertThrows(TopicConfigurationException.class, repartitionTopics::setup); + + assertEquals(Status.MISSING_SOURCE_TOPICS, exception.status()); + assertEquals( + "Failed to compute number of partitions for all repartition topics, make sure all user input topics are created " + + "and all pattern subscriptions match at least one topic in the cluster", + exception.getMessage() + ); + } + + @Test + public void shouldSetRepartitionTopicPartitionCountFromUpstreamExternalSourceTopic() { + final Subtopology subtopology = new Subtopology() + .setSubtopologyId("SUBTOPOLOGY0") + .setSourceTopics(List.of(SOURCE_TOPIC_NAME1, REPARTITION_TOPIC_NAME2)) + .setRepartitionSinkTopics(List.of(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT)) + .setRepartitionSourceTopics(List.of( + REPARTITION_TOPIC_INFO1, + REPARTITION_TOPIC_INFO2, + REPARTITION_TOPIC_INFO_WITHOUT_PARTITION_COUNT + )) + .setStateChangelogTopics(Collections.emptyList()); + 
List<Subtopology> subtopologyToSubtopology = List.of( + subtopology, + SUBTOPOLOGY_WITHOUT_PARTITION_COUNT + ); + Function<String, OptionalInt> topicPartitionCountProvider = s -> TOPICS.contains(s) ? OptionalInt.of(3) : OptionalInt.empty(); + final RepartitionTopics repartitionTopics = new RepartitionTopics( + LOG_CONTEXT, + subtopologyToSubtopology, + topicPartitionCountProvider + ); + + Map<String, Integer> setup = repartitionTopics.setup(); + + assertEquals(mkMap( + mkEntry(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_INFO1.partitions()), + mkEntry(REPARTITION_TOPIC_NAME2, REPARTITION_TOPIC_INFO2.partitions()), + mkEntry(REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT, 3) + ), setup); + } + + @Test + public void shouldSetRepartitionTopicPartitionCountFromUpstreamInternalRepartitionSourceTopic() { + final Subtopology subtopology = new Subtopology() + .setSubtopologyId("SUBTOPOLOGY0") + .setSourceTopics(List.of(SOURCE_TOPIC_NAME1, REPARTITION_TOPIC_NAME1)) + .setRepartitionSinkTopics(List.of(REPARTITION_TOPIC_NAME2, REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT)) + .setRepartitionSourceTopics(List.of( + REPARTITION_TOPIC_INFO1, + REPARTITION_TOPIC_INFO2, + REPARTITION_TOPIC_INFO_WITHOUT_PARTITION_COUNT + )) + .setStateChangelogTopics(Collections.emptyList()); + List<Subtopology> subtopologyToSubtopology = List.of(subtopology, SUBTOPOLOGY_WITHOUT_PARTITION_COUNT); + Function<String, OptionalInt> topicPartitionCountProvider = s -> TOPICS.contains(s) ? 
OptionalInt.of(3) : OptionalInt.empty(); + final RepartitionTopics repartitionTopics = new RepartitionTopics( + LOG_CONTEXT, + subtopologyToSubtopology, + topicPartitionCountProvider + ); + + Map<String, Integer> setup = repartitionTopics.setup(); + + assertEquals( + mkMap( + mkEntry(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_INFO1.partitions()), + mkEntry(REPARTITION_TOPIC_NAME2, REPARTITION_TOPIC_INFO2.partitions()), + mkEntry(REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT, 4) + ), + setup + ); + } + + @Test + public void shouldSetRepartitionTopicPartitionCountFromUpstreamSourceTopicMultipleSubtopologies() { Review Comment: How is this different from `shouldSetRepartitionTopicPartitionCountFromUpstreamExternalSourceTopic()`? ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopicsTest.java: ########## @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.coordinator.group.streams.topics; + +import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicConfig; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo; + +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.OptionalInt; +import java.util.Set; +import java.util.function.Function; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class RepartitionTopicsTest { + + private static final LogContext LOG_CONTEXT = new LogContext(); + private static final String SOURCE_TOPIC_NAME1 = "source1"; + private static final String SOURCE_TOPIC_NAME2 = "source2"; + private static final String SINK_TOPIC_NAME1 = "sink1"; + private static final String SINK_TOPIC_NAME2 = "sink2"; + private static final String REPARTITION_TOPIC_NAME1 = "repartition1"; + private static final String REPARTITION_TOPIC_NAME2 = "repartition2"; + private static final String REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT = "repartitionWithoutPartitionCount"; + private static final TopicConfig TOPIC_CONFIG1 = new TopicConfig().setKey("config1").setValue("val1"); + private static final TopicConfig TOPIC_CONFIG2 = new TopicConfig().setKey("config2").setValue("val2"); + private static final TopicConfig TOPIC_CONFIG5 = new TopicConfig().setKey("config5").setValue("val5"); + private static final TopicInfo REPARTITION_TOPIC_INFO1 = new 
TopicInfo() + .setName(REPARTITION_TOPIC_NAME1) + .setPartitions(4) + .setTopicConfigs(List.of(TOPIC_CONFIG1)); + private static final Subtopology SUBTOPOLOGY2 = new Subtopology() + .setSubtopologyId("SUBTOPOLOGY2") + .setSourceTopics(Collections.singletonList(REPARTITION_TOPIC_NAME1)) + .setRepartitionSinkTopics(Collections.singletonList(SINK_TOPIC_NAME1)) + .setRepartitionSourceTopics(List.of(REPARTITION_TOPIC_INFO1)) + .setStateChangelogTopics(Collections.emptyList()); + private static final TopicInfo REPARTITION_TOPIC_INFO2 = new TopicInfo() + .setName(REPARTITION_TOPIC_NAME2) + .setPartitions(2) + .setTopicConfigs(List.of(TOPIC_CONFIG2)); + private static final Subtopology SUBTOPOLOGY1 = new Subtopology() + .setSubtopologyId("SUBTOPOLOGY1") + .setSourceTopics(List.of(SOURCE_TOPIC_NAME1, SOURCE_TOPIC_NAME2)) + .setRepartitionSinkTopics(Collections.singletonList(REPARTITION_TOPIC_NAME1)) + .setRepartitionSourceTopics(List.of( + REPARTITION_TOPIC_INFO1, + REPARTITION_TOPIC_INFO2 + )) + .setStateChangelogTopics(Collections.emptyList()); + private static final TopicInfo REPARTITION_TOPIC_INFO_WITHOUT_PARTITION_COUNT = new TopicInfo() + .setName(REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT) + .setTopicConfigs(List.of(TOPIC_CONFIG5)); + private static final Subtopology SUBTOPOLOGY_WITHOUT_PARTITION_COUNT = new Subtopology() + .setSubtopologyId("SUBTOPOLOGY_WITHOUT_PARTITION_COUNT") + .setSourceTopics(List.of(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT)) + .setRepartitionSinkTopics(Collections.singletonList(SINK_TOPIC_NAME2)) + .setRepartitionSourceTopics(List.of( + REPARTITION_TOPIC_INFO1, + REPARTITION_TOPIC_INFO_WITHOUT_PARTITION_COUNT + )) + .setStateChangelogTopics(Collections.emptyList()); + private static final Set<String> TOPICS = Set.of( + SOURCE_TOPIC_NAME1, + SOURCE_TOPIC_NAME2, + SINK_TOPIC_NAME1, + SINK_TOPIC_NAME2, + REPARTITION_TOPIC_NAME1, + REPARTITION_TOPIC_NAME2 + ); + + @Test + public void shouldSetupRepartitionTopics() { + 
List<Subtopology> subtopologyToSubtopology = List.of(SUBTOPOLOGY1, SUBTOPOLOGY2); + Function<String, OptionalInt> topicPartitionCountProvider = s -> TOPICS.contains(s) ? OptionalInt.of(3) : OptionalInt.empty(); + final RepartitionTopics repartitionTopics = new RepartitionTopics( + LOG_CONTEXT, + subtopologyToSubtopology, + topicPartitionCountProvider + ); + + Map<String, Integer> setup = repartitionTopics.setup(); + + assertEquals( + mkMap( + mkEntry(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_INFO1.partitions()), + mkEntry(REPARTITION_TOPIC_NAME2, REPARTITION_TOPIC_INFO2.partitions()) + ), + setup + ); + } + + @Test + public void shouldThrowStreamsMissingSourceTopicsExceptionIfMissingSourceTopics() { + List<Subtopology> subtopologyToSubtopology = List.of(SUBTOPOLOGY1, SUBTOPOLOGY2); + Function<String, OptionalInt> topicPartitionCountProvider = + s -> Objects.equals(s, SOURCE_TOPIC_NAME1) ? OptionalInt.empty() : + (TOPICS.contains(s) ? OptionalInt.of(3) : OptionalInt.empty()); + final RepartitionTopics repartitionTopics = new RepartitionTopics( + LOG_CONTEXT, + subtopologyToSubtopology, + topicPartitionCountProvider + ); + + final TopicConfigurationException exception = assertThrows(TopicConfigurationException.class, + repartitionTopics::setup); + + assertNotNull(exception); + assertEquals(Status.MISSING_SOURCE_TOPICS, exception.status()); + assertEquals("Missing source topics: source1", exception.getMessage()); + } + + @Test + public void shouldThrowStreamsMissingSourceTopicsExceptionIfPartitionCountCannotBeComputedForAllRepartitionTopics() { + List<Subtopology> subtopologyToSubtopology = List.of(SUBTOPOLOGY1, SUBTOPOLOGY_WITHOUT_PARTITION_COUNT); + Function<String, OptionalInt> topicPartitionCountProvider = s -> TOPICS.contains(s) ? 
OptionalInt.of(3) : OptionalInt.empty(); + final RepartitionTopics repartitionTopics = new RepartitionTopics( + LOG_CONTEXT, + subtopologyToSubtopology, + topicPartitionCountProvider + ); + + TopicConfigurationException exception = assertThrows(TopicConfigurationException.class, repartitionTopics::setup); + + assertEquals(Status.MISSING_SOURCE_TOPICS, exception.status()); + assertEquals( + "Failed to compute number of partitions for all repartition topics, make sure all user input topics are created " + + "and all pattern subscriptions match at least one topic in the cluster", + exception.getMessage() + ); + } + + @Test + public void shouldSetRepartitionTopicPartitionCountFromUpstreamExternalSourceTopic() { + final Subtopology subtopology = new Subtopology() + .setSubtopologyId("SUBTOPOLOGY0") + .setSourceTopics(List.of(SOURCE_TOPIC_NAME1, REPARTITION_TOPIC_NAME2)) + .setRepartitionSinkTopics(List.of(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT)) + .setRepartitionSourceTopics(List.of( + REPARTITION_TOPIC_INFO1, + REPARTITION_TOPIC_INFO2, + REPARTITION_TOPIC_INFO_WITHOUT_PARTITION_COUNT + )) + .setStateChangelogTopics(Collections.emptyList()); + List<Subtopology> subtopologyToSubtopology = List.of( + subtopology, + SUBTOPOLOGY_WITHOUT_PARTITION_COUNT + ); + Function<String, OptionalInt> topicPartitionCountProvider = s -> TOPICS.contains(s) ? OptionalInt.of(3) : OptionalInt.empty(); Review Comment: ```suggestion int expectedPartitionCount = 3; Function<String, OptionalInt> topicPartitionCountProvider = s -> TOPICS.contains(s) ? OptionalInt.of(expectedPartitionCount) : OptionalInt.empty(); ``` and use `expectedPartitionCount` in the verification on line 187. ########## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/RepartitionTopicsTest.java: ########## @@ -0,0 +1,273 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.streams.topics; + +import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.Subtopology; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicConfig; +import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue.TopicInfo; + +import org.junit.jupiter.api.Test; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.OptionalInt; +import java.util.Set; +import java.util.function.Function; + +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +public class RepartitionTopicsTest { + + private static final LogContext LOG_CONTEXT = new LogContext(); + private static final String SOURCE_TOPIC_NAME1 = "source1"; + private static final String SOURCE_TOPIC_NAME2 = "source2"; + private static final String SINK_TOPIC_NAME1 = "sink1"; 
+ private static final String SINK_TOPIC_NAME2 = "sink2"; + private static final String REPARTITION_TOPIC_NAME1 = "repartition1"; + private static final String REPARTITION_TOPIC_NAME2 = "repartition2"; + private static final String REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT = "repartitionWithoutPartitionCount"; + private static final TopicConfig TOPIC_CONFIG1 = new TopicConfig().setKey("config1").setValue("val1"); + private static final TopicConfig TOPIC_CONFIG2 = new TopicConfig().setKey("config2").setValue("val2"); + private static final TopicConfig TOPIC_CONFIG5 = new TopicConfig().setKey("config5").setValue("val5"); + private static final TopicInfo REPARTITION_TOPIC_INFO1 = new TopicInfo() + .setName(REPARTITION_TOPIC_NAME1) + .setPartitions(4) + .setTopicConfigs(List.of(TOPIC_CONFIG1)); + private static final Subtopology SUBTOPOLOGY2 = new Subtopology() + .setSubtopologyId("SUBTOPOLOGY2") + .setSourceTopics(Collections.singletonList(REPARTITION_TOPIC_NAME1)) + .setRepartitionSinkTopics(Collections.singletonList(SINK_TOPIC_NAME1)) + .setRepartitionSourceTopics(List.of(REPARTITION_TOPIC_INFO1)) + .setStateChangelogTopics(Collections.emptyList()); + private static final TopicInfo REPARTITION_TOPIC_INFO2 = new TopicInfo() + .setName(REPARTITION_TOPIC_NAME2) + .setPartitions(2) + .setTopicConfigs(List.of(TOPIC_CONFIG2)); + private static final Subtopology SUBTOPOLOGY1 = new Subtopology() + .setSubtopologyId("SUBTOPOLOGY1") + .setSourceTopics(List.of(SOURCE_TOPIC_NAME1, SOURCE_TOPIC_NAME2)) + .setRepartitionSinkTopics(Collections.singletonList(REPARTITION_TOPIC_NAME1)) + .setRepartitionSourceTopics(List.of( + REPARTITION_TOPIC_INFO1, + REPARTITION_TOPIC_INFO2 + )) + .setStateChangelogTopics(Collections.emptyList()); + private static final TopicInfo REPARTITION_TOPIC_INFO_WITHOUT_PARTITION_COUNT = new TopicInfo() + .setName(REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT) + .setTopicConfigs(List.of(TOPIC_CONFIG5)); + private static final Subtopology 
SUBTOPOLOGY_WITHOUT_PARTITION_COUNT = new Subtopology() + .setSubtopologyId("SUBTOPOLOGY_WITHOUT_PARTITION_COUNT") + .setSourceTopics(List.of(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT)) + .setRepartitionSinkTopics(Collections.singletonList(SINK_TOPIC_NAME2)) + .setRepartitionSourceTopics(List.of( + REPARTITION_TOPIC_INFO1, + REPARTITION_TOPIC_INFO_WITHOUT_PARTITION_COUNT + )) + .setStateChangelogTopics(Collections.emptyList()); + private static final Set<String> TOPICS = Set.of( + SOURCE_TOPIC_NAME1, + SOURCE_TOPIC_NAME2, + SINK_TOPIC_NAME1, + SINK_TOPIC_NAME2, + REPARTITION_TOPIC_NAME1, + REPARTITION_TOPIC_NAME2 + ); + + @Test + public void shouldSetupRepartitionTopics() { + List<Subtopology> subtopologyToSubtopology = List.of(SUBTOPOLOGY1, SUBTOPOLOGY2); + Function<String, OptionalInt> topicPartitionCountProvider = s -> TOPICS.contains(s) ? OptionalInt.of(3) : OptionalInt.empty(); + final RepartitionTopics repartitionTopics = new RepartitionTopics( + LOG_CONTEXT, + subtopologyToSubtopology, + topicPartitionCountProvider + ); + + Map<String, Integer> setup = repartitionTopics.setup(); + + assertEquals( + mkMap( + mkEntry(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_INFO1.partitions()), + mkEntry(REPARTITION_TOPIC_NAME2, REPARTITION_TOPIC_INFO2.partitions()) + ), + setup + ); + } + + @Test + public void shouldThrowStreamsMissingSourceTopicsExceptionIfMissingSourceTopics() { + List<Subtopology> subtopologyToSubtopology = List.of(SUBTOPOLOGY1, SUBTOPOLOGY2); + Function<String, OptionalInt> topicPartitionCountProvider = + s -> Objects.equals(s, SOURCE_TOPIC_NAME1) ? OptionalInt.empty() : + (TOPICS.contains(s) ? 
OptionalInt.of(3) : OptionalInt.empty()); + final RepartitionTopics repartitionTopics = new RepartitionTopics( + LOG_CONTEXT, + subtopologyToSubtopology, + topicPartitionCountProvider + ); + + final TopicConfigurationException exception = assertThrows(TopicConfigurationException.class, + repartitionTopics::setup); + + assertNotNull(exception); + assertEquals(Status.MISSING_SOURCE_TOPICS, exception.status()); + assertEquals("Missing source topics: source1", exception.getMessage()); + } + + @Test + public void shouldThrowStreamsMissingSourceTopicsExceptionIfPartitionCountCannotBeComputedForAllRepartitionTopics() { + List<Subtopology> subtopologyToSubtopology = List.of(SUBTOPOLOGY1, SUBTOPOLOGY_WITHOUT_PARTITION_COUNT); + Function<String, OptionalInt> topicPartitionCountProvider = s -> TOPICS.contains(s) ? OptionalInt.of(3) : OptionalInt.empty(); + final RepartitionTopics repartitionTopics = new RepartitionTopics( + LOG_CONTEXT, + subtopologyToSubtopology, + topicPartitionCountProvider + ); + + TopicConfigurationException exception = assertThrows(TopicConfigurationException.class, repartitionTopics::setup); + + assertEquals(Status.MISSING_SOURCE_TOPICS, exception.status()); + assertEquals( + "Failed to compute number of partitions for all repartition topics, make sure all user input topics are created " + + "and all pattern subscriptions match at least one topic in the cluster", + exception.getMessage() + ); + } + + @Test + public void shouldSetRepartitionTopicPartitionCountFromUpstreamExternalSourceTopic() { + final Subtopology subtopology = new Subtopology() + .setSubtopologyId("SUBTOPOLOGY0") + .setSourceTopics(List.of(SOURCE_TOPIC_NAME1, REPARTITION_TOPIC_NAME2)) + .setRepartitionSinkTopics(List.of(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT)) + .setRepartitionSourceTopics(List.of( + REPARTITION_TOPIC_INFO1, + REPARTITION_TOPIC_INFO2, + REPARTITION_TOPIC_INFO_WITHOUT_PARTITION_COUNT + )) + .setStateChangelogTopics(Collections.emptyList()); + 
List<Subtopology> subtopologyToSubtopology = List.of( + subtopology, + SUBTOPOLOGY_WITHOUT_PARTITION_COUNT + ); + Function<String, OptionalInt> topicPartitionCountProvider = s -> TOPICS.contains(s) ? OptionalInt.of(3) : OptionalInt.empty(); + final RepartitionTopics repartitionTopics = new RepartitionTopics( + LOG_CONTEXT, + subtopologyToSubtopology, + topicPartitionCountProvider + ); + + Map<String, Integer> setup = repartitionTopics.setup(); + + assertEquals(mkMap( + mkEntry(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_INFO1.partitions()), + mkEntry(REPARTITION_TOPIC_NAME2, REPARTITION_TOPIC_INFO2.partitions()), + mkEntry(REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT, 3) + ), setup); + } + + @Test + public void shouldSetRepartitionTopicPartitionCountFromUpstreamInternalRepartitionSourceTopic() { + final Subtopology subtopology = new Subtopology() + .setSubtopologyId("SUBTOPOLOGY0") + .setSourceTopics(List.of(SOURCE_TOPIC_NAME1, REPARTITION_TOPIC_NAME1)) + .setRepartitionSinkTopics(List.of(REPARTITION_TOPIC_NAME2, REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT)) + .setRepartitionSourceTopics(List.of( + REPARTITION_TOPIC_INFO1, + REPARTITION_TOPIC_INFO2, + REPARTITION_TOPIC_INFO_WITHOUT_PARTITION_COUNT Review Comment: This test is a bit confusing. `REPARTITION_TOPIC_INFO_WITHOUT_PARTITION_COUNT` and `REPARTITION_TOPIC_INFO2` serve no purpose in `RepartitionSourceTopics`, do they? Additionally, `REPARTITION_TOPIC_INFO1` makes the two subtopologies form a cycle. The test is still correct this way, but it is hard to read and understand. 
I propose something like this: ```java @Test public void shouldSetRepartitionTopicPartitionCountFromUpstreamInternalRepartitionSourceTopic() { final Subtopology subtopology = new Subtopology() .setSubtopologyId("SUBTOPOLOGY0") .setSourceTopics(List.of(SOURCE_TOPIC_NAME1, REPARTITION_TOPIC_NAME3)) .setRepartitionSinkTopics(List.of(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT)) .setRepartitionSourceTopics(List.of( REPARTITION_TOPIC_INFO3 )) .setStateChangelogTopics(Collections.emptyList()); List<Subtopology> subtopologyToSubtopology = List.of(subtopology, SUBTOPOLOGY_WITHOUT_PARTITION_COUNT); Function<String, OptionalInt> topicPartitionCountProvider = s -> TOPICS.contains(s) ? OptionalInt.of(2) : OptionalInt.empty(); final RepartitionTopics repartitionTopics = new RepartitionTopics( LOG_CONTEXT, subtopologyToSubtopology, topicPartitionCountProvider ); Map<String, Integer> setup = repartitionTopics.setup(); assertEquals( mkMap( mkEntry(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_INFO1.partitions()), mkEntry(REPARTITION_TOPIC_NAME3, REPARTITION_TOPIC_INFO3.partitions()), mkEntry(REPARTITION_TOPIC_WITHOUT_PARTITION_COUNT, REPARTITION_TOPIC_INFO3.partitions()) ), setup ); } ``` where `REPARTITION_TOPIC_INFO3` is: ```java private static final TopicInfo REPARTITION_TOPIC_INFO3 = new TopicInfo() .setName(REPARTITION_TOPIC_NAME3) .setPartitions(3) .setTopicConfigs(List.of(TOPIC_CONFIG2)); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
