This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 35489bfca32 KAFKA-18311: Add internal datastructure for configuring 
topologies (1/N) (#18268)
35489bfca32 is described below

commit 35489bfca32a97bf19a7a5fb06e4389b3b2f3b07
Author: Lucas Brutschy <[email protected]>
AuthorDate: Mon Jan 6 13:51:00 2025 +0100

    KAFKA-18311: Add internal datastructure for configuring topologies (1/N) 
(#18268)
    
    Clients in the Streams Rebalance Protocol send an "unconfigured" 
representation of the topology to the broker. That is, the number of input 
topics and (some) internal topics is not fixed, regular expressions are not 
resolved. The broker takes this description of the topology and, together with 
the current state of the topics on the broker, derives a ConfiguredTopology. 
The configured topology is what is being returned from StreamsGroupDescribe, 
and has all number of partitions defined [...]
    
    In this change, we add the internal data structures for representing the 
configured topology. They differ in some details from the data structures used 
in the RPCs. Most importantly, they can be evolved independently of the public 
interface.
    
    Reviewers: Bruno Cadonna <[email protected]>
---
 .../streams/topics/ConfiguredInternalTopic.java    |  69 ++++++++++++
 .../streams/topics/ConfiguredSubtopology.java      |  62 +++++++++++
 .../group/streams/topics/ConfiguredTopology.java   |  67 ++++++++++++
 .../topics/TopicConfigurationException.java        |  46 ++++++++
 .../topics/ConfiguredInternalTopicTest.java        |  76 +++++++++++++
 .../streams/topics/ConfiguredSubtopologyTest.java  | 106 +++++++++++++++++++
 .../streams/topics/ConfiguredTopologyTest.java     | 117 +++++++++++++++++++++
 .../topics/TopicConfigurationExceptionTest.java    |  48 +++++++++
 8 files changed, 591 insertions(+)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredInternalTopic.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredInternalTopic.java
new file mode 100644
index 00000000000..855f1ea0b58
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredInternalTopic.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * Captures the properties required for configuring the internal topics we 
create for changelogs and repartitioning etc.
+ * <p>
+ * It is derived from the topology sent by the client, and the current state 
of the topics inside the broker. If the topics on the broker
+ * changes, the internal topic may need to be reconfigured.
+ *
+ * @param name               The name of the topic.
+ * @param numberOfPartitions The number of partitions for the topic.
+ * @param replicationFactor  The replication factor of the topic. If 
undefiend, the broker default is used.
+ * @param topicConfigs       The topic configurations of the topic.
+ */
+public record ConfiguredInternalTopic(String name,
+                                      int numberOfPartitions,
+                                      Optional<Short> replicationFactor,
+                                      Map<String, String> topicConfigs
+) {
+
+    public ConfiguredInternalTopic {
+        Objects.requireNonNull(name, "name can't be null");
+        Topic.validate(name);
+        if (numberOfPartitions < 1) {
+            throw new IllegalArgumentException("Number of partitions must be 
at least 1.");
+        }
+        topicConfigs = 
Collections.unmodifiableMap(Objects.requireNonNull(topicConfigs, "topicConfigs 
can't be null"));
+    }
+
+    public StreamsGroupDescribeResponseData.TopicInfo 
asStreamsGroupDescribeTopicInfo() {
+        return new StreamsGroupDescribeResponseData.TopicInfo()
+            .setName(name)
+            .setPartitions(numberOfPartitions)
+            .setReplicationFactor(replicationFactor.orElse((short) 0))
+            .setTopicConfigs(
+                topicConfigs != null ?
+                    topicConfigs.entrySet().stream().map(
+                        y -> new StreamsGroupDescribeResponseData.KeyValue()
+                            .setKey(y.getKey())
+                            .setValue(y.getValue())
+                    ).collect(Collectors.toList()) : null
+            );
+    }
+
+}
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopology.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopology.java
new file mode 100644
index 00000000000..bfc1a86a06b
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopology.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Internal representation of a subtopology.
+ * <p>
+ * The subtopology is configured according to the number of partitions 
available in the source topics. It has regular expressions already
+ * resolved and defined exactly the information that is being used by streams 
groups assignment reconciliation.
+ * <p>
+ * Configured subtopologies may be recreated every time the input topics used 
by the subtopology are modified.
+ *
+ * @param sourceTopics            The source topics of the subtopology.
+ * @param repartitionSourceTopics The repartition source topics of the 
subtopology.
+ * @param repartitionSinkTopics   The repartition sink topics of the 
subtopology.
+ * @param stateChangelogTopics    The state changelog topics of the 
subtopology.
+ */
+public record ConfiguredSubtopology(Set<String> sourceTopics,
+                                    Map<String, ConfiguredInternalTopic> 
repartitionSourceTopics,
+                                    Set<String> repartitionSinkTopics,
+                                    Map<String, ConfiguredInternalTopic> 
stateChangelogTopics) {
+
+    public ConfiguredSubtopology {
+        Objects.requireNonNull(sourceTopics, "sourceTopics can't be null");
+        Objects.requireNonNull(repartitionSourceTopics, 
"repartitionSourceTopics can't be null");
+        Objects.requireNonNull(repartitionSinkTopics, "repartitionSinkTopics 
can't be null");
+        Objects.requireNonNull(stateChangelogTopics, "stateChangelogTopics 
can't be null");
+    }
+
+    public StreamsGroupDescribeResponseData.Subtopology 
asStreamsGroupDescribeSubtopology(String subtopologyId) {
+        return new StreamsGroupDescribeResponseData.Subtopology()
+            .setSubtopologyId(subtopologyId)
+            
.setSourceTopics(sourceTopics.stream().sorted().collect(Collectors.toList()))
+            
.setRepartitionSinkTopics(repartitionSinkTopics.stream().sorted().collect(Collectors.toList()))
+            
.setRepartitionSourceTopics(repartitionSourceTopics.values().stream()
+                
.map(ConfiguredInternalTopic::asStreamsGroupDescribeTopicInfo).sorted().collect(Collectors.toList()))
+            .setStateChangelogTopics(stateChangelogTopics.values().stream()
+                
.map(ConfiguredInternalTopic::asStreamsGroupDescribeTopicInfo).sorted().collect(Collectors.toList()));
+    }
+
+}
\ No newline at end of file
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopology.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopology.java
new file mode 100644
index 00000000000..86f8080421c
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopology.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * This class captures the result of taking a topology definition sent by the 
client and using the current state of the topics inside the
+ * broker to configure the internal topics required for the topology.
+ *
+ * @param topologyEpoch               The epoch of the topology. Same as the 
topology epoch in the heartbeat request that last initialized
+ *                                    the topology.
+ * @param subtopologies               Contains the subtopologies that have 
been configured. This can be used by the task assignors, since it
+ *                                    specifies the number of tasks available 
for every subtopology.
+ * @param internalTopicsToBeCreated   Contains a list of internal topics that 
need to be created. This is used to create the topics in the
+ *                                    broker.
+ * @param topicConfigurationException If the topic configuration process 
failed, e.g. because expected topics are missing or have an
+ *                                    incorrect number of partitions, this 
field will store the error that occurred, so that is can be
+ *                                    reported back to the client.
+ */
+public record ConfiguredTopology(int topologyEpoch,
+                                 Map<String, ConfiguredSubtopology> 
subtopologies,
+                                 Map<String, CreatableTopic> 
internalTopicsToBeCreated,
+                                 Optional<TopicConfigurationException> 
topicConfigurationException) {
+
+    public ConfiguredTopology {
+        if (topologyEpoch < 0) {
+            throw new IllegalArgumentException("Topology epoch must be 
non-negative.");
+        }
+        Objects.requireNonNull(subtopologies, "subtopologies can't be null");
+        Objects.requireNonNull(internalTopicsToBeCreated, 
"internalTopicsToBeCreated can't be null");
+        Objects.requireNonNull(topicConfigurationException, 
"topicConfigurationException can't be null");
+    }
+
+    public boolean isReady() {
+        return topicConfigurationException.isEmpty();
+    }
+
+    public StreamsGroupDescribeResponseData.Topology 
asStreamsGroupDescribeTopology() {
+        return new StreamsGroupDescribeResponseData.Topology()
+            .setEpoch(topologyEpoch)
+            .setSubtopologies(subtopologies.entrySet().stream().map(
+                entry -> 
entry.getValue().asStreamsGroupDescribeSubtopology(entry.getKey())
+            ).collect(Collectors.toList()));
+    }
+
+}
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/TopicConfigurationException.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/TopicConfigurationException.java
new file mode 100644
index 00000000000..f52c950b770
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/topics/TopicConfigurationException.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse;
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
+
+public class TopicConfigurationException extends RuntimeException {
+
+    private final Status status;
+
+    public TopicConfigurationException(StreamsGroupHeartbeatResponse.Status 
status, String message) {
+        super(message);
+        this.status = status;
+    }
+
+    public Status status() {
+        return status;
+    }
+
+    public static TopicConfigurationException 
incorrectlyPartitionedTopics(String message) {
+        return new 
TopicConfigurationException(Status.INCORRECTLY_PARTITIONED_TOPICS, message);
+    }
+
+    public static TopicConfigurationException missingSourceTopics(String 
message) {
+        return new TopicConfigurationException(Status.MISSING_SOURCE_TOPICS, 
message);
+    }
+
+    public static TopicConfigurationException missingInternalTopics(String 
message) {
+        return new TopicConfigurationException(Status.MISSING_INTERNAL_TOPICS, 
message);
+    }
+}
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredInternalTopicTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredInternalTopicTest.java
new file mode 100644
index 00000000000..e1db0f048ac
--- /dev/null
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredInternalTopicTest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.errors.InvalidTopicException;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+public class ConfiguredInternalTopicTest {
+
+    @Test
+    public void testConstructorWithNullName() {
+        assertThrows(NullPointerException.class,
+            () -> new ConfiguredInternalTopic(null, 1, Optional.empty(), 
Collections.emptyMap()));
+    }
+
+    @Test
+    public void testConstructorWithInvalidName() {
+        assertThrows(InvalidTopicException.class,
+            () -> new ConfiguredInternalTopic("invalid topic name", 1, 
Optional.empty(), Collections.emptyMap()));
+    }
+
+    @Test
+    public void testConstructorWithNullTopicConfigs() {
+        assertThrows(NullPointerException.class,
+            () -> new ConfiguredInternalTopic("test-topic", 1, 
Optional.empty(), null));
+    }
+
+    @Test
+    public void testConstructorWithZeroPartitions() {
+        assertThrows(IllegalArgumentException.class,
+            () -> new ConfiguredInternalTopic("test-topic", 0, 
Optional.empty(), Collections.emptyMap()));
+    }
+
+    @Test
+    public void testAsStreamsGroupDescribeTopicInfo() {
+        String topicName = "test-topic";
+        Map<String, String> topicConfigs = new HashMap<>();
+        topicConfigs.put("retention.ms", "1000");
+        int numberOfPartitions = 3;
+        Optional<Short> replicationFactor = Optional.of((short) 2);
+        ConfiguredInternalTopic configuredInternalTopic = new 
ConfiguredInternalTopic(
+            topicName, numberOfPartitions, replicationFactor, topicConfigs);
+
+        StreamsGroupDescribeResponseData.TopicInfo topicInfo = 
configuredInternalTopic.asStreamsGroupDescribeTopicInfo();
+
+        assertEquals(topicName, topicInfo.name());
+        assertEquals(numberOfPartitions, topicInfo.partitions());
+        assertEquals(replicationFactor.orElse((short) 0).shortValue(), 
topicInfo.replicationFactor());
+        assertEquals(1, topicInfo.topicConfigs().size());
+        assertEquals("1000", topicInfo.topicConfigs().get(0).value());
+    }
+}
\ No newline at end of file
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopologyTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopologyTest.java
new file mode 100644
index 00000000000..d30716c25f7
--- /dev/null
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredSubtopologyTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ConfiguredSubtopologyTest {
+
+    @Test
+    public void testConstructorWithNullSourceTopics() {
+        assertThrows(NullPointerException.class,
+            () -> new ConfiguredSubtopology(
+                null,
+                Collections.emptyMap(),
+                Collections.emptySet(),
+                Collections.emptyMap()
+            )
+        );
+    }
+
+    @Test
+    public void testConstructorWithNullRepartitionSourceTopics() {
+        assertThrows(NullPointerException.class,
+            () -> new ConfiguredSubtopology(
+                Collections.emptySet(),
+                null,
+                Collections.emptySet(),
+                Collections.emptyMap()
+            )
+        );
+    }
+
+    @Test
+    public void testConstructorWithNullRepartitionSinkTopics() {
+        assertThrows(NullPointerException.class,
+            () -> new ConfiguredSubtopology(
+                Collections.emptySet(),
+                Collections.emptyMap(),
+                null,
+                Collections.emptyMap()
+            )
+        );
+    }
+
+    @Test
+    public void testConstructorWithNullStateChangelogTopics() {
+        assertThrows(NullPointerException.class,
+            () -> new ConfiguredSubtopology(
+                Collections.emptySet(),
+                Collections.emptyMap(),
+                Collections.emptySet(),
+                null
+            )
+        );
+    }
+
+    @Test
+    public void testAsStreamsGroupDescribeSubtopology() {
+        String subtopologyId = "subtopology1";
+        Set<String> sourceTopics = new HashSet<>(Set.of("sourceTopic1", 
"sourceTopic2"));
+        Set<String> repartitionSinkTopics = new 
HashSet<>(Set.of("repartitionSinkTopic1", "repartitionSinkTopic2"));
+        ConfiguredInternalTopic internalTopicMock = 
mock(ConfiguredInternalTopic.class);
+        StreamsGroupDescribeResponseData.TopicInfo topicInfo = new 
StreamsGroupDescribeResponseData.TopicInfo();
+        
when(internalTopicMock.asStreamsGroupDescribeTopicInfo()).thenReturn(topicInfo);
+        Map<String, ConfiguredInternalTopic> repartitionSourceTopics = 
Map.of("repartitionSourceTopic1", internalTopicMock);
+        Map<String, ConfiguredInternalTopic> stateChangelogTopics = 
Map.of("stateChangelogTopic1", internalTopicMock);
+        ConfiguredSubtopology configuredSubtopology = new 
ConfiguredSubtopology(
+            sourceTopics, repartitionSourceTopics, repartitionSinkTopics, 
stateChangelogTopics);
+
+        StreamsGroupDescribeResponseData.Subtopology subtopology = 
configuredSubtopology.asStreamsGroupDescribeSubtopology(subtopologyId);
+
+        assertEquals(subtopologyId, subtopology.subtopologyId());
+        assertEquals(sourceTopics.stream().sorted().toList(), 
subtopology.sourceTopics());
+        assertEquals(repartitionSinkTopics.stream().sorted().toList(), 
subtopology.repartitionSinkTopics());
+        assertEquals(List.of(topicInfo), 
subtopology.repartitionSourceTopics());
+        assertEquals(List.of(topicInfo), subtopology.stateChangelogTopics());
+    }
+
+}
\ No newline at end of file
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java
new file mode 100644
index 00000000000..fc862a7a027
--- /dev/null
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ConfiguredTopologyTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopic;
+import org.apache.kafka.common.message.StreamsGroupDescribeResponseData;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ConfiguredTopologyTest {
+
+    @Test
+    public void testConstructorWithNullSubtopologies() {
+        assertThrows(NullPointerException.class,
+            () -> new ConfiguredTopology(
+                0,
+                null,
+                Collections.emptyMap(),
+                Optional.empty()
+            )
+        );
+    }
+
+    @Test
+    public void testConstructorWithNullInternalTopicsToBeCreated() {
+        assertThrows(NullPointerException.class,
+            () -> new ConfiguredTopology(
+                0,
+                Collections.emptyMap(),
+                null,
+                Optional.empty()
+            )
+        );
+    }
+
+    @Test
+    public void testConstructorWithNullTopicConfigurationException() {
+        assertThrows(NullPointerException.class,
+            () -> new ConfiguredTopology(
+                0,
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                null
+            )
+        );
+    }
+
+    @Test
+    public void testConstructorWithInvalidTopologyEpoch() {
+        assertThrows(IllegalArgumentException.class,
+            () -> new ConfiguredTopology(
+                -1,
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Optional.empty()
+            )
+        );
+    }
+
+    @Test
+    public void testIsReady() {
+        ConfiguredTopology readyTopology = new ConfiguredTopology(
+            1, new HashMap<>(), new HashMap<>(), Optional.empty());
+        assertTrue(readyTopology.isReady());
+
+        ConfiguredTopology notReadyTopology = new ConfiguredTopology(
+            1, new HashMap<>(), new HashMap<>(), 
Optional.of(TopicConfigurationException.missingSourceTopics("missing")));
+        assertFalse(notReadyTopology.isReady());
+    }
+
+    @Test
+    public void testAsStreamsGroupDescribeTopology() {
+        int topologyEpoch = 1;
+        ConfiguredSubtopology subtopologyMock = 
mock(ConfiguredSubtopology.class);
+        StreamsGroupDescribeResponseData.Subtopology subtopologyResponse = new 
StreamsGroupDescribeResponseData.Subtopology();
+        
when(subtopologyMock.asStreamsGroupDescribeSubtopology(Mockito.anyString())).thenReturn(subtopologyResponse);
+        Map<String, ConfiguredSubtopology> subtopologies = new HashMap<>();
+        subtopologies.put("subtopology1", subtopologyMock);
+        Map<String, CreatableTopic> internalTopicsToBeCreated = new 
HashMap<>();
+        Optional<TopicConfigurationException> topicConfigurationException = 
Optional.empty();
+        ConfiguredTopology configuredTopology = new ConfiguredTopology(
+            topologyEpoch, subtopologies, internalTopicsToBeCreated, 
topicConfigurationException);
+
+        StreamsGroupDescribeResponseData.Topology topology = 
configuredTopology.asStreamsGroupDescribeTopology();
+
+        assertEquals(topologyEpoch, topology.epoch());
+        assertEquals(1, topology.subtopologies().size());
+        assertEquals(subtopologyResponse, topology.subtopologies().get(0));
+    }
+}
\ No newline at end of file
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/TopicConfigurationExceptionTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/TopicConfigurationExceptionTest.java
new file mode 100644
index 00000000000..479cef5db13
--- /dev/null
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/TopicConfigurationExceptionTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams.topics;
+
+import org.apache.kafka.common.requests.StreamsGroupHeartbeatResponse.Status;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TopicConfigurationExceptionTest {
+
+    @Test
+    public void testMissingSourceTopics() {
+        TopicConfigurationException exception = 
TopicConfigurationException.missingSourceTopics("test");
+        assertEquals(Status.MISSING_SOURCE_TOPICS, exception.status());
+        assertEquals("test", exception.getMessage());
+    }
+
+    @Test
+    public void testMissingInternalTopics() {
+        TopicConfigurationException exception = 
TopicConfigurationException.missingInternalTopics("test");
+        assertEquals(Status.MISSING_INTERNAL_TOPICS, exception.status());
+        assertEquals("test", exception.getMessage());
+    }
+
+    @Test
+    public void testIncorrectlyPartitionedTopics() {
+        TopicConfigurationException exception = 
TopicConfigurationException.incorrectlyPartitionedTopics("test");
+        assertEquals(Status.INCORRECTLY_PARTITIONED_TOPICS, 
exception.status());
+        assertEquals("test", exception.getMessage());
+    }
+
+}

Reply via email to