This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 529dde9  KAFKA-12648: handle MissingSourceTopicException for named 
topologies (#11600)
529dde9 is described below

commit 529dde904a892ba2f95e6150066758e7476f15e3
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Tue Jan 18 11:49:23 2022 -0800

    KAFKA-12648: handle MissingSourceTopicException for named topologies 
(#11600)
    
    Avoid throwing a MissingSourceTopicException inside the #assign method when 
named topologies are used, and just remove those topologies which are missing 
any of their input topics from the assignment.
    
    Reviewers: Guozhang Wang <[email protected]>, Walker Carlson 
<[email protected]>, Bruno Cadonna <[email protected]>
---
 .../processor/internals/RepartitionTopics.java     | 131 +++++++++++++--------
 .../streams/processor/internals/StreamThread.java  |   7 +-
 .../internals/StreamsPartitionAssignor.java        |  37 ++++--
 .../processor/internals/TopologyMetadata.java      |  30 ++++-
 .../integration/NamedTopologyIntegrationTest.java  |  74 +++++++++---
 .../processor/internals/RepartitionTopicsTest.java |  30 +++--
 6 files changed, 218 insertions(+), 91 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java
index 801e6c1..2951451 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java
@@ -21,10 +21,8 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.streams.errors.MissingSourceTopicException;
 import org.apache.kafka.streams.errors.TaskAssignmentException;
 import 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
-import 
org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
 import 
org.apache.kafka.streams.processor.internals.assignment.CopartitionedTopicsEnforcer;
 import org.slf4j.Logger;
 
@@ -45,6 +43,7 @@ public class RepartitionTopics {
     private final CopartitionedTopicsEnforcer copartitionedTopicsEnforcer;
     private final Logger log;
     private final Map<TopicPartition, PartitionInfo> topicPartitionInfos = new 
HashMap<>();
+    private final Map<String, Set<String>> missingUserInputTopicsPerTopology = 
new HashMap<>();
 
     public RepartitionTopics(final TopologyMetadata topologyMetadata,
                              final InternalTopicManager internalTopicManager,
@@ -59,50 +58,93 @@ public class RepartitionTopics {
         log = logContext.logger(getClass());
     }
 
-    public void setup() {
-        final Map<Subtopology, TopicsInfo> topicGroups = 
topologyMetadata.topicGroups();
-        final Map<String, InternalTopicConfig> repartitionTopicMetadata = 
computeRepartitionTopicConfig(topicGroups, clusterMetadata);
-
-        // ensure the co-partitioning topics within the group have the same 
number of partitions,
-        // and enforce the number of partitions for those repartition topics 
to be the same if they
-        // are co-partitioned as well.
-        ensureCopartitioning(topologyMetadata.copartitionGroups(), 
repartitionTopicMetadata, clusterMetadata);
-
-        // make sure the repartition source topics exist with the right number 
of partitions,
-        // create these topics if necessary
-        internalTopicManager.makeReady(repartitionTopicMetadata);
-
-        // augment the metadata with the newly computed number of partitions 
for all the
-        // repartition source topics
-        for (final Map.Entry<String, InternalTopicConfig> entry : 
repartitionTopicMetadata.entrySet()) {
-            final String topic = entry.getKey();
-            final int numPartitions = 
entry.getValue().numberOfPartitions().orElse(-1);
-
-            for (int partition = 0; partition < numPartitions; partition++) {
-                topicPartitionInfos.put(
-                    new TopicPartition(topic, partition),
-                    new PartitionInfo(topic, partition, null, new Node[0], new 
Node[0])
-                );
+    /**
+     * @return   true iff setup was completed successfully and all user input 
topics were verified to exist
+     */
+    public boolean setup() {
+        final Map<String, Collection<TopicsInfo>> topicGroups = 
topologyMetadata.topicGroupsByTopology();
+        final Map<String, InternalTopicConfig> repartitionTopicMetadata
+            = computeRepartitionTopicConfig(topicGroups, clusterMetadata);
+
+        if (repartitionTopicMetadata.isEmpty()) {
+            if (missingUserInputTopicsPerTopology.isEmpty()) {
+                log.info("Skipping the repartition topic validation since 
there are no repartition topics.");
+            } else {
+                log.info("Skipping the repartition topic validation since all 
topologies containing repartition"
+                             + "topics are missing external user source topics 
and cannot be processed.");
+            }
+        } else {
+            // ensure the co-partitioning topics within the group have the 
same number of partitions,
+            // and enforce the number of partitions for those repartition 
topics to be the same if they
+            // are co-partitioned as well.
+            ensureCopartitioning(topologyMetadata.copartitionGroups(), 
repartitionTopicMetadata, clusterMetadata);
+
+            // make sure the repartition source topics exist with the right 
number of partitions,
+            // create these topics if necessary
+            internalTopicManager.makeReady(repartitionTopicMetadata);
+
+            // augment the metadata with the newly computed number of 
partitions for all the
+            // repartition source topics
+            for (final Map.Entry<String, InternalTopicConfig> entry : 
repartitionTopicMetadata.entrySet()) {
+                final String topic = entry.getKey();
+                final int numPartitions = 
entry.getValue().numberOfPartitions().orElse(-1);
+
+                for (int partition = 0; partition < numPartitions; 
partition++) {
+                    topicPartitionInfos.put(
+                        new TopicPartition(topic, partition),
+                        new PartitionInfo(topic, partition, null, new Node[0], 
new Node[0])
+                    );
+                }
             }
         }
+
+        return missingUserInputTopicsPerTopology.isEmpty();
+    }
+
+    public Map<String, Set<String>> missingUserInputTopicsPerTopology() {
+        return Collections.unmodifiableMap(missingUserInputTopicsPerTopology);
     }
 
     public Map<TopicPartition, PartitionInfo> topicPartitionsInfo() {
         return Collections.unmodifiableMap(topicPartitionInfos);
     }
 
-    private Map<String, InternalTopicConfig> 
computeRepartitionTopicConfig(final Map<Subtopology, TopicsInfo> topicGroups,
+    /**
+     * @param topicGroups                            information about the 
topic groups (subtopologies) in this application
+     * @param clusterMetadata                        cluster metadata, eg 
which topics exist on the brokers
+     */
+    private Map<String, InternalTopicConfig> 
computeRepartitionTopicConfig(final Map<String, Collection<TopicsInfo>> 
topicGroups,
                                                                            
final Cluster clusterMetadata) {
-
-        final Map<String, InternalTopicConfig> repartitionTopicConfigs = new 
HashMap<>();
-        for (final TopicsInfo topicsInfo : topicGroups.values()) {
-            checkIfExternalSourceTopicsExist(topicsInfo, clusterMetadata);
-            
repartitionTopicConfigs.putAll(topicsInfo.repartitionSourceTopics.values().stream()
-                .collect(Collectors.toMap(InternalTopicConfig::name, 
topicConfig -> topicConfig)));
+        final Set<TopicsInfo> allTopicsInfo = new HashSet<>();
+        final Map<String, InternalTopicConfig> allRepartitionTopicConfigs = 
new HashMap<>();
+        for (final Map.Entry<String, Collection<TopicsInfo>> topology : 
topicGroups.entrySet()) {
+            final String topologyName = topology.getKey();
+            final Set<String> missingSourceTopics = new HashSet<>();
+            final Map<String, InternalTopicConfig> 
repartitionTopicConfigsPerTopology = new HashMap<>();
+            for (final TopicsInfo topicsInfo : topology.getValue()) {
+                
missingSourceTopics.addAll(computeMissingExternalSourceTopics(topicsInfo, 
clusterMetadata));
+                repartitionTopicConfigsPerTopology.putAll(
+                    topicsInfo.repartitionSourceTopics
+                        .values()
+                        .stream()
+                        .collect(Collectors.toMap(InternalTopicConfig::name, 
topicConfig -> topicConfig)));
+            }
+            if (missingSourceTopics.isEmpty()) {
+                
allRepartitionTopicConfigs.putAll(repartitionTopicConfigsPerTopology);
+                allTopicsInfo.addAll(topology.getValue());
+            } else {
+                missingUserInputTopicsPerTopology.put(topologyName, 
missingSourceTopics);
+                log.error("Topology {} was missing source topics {} and will 
be excluded from the current assignment, "
+                              + "this can be due to the consumer client's 
metadata being stale or because they have "
+                              + "not been created yet. Please verify that you 
have created all input topics; if they "
+                              + "do exist, you just need to wait for the 
metadata to be updated, at which time a new "
+                              + "rebalance will be kicked off automatically 
and the topology will be retried at that time."
+                              + topologyName, missingSourceTopics);
+            }
         }
-        setRepartitionSourceTopicPartitionCount(repartitionTopicConfigs, 
topicGroups, clusterMetadata);
+        setRepartitionSourceTopicPartitionCount(allRepartitionTopicConfigs, 
allTopicsInfo, clusterMetadata);
 
-        return repartitionTopicConfigs;
+        return allRepartitionTopicConfigs;
     }
 
     private void ensureCopartitioning(final Collection<Set<String>> 
copartitionGroups,
@@ -113,31 +155,26 @@ public class RepartitionTopics {
         }
     }
 
-    private void checkIfExternalSourceTopicsExist(final TopicsInfo topicsInfo,
-                                                  final Cluster 
clusterMetadata) {
+    private Set<String> computeMissingExternalSourceTopics(final TopicsInfo 
topicsInfo,
+                                                           final Cluster 
clusterMetadata) {
         final Set<String> missingExternalSourceTopics = new 
HashSet<>(topicsInfo.sourceTopics);
         
missingExternalSourceTopics.removeAll(topicsInfo.repartitionSourceTopics.keySet());
         missingExternalSourceTopics.removeAll(clusterMetadata.topics());
-        if (!missingExternalSourceTopics.isEmpty()) {
-            log.error("The following source topics are missing/unknown: {}. 
Please make sure all source topics " +
-                    "have been pre-created before starting the Streams 
application. ",
-                missingExternalSourceTopics);
-            throw new MissingSourceTopicException("Missing source topics.");
-        }
+        return missingExternalSourceTopics;
     }
 
     /**
      * Computes the number of partitions and sets it for each repartition 
topic in repartitionTopicMetadata
      */
     private void setRepartitionSourceTopicPartitionCount(final Map<String, 
InternalTopicConfig> repartitionTopicMetadata,
-                                                         final 
Map<Subtopology, TopicsInfo> topicGroups,
+                                                         final 
Collection<TopicsInfo> topicGroups,
                                                          final Cluster 
clusterMetadata) {
         boolean partitionCountNeeded;
         do {
             partitionCountNeeded = false;
             boolean progressMadeThisIteration = false;  // avoid infinitely 
looping without making any progress on unknown repartitions
 
-            for (final TopicsInfo topicsInfo : topicGroups.values()) {
+            for (final TopicsInfo topicsInfo : topicGroups) {
                 for (final String repartitionSourceTopic : 
topicsInfo.repartitionSourceTopics.keySet()) {
                     final Optional<Integer> 
repartitionSourceTopicPartitionCount =
                         
repartitionTopicMetadata.get(repartitionSourceTopic).numberOfPartitions();
@@ -173,12 +210,12 @@ public class RepartitionTopics {
     }
 
     private Integer computePartitionCount(final Map<String, 
InternalTopicConfig> repartitionTopicMetadata,
-                                          final Map<Subtopology, TopicsInfo> 
topicGroups,
+                                          final Collection<TopicsInfo> 
topicGroups,
                                           final Cluster clusterMetadata,
                                           final String repartitionSourceTopic) 
{
         Integer partitionCount = null;
         // try set the number of partitions for this repartition topic if it 
is not set yet
-        for (final TopicsInfo topicsInfo : topicGroups.values()) {
+        for (final TopicsInfo topicsInfo : topicGroups) {
             final Set<String> sinkTopics = topicsInfo.sinkTopics;
 
             if (sinkTopics.contains(repartitionSourceTopic)) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index ef425ec..509fd44 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -917,8 +917,9 @@ public class StreamThread extends Thread {
 
             topologyMetadata.maybeWaitForNonEmptyTopology(() -> state);
 
+            // We don't need to manually trigger a rebalance to pick up tasks 
from the new topology, as
+            // a rebalance will always occur when the metadata is updated 
after a change in subscription
             subscribeConsumer();
-            mainConsumer.enforceRebalance();
         }
     }
 
@@ -1137,6 +1138,8 @@ public class StreamThread extends Thread {
 
         log.info("Shutting down");
 
+        topologyMetadata.unregisterThread(threadMetadata.threadName());
+
         try {
             taskManager.shutdown(cleanRun);
         } catch (final Throwable e) {
@@ -1165,8 +1168,6 @@ public class StreamThread extends Thread {
 
         setState(State.DEAD);
 
-        topologyMetadata.unregisterThread(threadMetadata.threadName());
-
         log.info("Shutdown complete");
     }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 083253c..4c8b146 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -317,7 +317,7 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
         int minSupportedMetadataVersion = LATEST_SUPPORTED_VERSION;
 
         boolean shutdownRequested = false;
-        boolean assignementErrorFound = false;
+        boolean assignmentErrorFound = false;
         int futureMetadataVersion = UNKNOWN;
         for (final Map.Entry<String, Subscription> entry : 
subscriptions.entrySet()) {
             final String consumerId = entry.getKey();
@@ -355,12 +355,12 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
             final int prevSize = allOwnedPartitions.size();
             allOwnedPartitions.addAll(subscription.ownedPartitions());
             if (allOwnedPartitions.size() < prevSize + 
subscription.ownedPartitions().size()) {
-                assignementErrorFound = true;
+                assignmentErrorFound = true;
             }
             clientMetadata.addPreviousTasksAndOffsetSums(consumerId, 
info.taskOffsetSums());
         }
 
-        if (assignementErrorFound) {
+        if (assignmentErrorFound) {
             log.warn("The previous assignment contains a partition more than 
once. " +
                 "\t Mapping: {}", subscriptions);
         }
@@ -380,7 +380,10 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
             // parse the topology to determine the repartition source topics,
             // making sure they are created with the number of partitions as
             // the maximum of the depending sub-topologies source topics' 
number of partitions
-            final Map<TopicPartition, PartitionInfo> 
allRepartitionTopicPartitions = prepareRepartitionTopics(metadata);
+            final RepartitionTopics repartitionTopics = 
prepareRepartitionTopics(metadata);
+            final Map<TopicPartition, PartitionInfo> 
allRepartitionTopicPartitions = repartitionTopics.topicPartitionsInfo();
+            final Map<String, Set<String>> missingUserInputTopicsPerTopology = 
repartitionTopics.missingUserInputTopicsPerTopology();
+
             final Cluster fullMetadata = 
metadata.withPartitions(allRepartitionTopicPartitions);
             log.debug("Created repartition topics {} from the parsed 
topology.", allRepartitionTopicPartitions.values());
 
@@ -388,7 +391,8 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
 
             // construct the assignment of tasks to clients
 
-            final Map<Subtopology, TopicsInfo> topicGroups = 
taskManager.topologyMetadata().topicGroups();
+            final Map<Subtopology, TopicsInfo> topicGroups =
+                
taskManager.topologyMetadata().topicGroups(missingUserInputTopicsPerTopology.keySet());
 
             final Set<String> allSourceTopics = new HashSet<>();
             final Map<Subtopology, Set<String>> sourceTopicsByGroup = new 
HashMap<>();
@@ -488,12 +492,18 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
     }
 
     /**
-     * Computes and assembles all repartition topic metadata then creates the 
topics if necessary.
+     * Computes and assembles all repartition topic metadata then creates the 
topics if necessary. Also verifies
+     * that all user input topics of each topology have been created ahead of 
time. If any such source topics are
+     * missing from a NamedTopology, the assignor will skip distributing its 
tasks until they have been created
+     * and invoke the exception handler (without killing the thread) once for 
each topology to alert the user of
+     * the missing topics.
+     * <p>
+     * For regular applications without named topologies, the assignor will 
instead send a shutdown signal to
+     * all clients so the user can identify and resolve the problem.
      *
-     * @return map from repartition topic to its partition info
+     * @return application metadata such as partition info of repartition 
topics, missing external topics, etc
      */
-    private Map<TopicPartition, PartitionInfo> prepareRepartitionTopics(final 
Cluster metadata) {
-
+    private RepartitionTopics prepareRepartitionTopics(final Cluster metadata) 
{
         final RepartitionTopics repartitionTopics = new RepartitionTopics(
             taskManager.topologyMetadata(),
             internalTopicManager,
@@ -501,8 +511,13 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
             metadata,
             logPrefix
         );
-        repartitionTopics.setup();
-        return repartitionTopics.topicPartitionsInfo();
+        final boolean isMissingInputTopics = !repartitionTopics.setup();
+        if (isMissingInputTopics) {
+            if (!taskManager.topologyMetadata().hasNamedTopologies()) {
+                throw new MissingSourceTopicException("Missing source 
topics.");
+            }
+        }
+        return repartitionTopics;
     }
 
     /**
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
index e855962..fb161ca 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TopologyMetadata.java
@@ -59,7 +59,7 @@ public class TopologyMetadata {
 
     // the "__" (double underscore) string is not allowed for topology names, 
so it's safe to use to indicate
     // that it's not a named topology
-    private static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";
+    public static final String UNNAMED_TOPOLOGY = "__UNNAMED_TOPOLOGY__";
     private static final Pattern EMPTY_ZERO_LENGTH_PATTERN = 
Pattern.compile("");
 
     private final StreamsConfig config;
@@ -436,9 +436,33 @@ public class TopologyMetadata {
         return sourceTopics;
     }
 
-    public Map<Subtopology, TopicsInfo> topicGroups() {
+    private String getTopologyNameOrElseUnnamed(final String topologyName) {
+        return topologyName == null ? UNNAMED_TOPOLOGY : topologyName;
+    }
+
+    /**
+     * @param topologiesToExclude the names of any topologies to exclude from 
the returned topic groups,
+     *                            eg because they have missing source topics 
and can't be processed yet
+     */
+    public Map<Subtopology, TopicsInfo> topicGroups(final Set<String> 
topologiesToExclude) {
         final Map<Subtopology, TopicsInfo> topicGroups = new HashMap<>();
-        applyToEachBuilder(b -> topicGroups.putAll(b.topicGroups()));
+        for (final InternalTopologyBuilder builder : builders.values()) {
+            if (!topologiesToExclude.contains(builder.topologyName())) {
+                topicGroups.putAll(builder.topicGroups());
+            }
+        }
+        return topicGroups;
+    }
+
+    /**
+     * @return    map from topologies with missing external source topics to 
the set of missing topic names,
+     *            keyed by topology name or
+     */
+    public Map<String, Collection<TopicsInfo>> topicGroupsByTopology() {
+        final Map<String, Collection<TopicsInfo>> topicGroups = new 
HashMap<>();
+        applyToEachBuilder(
+            b -> 
topicGroups.put(getTopologyNameOrElseUnnamed(b.topologyName()), 
b.topicGroups().values())
+        );
         return topicGroups;
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
index 1517736..e58ec96 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java
@@ -52,7 +52,6 @@ import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
@@ -84,11 +83,6 @@ public class NamedTopologyIntegrationTest {
     private static final String TOPOLOGY_2 = "topology-2";
     private static final String TOPOLOGY_3 = "topology-3";
 
-    // TODO KAFKA-12648:
-    //  1) full test coverage for add/removeNamedTopology, covering:
-    //      - the "last topology removed" case
-    //      - test using multiple clients, with standbys
-
     // "standard" input topics which are pre-filled with the 
STANDARD_INPUT_DATA
     private final static String INPUT_STREAM_1 = "input-stream-1";
     private final static String INPUT_STREAM_2 = "input-stream-2";
@@ -107,6 +101,12 @@ public class NamedTopologyIntegrationTest {
     private final static String DELAYED_INPUT_STREAM_3 = 
"delayed-input-stream-3";
     private final static String DELAYED_INPUT_STREAM_4 = 
"delayed-input-stream-4";
 
+    // topic that is not initially created during the test setup
+    private final static String NEW_STREAM = "new-stream";
+
+    // existing topic that is pre-filled but cleared between tests
+    private final static String EXISTING_STREAM = "existing-stream";
+
     private final static Materialized<Object, Long, KeyValueStore<Bytes, 
byte[]>> IN_MEMORY_STORE = 
Materialized.as(Stores.inMemoryKeyValueStore("store"));
     private final static Materialized<Object, Long, KeyValueStore<Bytes, 
byte[]>> ROCKSDB_STORE = 
Materialized.as(Stores.persistentKeyValueStore("store"));
 
@@ -375,7 +375,6 @@ public class NamedTopologyIntegrationTest {
     }
 
     @Test
-    @Ignore
     public void shouldAddNamedTopologyToRunningApplicationWithMultipleNodes() 
throws Exception {
         setupSecondKafkaStreams();
         topology1Builder.stream(INPUT_STREAM_1).groupBy((k, v) -> 
k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
@@ -388,18 +387,14 @@ public class NamedTopologyIntegrationTest {
         streams2.start(topology1Builder2.build());
         waitForApplicationState(asList(streams, streams2), State.RUNNING, 
Duration.ofSeconds(30));
 
+        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
+
         final AddNamedTopologyResult result = 
streams.addNamedTopology(topology2Builder.build());
         final AddNamedTopologyResult result2 = 
streams2.addNamedTopology(topology2Builder2.build());
         result.all().get();
         result2.all().get();
 
-        assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
         assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_2, 3), equalTo(COUNT_OUTPUT_DATA));
-
-        // TODO KAFKA-12648: need to make sure that both instances actually 
did some of this processing of topology-2,
-        //  ie that both joined the group after the new topology was added and 
then successfully processed records from it
-        //  Also: test where we wait for a rebalance between 
streams.addNamedTopology and streams2.addNamedTopology,
-        //  and vice versa, to make sure we hit case where not all new tasks 
are initially assigned, and when not all yet known
     }
 
     @Test
@@ -552,7 +547,6 @@ public class NamedTopologyIntegrationTest {
     }
 
     @Test
-    @Ignore
     public void 
shouldAddToEmptyInitialTopologyRemoveResetOffsetsThenAddSameNamedTopologyWithRepartitioning()
 throws Exception {
         CLUSTER.createTopics(SUM_OUTPUT, COUNT_OUTPUT);
         // Build up named topology with two stateful subtopologies
@@ -589,6 +583,58 @@ public class NamedTopologyIntegrationTest {
         CLUSTER.deleteTopicsAndWait(SUM_OUTPUT, COUNT_OUTPUT);
     }
 
+    @Test
+    public void 
shouldContinueProcessingOtherTopologiesWhenNewTopologyHasMissingInputTopics() 
throws Exception {
+        try {
+            CLUSTER.createTopic(EXISTING_STREAM, 2, 1);
+            produceToInputTopics(EXISTING_STREAM, STANDARD_INPUT_DATA);
+            setupSecondKafkaStreams();
+            topology1Builder.stream(EXISTING_STREAM).groupBy((k, v) -> 
k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
+            topology1Builder2.stream(EXISTING_STREAM).groupBy((k, v) -> 
k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
+
+            streams.start(topology1Builder.build());
+            streams2.start(topology1Builder2.build());
+            waitForApplicationState(asList(streams, streams2), State.RUNNING, 
Duration.ofSeconds(30));
+            assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
+
+            topology2Builder.stream(NEW_STREAM).groupBy((k, v) -> 
k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
+            topology2Builder2.stream(NEW_STREAM).groupBy((k, v) -> 
k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_2);
+
+            streams.addNamedTopology(topology2Builder.build());
+            streams2.addNamedTopology(topology2Builder2.build());
+
+            // make sure the original topology can continue processing while 
waiting on the new source topics
+            produceToInputTopics(EXISTING_STREAM, singletonList(pair("A", 
30L)));
+            assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_1, 1), equalTo(singletonList(pair("A", 3L))));
+
+            CLUSTER.createTopic(NEW_STREAM, 2, 1);
+            produceToInputTopics(NEW_STREAM, STANDARD_INPUT_DATA);
+            assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_2, 3), equalTo(COUNT_OUTPUT_DATA));
+        } finally {
+            CLUSTER.deleteTopicsAndWait(EXISTING_STREAM, NEW_STREAM);
+        }
+    }
+
+    @Test
+    public void shouldWaitForMissingInputTopicsToBeCreated() throws Exception {
+        setupSecondKafkaStreams();
+        topology1Builder.stream(NEW_STREAM).groupBy((k, v) -> 
k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
+        topology1Builder2.stream(NEW_STREAM).groupBy((k, v) -> 
k).count(IN_MEMORY_STORE).toStream().to(OUTPUT_STREAM_1);
+
+        streams.start(topology1Builder.build());
+        streams2.start(topology1Builder2.build());
+        waitForApplicationState(asList(streams, streams2), State.RUNNING, 
Duration.ofSeconds(30));
+
+        try {
+            CLUSTER.createTopic(NEW_STREAM, 2, 1);
+            produceToInputTopics(NEW_STREAM, STANDARD_INPUT_DATA);
+
+            assertThat(waitUntilMinKeyValueRecordsReceived(consumerConfig, 
OUTPUT_STREAM_1, 3), equalTo(COUNT_OUTPUT_DATA));
+        } finally {
+            CLUSTER.deleteTopicsAndWait(NEW_STREAM);
+        }
+    }
+
     private static void produceToInputTopics(final String topic, final 
Collection<KeyValue<String, Long>> records) {
         IntegrationTestUtils.produceKeyValuesSynchronously(
             topic,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java
index ce94294..57bf60e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RepartitionTopicsTest.java
@@ -21,12 +21,12 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.errors.MissingSourceTopicException;
 import org.apache.kafka.streams.errors.TaskAssignmentException;
 import 
org.apache.kafka.streams.processor.internals.InternalTopologyBuilder.TopicsInfo;
 import 
org.apache.kafka.streams.processor.internals.assignment.CopartitionedTopicsEnforcer;
 import 
org.apache.kafka.streams.processor.internals.testutil.DummyStreamsConfig;
 
+import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -41,6 +41,7 @@ import java.util.Set;
 import static org.apache.kafka.common.utils.Utils.mkEntry;
 import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.apache.kafka.common.utils.Utils.mkSet;
+import static 
org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY;
 import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_0;
 import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.SUBTOPOLOGY_1;
 
@@ -51,6 +52,7 @@ import static org.easymock.EasyMock.mock;
 import static org.easymock.EasyMock.niceMock;
 import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
+import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.nullValue;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -100,9 +102,14 @@ public class RepartitionTopicsTest {
     final CopartitionedTopicsEnforcer copartitionedTopicsEnforcer = 
mock(CopartitionedTopicsEnforcer.class);
     final Cluster clusterMetadata = niceMock(Cluster.class);
 
+    @Before
+    public void setup() {
+        
expect(internalTopologyBuilder.hasNamedTopology()).andStubReturn(false);
+        expect(internalTopologyBuilder.topologyName()).andStubReturn(null);
+    }
+
     @Test
     public void shouldSetupRepartitionTopics() {
-        
expect(internalTopologyBuilder.hasNamedTopology()).andStubReturn(false);
         expect(internalTopologyBuilder.topicGroups())
             .andReturn(mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO1), 
mkEntry(SUBTOPOLOGY_1, TOPICS_INFO2)));
         final Set<String> coPartitionGroup1 = mkSet(SOURCE_TOPIC_NAME1, 
SOURCE_TOPIC_NAME2);
@@ -141,8 +148,8 @@ public class RepartitionTopicsTest {
     }
 
     @Test
-    public void shouldThrowMissingSourceTopicException() {
-        
expect(internalTopologyBuilder.hasNamedTopology()).andStubReturn(false);
+    public void shouldReturnMissingSourceTopics() {
+        final Set<String> missingSourceTopics = mkSet(SOURCE_TOPIC_NAME1);
         expect(internalTopologyBuilder.topicGroups())
             .andReturn(mkMap(mkEntry(SUBTOPOLOGY_0, TOPICS_INFO1), 
mkEntry(SUBTOPOLOGY_1, TOPICS_INFO2)));
         
expect(internalTopologyBuilder.copartitionGroups()).andReturn(Collections.emptyList());
@@ -152,7 +159,7 @@ public class RepartitionTopicsTest {
                 mkEntry(REPARTITION_TOPIC_NAME1, REPARTITION_TOPIC_CONFIG1)
             ))
         ).andReturn(Collections.emptySet());
-        setupClusterWithMissingTopics(mkSet(SOURCE_TOPIC_NAME1));
+        setupClusterWithMissingTopics(missingSourceTopics);
         replay(internalTopicManager, internalTopologyBuilder, clusterMetadata);
         final RepartitionTopics repartitionTopics = new RepartitionTopics(
             new TopologyMetadata(internalTopologyBuilder, config),
@@ -162,14 +169,17 @@ public class RepartitionTopicsTest {
             "[test] "
         );
 
-        assertThrows(MissingSourceTopicException.class, 
repartitionTopics::setup);
+        assertThat(repartitionTopics.setup(), equalTo(false));
+        assertThat(
+            repartitionTopics.missingUserInputTopicsPerTopology(),
+            equalTo(Collections.singletonMap(UNNAMED_TOPOLOGY, 
missingSourceTopics))
+        );
     }
 
     @Test
     public void 
shouldThrowTaskAssignmentExceptionIfPartitionCountCannotBeComputedForAllRepartitionTopics()
 {
         final RepartitionTopicConfig 
repartitionTopicConfigWithoutPartitionCount =
             new RepartitionTopicConfig(REPARTITION_WITHOUT_PARTITION_COUNT, 
TOPIC_CONFIG5);
-        
expect(internalTopologyBuilder.hasNamedTopology()).andStubReturn(false);
         expect(internalTopologyBuilder.topicGroups())
             .andReturn(mkMap(
                 mkEntry(SUBTOPOLOGY_0, TOPICS_INFO1),
@@ -208,7 +218,6 @@ public class RepartitionTopicsTest {
             ),
             Collections.emptyMap()
         );
-        
expect(internalTopologyBuilder.hasNamedTopology()).andStubReturn(false);
         expect(internalTopologyBuilder.topicGroups())
             .andReturn(mkMap(
                 mkEntry(SUBTOPOLOGY_0, topicsInfo),
@@ -252,7 +261,6 @@ public class RepartitionTopicsTest {
             ),
             Collections.emptyMap()
         );
-        
expect(internalTopologyBuilder.hasNamedTopology()).andStubReturn(false);
         expect(internalTopologyBuilder.topicGroups())
             .andReturn(mkMap(
                 mkEntry(SUBTOPOLOGY_0, topicsInfo),
@@ -307,7 +315,6 @@ public class RepartitionTopicsTest {
             ),
             Collections.emptyMap()
         );
-        
expect(internalTopologyBuilder.hasNamedTopology()).andStubReturn(false);
         expect(internalTopologyBuilder.topicGroups())
             .andReturn(mkMap(
                 mkEntry(SUBTOPOLOGY_0, topicsInfo),
@@ -357,11 +364,8 @@ public class RepartitionTopicsTest {
             Collections.emptyMap(),
             Collections.emptyMap()
         );
-        
expect(internalTopologyBuilder.hasNamedTopology()).andStubReturn(false);
         expect(internalTopologyBuilder.topicGroups())
             .andReturn(mkMap(mkEntry(SUBTOPOLOGY_0, topicsInfo)));
-        
expect(internalTopologyBuilder.copartitionGroups()).andReturn(Collections.emptySet());
-        
expect(internalTopicManager.makeReady(Collections.emptyMap())).andReturn(Collections.emptySet());
         setupCluster();
         replay(internalTopicManager, internalTopologyBuilder, clusterMetadata);
         final RepartitionTopics repartitionTopics = new RepartitionTopics(

Reply via email to