This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 35489bfca32 KAFKA-18311: Add internal datastructure for configuring
topologies (1/N) (#18268)
35489bfca32 is described below
commit 35489bfca32a97bf19a7a5fb06e4389b3b2f3b07
Author: Lucas Brutschy <[email protected]>
AuthorDate: Mon Jan 6 13:51:00 2025 +0100
KAFKA-18311: Add internal datastructure for configuring topologies (1/N)
(#18268)
Clients in the Streams Rebalance Protocol send an "unconfigured"
representation of the topology to the broker. That is, the number of input
topics and (some) internal topics is not fixed, regular expressions are not
resolved. The broker takes this description of the topology and, together with
the current state of the topics on the broker, derives a ConfiguredTopology.
The configured topology is what is being returned from StreamsGroupDescribe,
and has all number of partitions defined [...]
In this change, we add the internal data structures for representing the
configured topology. They differ in some details from the data structures used
in the RPCs. Most importantly, they can be evolved independently of the public
interface.
Reviewers: Bruno Cadonna <[email protected]>
---
.../streams/topics/ConfiguredInternalTopic.java | 69 ++++++++++++
.../streams/topics/ConfiguredSubtopology.java | 62 +++++++++++
.../group/streams/topics/ConfiguredTopology.java | 67 ++++++++++++
.../topics/TopicConfigurationException.java | 46 ++++++++
.../topics/ConfiguredInternalTopicTest.java | 76 +++++++++++++
.../streams/topics/ConfiguredSubtopologyTest.java | 106 +++++++++++++++++++
.../streams/topics/ConfiguredTopologyTest.java | 117 +++++++++++++++++++++
.../topics/TopicConfigurationExceptionTest.java | 48 +++++++++
8 files changed, 591 insertions(+)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredInternalTopic.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredInternalTopic.java
new file mode 100644
index 00000000000..855f1ea0b58
--- /dev/null
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredInternalTopic.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Captures the properties required for configuring the internal topics we
create for changelogs and repartitioning etc.
+ * <p>
+ * It is derived from the topology sent by the client, and the current state
of the topics inside the broker. If the topics on the broker
+ * changes, the internal topic may need to be reconfigured.
+ *
+ * @param name The name of the topic.
+ * @param numberOfPartitions The number of partitions for the topic.
+ * @param replicationFactor The replication factor of the topic. If
undefiend, the broker default is used.
+ * @param topicConfigs The topic configurations of the topic.
+ */
+public record ConfiguredInternalTopic(String name,
+ int numberOfPartitions,
+ Optional<Short> replicationFactor,
+ Map<String, String> topicConfigs
+) {
+
+ public ConfiguredInternalTopic {
+ Objects.requireNonNull(name, "name can't be null");
+ Topic.validate(name);
+ if (numberOfPartitions < 1) {
+ throw new IllegalArgumentException("Number of partitions must be
at least 1.");
+ }
+ topicConfigs =
Collections.unmodifiableMap(Objects.requireNonNull(topicConfigs, "topicConfigs
can't be null"));
+ }
+
+ public StreamsGroupDescribeResponseData.TopicInfo
asStreamsGroupDescribeTopicInfo() {
+ return new StreamsGroupDescribeResponseData.TopicInfo()
+ .setName(name)
+ .setPartitions(numberOfPartitions)
+ .setReplicationFactor(replicationFactor.orElse((short) 0))
+ .setTopicConfigs(
+ topicConfigs != null ?
+ topicConfigs.entrySet().stream().map(
+ y -> new StreamsGroupDescribeResponseData.KeyValue()
+ .setKey(y.getKey())
+ .setValue(y.getValue())
+ ).collect(Collectors.toList()) : null
+ );
+ }
+
+}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopology.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopology.java
new file mode 100644
index 00000000000..bfc1a86a06b
--- /dev/null
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopology.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Internal representation of a subtopology.
+ * <p>
+ * The subtopology is configured according to the number of partitions
available in the source topics. It has regular expressions already
+ * resolved and defined exactly the information that is being used by streams
groups assignment reconciliation.
+ * <p>
+ * Configured subtopologies may be recreated every time the input topics used
by the subtopology are modified.
+ *
+ * @param sourceTopics The source topics of the subtopology.
+ * @param repartitionSourceTopics The repartition source topics of the
subtopology.
+ * @param repartitionSinkTopics The repartition sink topics of the
subtopology.
+ * @param stateChangelogTopics The state changelog topics of the
subtopology.
+ */
+public record ConfiguredSubtopology(Set<String> sourceTopics,
+ Map<String, ConfiguredInternalTopic>
repartitionSourceTopics,
+ Set<String> repartitionSinkTopics,
+ Map<String, ConfiguredInternalTopic>
stateChangelogTopics) {
+
+ public ConfiguredSubtopology {
+ Objects.requireNonNull(sourceTopics, "sourceTopics can't be null");
+ Objects.requireNonNull(repartitionSourceTopics,
"repartitionSourceTopics can't be null");
+ Objects.requireNonNull(repartitionSinkTopics, "repartitionSinkTopics
can't be null");
+ Objects.requireNonNull(stateChangelogTopics, "stateChangelogTopics
can't be null");
+ }
+
+ public StreamsGroupDescribeResponseData.Subtopology
asStreamsGroupDescribeSubtopology(String subtopologyId) {
+ return new StreamsGroupDescribeResponseData.Subtopology()
+ .setSubtopologyId(subtopologyId)
+
.setSourceTopics(sourceTopics.stream().sorted().collect(Collectors.toList()))
+
.setRepartitionSinkTopics(repartitionSinkTopics.stream().sorted().collect(Collectors.toList()))
+
.setRepartitionSourceTopics(repartitionSourceTopics.values().stream()
+
.map(ConfiguredInternalTopic::asStreamsGroupDescribeTopicInfo).sorted().collect(Collectors.toList()))
+ .setStateChangelogTopics(stateChangelogTopics.values().stream()
+
.map(ConfiguredInternalTopic::asStreamsGroupDescribeTopicInfo).sorted().collect(Collectors.toList()));
+ }
+
+}
\ No newline at end of file
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopology.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopology.java
new file mode 100644
index 00000000000..86f8080421c
--- /dev/null
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopology.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * This class captures the result of taking a topology definition sent by the
client and using the current state of the topics inside the
+ * broker to configure the internal topics required for the topology.
+ *
+ * @param topologyEpoch The epoch of the topology. Same as the
topology epoch in the heartbeat request that last initialized
+ * the topology.
+ * @param subtopologies Contains the subtopologies that have
been configured. This can be used by the task assignors, since it
+ * specifies the number of tasks available
for every subtopology.
+ * @param internalTopicsToBeCreated Contains a list of internal topics that
need to be created. This is used to create the topics in the
+ * broker.
+ * @param topicConfigurationException If the topic configuration process
failed, e.g. because expected topics are missing or have an
+ * incorrect number of partitions, this
field will store the error that occurred, so that is can be
+ * reported back to the client.
+ */
+public record ConfiguredTopology(int topologyEpoch,
+ Map<String, ConfiguredSubtopology>
subtopologies,
+ Map<String, CreatableTopic>
internalTopicsToBeCreated,
+ Optional<TopicConfigurationException>
topicConfigurationException) {
+
+ public ConfiguredTopology {
+ if (topologyEpoch < 0) {
+ throw new IllegalArgumentException("Topology epoch must be
non-negative.");
+ }
+ Objects.requireNonNull(subtopologies, "subtopologies can't be null");
+ Objects.requireNonNull(internalTopicsToBeCreated,
"internalTopicsToBeCreated can't be null");
+ Objects.requireNonNull(topicConfigurationException,
"topicConfigurationException can't be null");
+ }
+
+ public boolean isReady() {
+ return topicConfigurationException.isEmpty();
+ }
+
+ public StreamsGroupDescribeResponseData.Topology
asStreamsGroupDescribeTopology() {
+ return new StreamsGroupDescribeResponseData.Topology()
+ .setEpoch(topologyEpoch)
+ .setSubtopologies(subtopologies.entrySet().stream().map(
+ entry ->
entry.getValue().asStreamsGroupDescribeSubtopology(entry.getKey())
+ ).collect(Collectors.toList()));
+ }
+
+}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/TopicConfigurationException.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/TopicConfigurationException.java
new file mode 100644
index 00000000000..f52c950b770
--- /dev/null
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/TopicConfigurationException.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
+
+public class TopicConfigurationException extends RuntimeException {
+
+ private final Status status;
+
+ public TopicConfigurationException(StreamsGroupHeartbeatResponse.Status
status, String message) {
+ super(message);
+ this.status = status;
+ }
+
+ public Status status() {
+ return status;
+ }
+
+ public static TopicConfigurationException
incorrectlyPartitionedTopics(String message) {
+ return new
TopicConfigurationException(Status.INCORRECTLY_PARTITIONED_TOPICS, message);
+ }
+
+ public static TopicConfigurationException missingSourceTopics(String
message) {
+ return new TopicConfigurationException(Status.MISSING_SOURCE_TOPICS,
message);
+ }
+
+ public static TopicConfigurationException missingInternalTopics(String
message) {
+ return new TopicConfigurationException(Status.MISSING_INTERNAL_TOPICS,
message);
+ }
+}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredInternalTopicTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredInternalTopicTest.java
new file mode 100644
index 00000000000..e1db0f048ac
--- /dev/null
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredInternalTopicTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class ConfiguredInternalTopicTest {
+
+ @Test
+ public void testConstructorWithNullName() {
+ assertThrows(NullPointerException.class,
+ () -> new ConfiguredInternalTopic(null, 1, Optional.empty(),
Collections.emptyMap()));
+ }
+
+ @Test
+ public void testConstructorWithInvalidName() {
+ assertThrows(InvalidTopicException.class,
+ () -> new ConfiguredInternalTopic("invalid topic name", 1,
Optional.empty(), Collections.emptyMap()));
+ }
+
+ @Test
+ public void testConstructorWithNullTopicConfigs() {
+ assertThrows(NullPointerException.class,
+ () -> new ConfiguredInternalTopic("test-topic", 1,
Optional.empty(), null));
+ }
+
+ @Test
+ public void testConstructorWithZeroPartitions() {
+ assertThrows(IllegalArgumentException.class,
+ () -> new ConfiguredInternalTopic("test-topic", 0,
Optional.empty(), Collections.emptyMap()));
+ }
+
+ @Test
+ public void testAsStreamsGroupDescribeTopicInfo() {
+ String topicName = "test-topic";
+ Map<String, String> topicConfigs = new HashMap<>();
+ topicConfigs.put("retention.ms", "1000");
+ int numberOfPartitions = 3;
+ Optional<Short> replicationFactor = Optional.of((short) 2);
+ ConfiguredInternalTopic configuredInternalTopic = new
ConfiguredInternalTopic(
+ topicName, numberOfPartitions, replicationFactor, topicConfigs);
+
+ StreamsGroupDescribeResponseData.TopicInfo topicInfo =
configuredInternalTopic.asStreamsGroupDescribeTopicInfo();
+
+ assertEquals(topicName, topicInfo.name());
+ assertEquals(numberOfPartitions, topicInfo.partitions());
+ assertEquals(replicationFactor.orElse((short) 0).shortValue(),
topicInfo.replicationFactor());
+ assertEquals(1, topicInfo.topicConfigs().size());
+ assertEquals("1000", topicInfo.topicConfigs().get(0).value());
+ }
+}
\ No newline at end of file
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopologyTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopologyTest.java
new file mode 100644
index 00000000000..d30716c25f7
--- /dev/null
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopologyTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ConfiguredSubtopologyTest {
+
+ @Test
+ public void testConstructorWithNullSourceTopics() {
+ assertThrows(NullPointerException.class,
+ () -> new ConfiguredSubtopology(
+ null,
+ Collections.emptyMap(),
+ Collections.emptySet(),
+ Collections.emptyMap()
+ )
+ );
+ }
+
+ @Test
+ public void testConstructorWithNullRepartitionSourceTopics() {
+ assertThrows(NullPointerException.class,
+ () -> new ConfiguredSubtopology(
+ Collections.emptySet(),
+ null,
+ Collections.emptySet(),
+ Collections.emptyMap()
+ )
+ );
+ }
+
+ @Test
+ public void testConstructorWithNullRepartitionSinkTopics() {
+ assertThrows(NullPointerException.class,
+ () -> new ConfiguredSubtopology(
+ Collections.emptySet(),
+ Collections.emptyMap(),
+ null,
+ Collections.emptyMap()
+ )
+ );
+ }
+
+ @Test
+ public void testConstructorWithNullStateChangelogTopics() {
+ assertThrows(NullPointerException.class,
+ () -> new ConfiguredSubtopology(
+ Collections.emptySet(),
+ Collections.emptyMap(),
+ Collections.emptySet(),
+ null
+ )
+ );
+ }
+
+ @Test
+ public void testAsStreamsGroupDescribeSubtopology() {
+ String subtopologyId = "subtopology1";
+ Set<String> sourceTopics = new HashSet<>(Set.of("sourceTopic1",
"sourceTopic2"));
+ Set<String> repartitionSinkTopics = new
HashSet<>(Set.of("repartitionSinkTopic1", "repartitionSinkTopic2"));
+ ConfiguredInternalTopic internalTopicMock =
mock(ConfiguredInternalTopic.class);
+ StreamsGroupDescribeResponseData.TopicInfo topicInfo = new
StreamsGroupDescribeResponseData.TopicInfo();
+
when(internalTopicMock.asStreamsGroupDescribeTopicInfo()).thenReturn(topicInfo);
+ Map<String, ConfiguredInternalTopic> repartitionSourceTopics =
Map.of("repartitionSourceTopic1", internalTopicMock);
+ Map<String, ConfiguredInternalTopic> stateChangelogTopics =
Map.of("stateChangelogTopic1", internalTopicMock);
+ ConfiguredSubtopology configuredSubtopology = new
ConfiguredSubtopology(
+ sourceTopics, repartitionSourceTopics, repartitionSinkTopics,
stateChangelogTopics);
+
+ StreamsGroupDescribeResponseData.Subtopology subtopology =
configuredSubtopology.asStreamsGroupDescribeSubtopology(subtopologyId);
+
+ assertEquals(subtopologyId, subtopology.subtopologyId());
+ assertEquals(sourceTopics.stream().sorted().toList(),
subtopology.sourceTopics());
+ assertEquals(repartitionSinkTopics.stream().sorted().toList(),
subtopology.repartitionSinkTopics());
+ assertEquals(List.of(topicInfo),
subtopology.repartitionSourceTopics());
+ assertEquals(List.of(topicInfo), subtopology.stateChangelogTopics());
+ }
+
+}
\ No newline at end of file
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java
new file mode 100644
index 00000000000..fc862a7a027
--- /dev/null
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ConfiguredTopologyTest {
+
+ @Test
+ public void testConstructorWithNullSubtopologies() {
+ assertThrows(NullPointerException.class,
+ () -> new ConfiguredTopology(
+ 0,
+ null,
+ Collections.emptyMap(),
+ Optional.empty()
+ )
+ );
+ }
+
+ @Test
+ public void testConstructorWithNullInternalTopicsToBeCreated() {
+ assertThrows(NullPointerException.class,
+ () -> new ConfiguredTopology(
+ 0,
+ Collections.emptyMap(),
+ null,
+ Optional.empty()
+ )
+ );
+ }
+
+ @Test
+ public void testConstructorWithNullTopicConfigurationException() {
+ assertThrows(NullPointerException.class,
+ () -> new ConfiguredTopology(
+ 0,
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ null
+ )
+ );
+ }
+
+ @Test
+ public void testConstructorWithInvalidTopologyEpoch() {
+ assertThrows(IllegalArgumentException.class,
+ () -> new ConfiguredTopology(
+ -1,
+ Collections.emptyMap(),
+ Collections.emptyMap(),
+ Optional.empty()
+ )
+ );
+ }
+
+ @Test
+ public void testIsReady() {
+ ConfiguredTopology readyTopology = new ConfiguredTopology(
+ 1, new HashMap<>(), new HashMap<>(), Optional.empty());
+ assertTrue(readyTopology.isReady());
+
+ ConfiguredTopology notReadyTopology = new ConfiguredTopology(
+ 1, new HashMap<>(), new HashMap<>(),
Optional.of(TopicConfigurationException.missingSourceTopics("missing")));
+ assertFalse(notReadyTopology.isReady());
+ }
+
+ @Test
+ public void testAsStreamsGroupDescribeTopology() {
+ int topologyEpoch = 1;
+ ConfiguredSubtopology subtopologyMock =
mock(ConfiguredSubtopology.class);
+ StreamsGroupDescribeResponseData.Subtopology subtopologyResponse = new
StreamsGroupDescribeResponseData.Subtopology();
+
when(subtopologyMock.asStreamsGroupDescribeSubtopology(Mockito.anyString())).thenReturn(subtopologyResponse);
+ Map<String, ConfiguredSubtopology> subtopologies = new HashMap<>();
+ subtopologies.put("subtopology1", subtopologyMock);
+ Map<String, CreatableTopic> internalTopicsToBeCreated = new
HashMap<>();
+ Optional<TopicConfigurationException> topicConfigurationException =
Optional.empty();
+ ConfiguredTopology configuredTopology = new ConfiguredTopology(
+ topologyEpoch, subtopologies, internalTopicsToBeCreated,
topicConfigurationException);
+
+ StreamsGroupDescribeResponseData.Topology topology =
configuredTopology.asStreamsGroupDescribeTopology();
+
+ assertEquals(topologyEpoch, topology.epoch());
+ assertEquals(1, topology.subtopologies().size());
+ assertEquals(subtopologyResponse, topology.subtopologies().get(0));
+ }
+}
\ No newline at end of file
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/TopicConfigurationExceptionTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/TopicConfigurationExceptionTest.java
new file mode 100644
index 00000000000..479cef5db13
--- /dev/null
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/TopicConfigurationExceptionTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TopicConfigurationExceptionTest {
+
+ @Test
+ public void testMissingSourceTopics() {
+ TopicConfigurationException exception =
TopicConfigurationException.missingSourceTopics("test");
+ assertEquals(Status.MISSING_SOURCE_TOPICS, exception.status());
+ assertEquals("test", exception.getMessage());
+ }
+
+ @Test
+ public void testMissingInternalTopics() {
+ TopicConfigurationException exception =
TopicConfigurationException.missingInternalTopics("test");
+ assertEquals(Status.MISSING_INTERNAL_TOPICS, exception.status());
+ assertEquals("test", exception.getMessage());
+ }
+
+ @Test
+ public void testIncorrectlyPartitionedTopics() {
+ TopicConfigurationException exception =
TopicConfigurationException.incorrectlyPartitionedTopics("test");
+ assertEquals(Status.INCORRECTLY_PARTITIONED_TOPICS,
exception.status());
+ assertEquals("test", exception.getMessage());
+ }
+
+}