Repository: kafka Updated Branches: refs/heads/trunk 3e0333d69 -> 84c8d2bb8
KAFKA-2872: unite sink nodes with parent nodes in addSink Starting a KafkaStream was getting an error due to the fact that the TopologyBuilder.addSink method was not connecting the sink with its parent(s) processor/sources. Just needed to wire up the sink with its parent(s) in TopologyBuilder.addSink. Author: bbejeck <[email protected]> Reviewers: Guozhang Wang Closes #572 from bbejeck/KAFKA-2872_kafka_stream_sink_not_connected_to_parent Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/84c8d2bb Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/84c8d2bb Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/84c8d2bb Branch: refs/heads/trunk Commit: 84c8d2bb86dc2794a3d6a86ae28b3cb51cea5c4b Parents: 3e0333d Author: Bill Bejeck <[email protected]> Authored: Sat Nov 21 18:46:15 2015 -0800 Committer: [email protected] <[email protected]> Committed: Sat Nov 21 18:46:15 2015 -0800 ---------------------------------------------------------------------- .../streams/processor/TopologyBuilder.java | 2 + .../streams/processor/TopologyBuilderTest.java | 42 +++++++++++++++++--- 2 files changed, 39 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/84c8d2bb/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java index 893f7de..021a47f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java @@ -230,6 +230,8 @@ public class TopologyBuilder { } nodeFactories.put(name, new SinkNodeFactory(name, parentNames, topic, keySerializer, 
valSerializer)); + nodeGrouper.add(name); + nodeGrouper.unite(name, parentNames); return this; } http://git-wip-us.apache.org/repos/asf/kafka/blob/84c8d2bb/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java index b1b71b6..f6924ad 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java @@ -17,11 +17,6 @@ package org.apache.kafka.streams.processor; -import static org.junit.Assert.assertEquals; - -import static org.apache.kafka.common.utils.Utils.mkSet; -import static org.apache.kafka.common.utils.Utils.mkList; - import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.processor.internals.ProcessorTopology; import org.apache.kafka.test.MockProcessorSupplier; @@ -35,6 +30,11 @@ import java.util.List; import java.util.Map; import java.util.Set; +import static org.apache.kafka.common.utils.Utils.mkList; +import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + public class TopologyBuilderTest { @Test(expected = TopologyException.class) @@ -100,6 +100,38 @@ public class TopologyBuilderTest { } @Test + public void testAddSinkConnectedWithParent() { + final TopologyBuilder builder = new TopologyBuilder(); + + builder.addSource("source", "source-topic"); + builder.addSink("sink", "dest-topic", "source"); + + Map<Integer, Set<String>> nodeGroups = builder.nodeGroups(); + Set<String> nodeGroup = nodeGroups.get(0); + + assertTrue(nodeGroup.contains("sink")); + assertTrue(nodeGroup.contains("source")); + + } + + @Test + public void 
testAddSinkConnectedWithMultipleParent() { + final TopologyBuilder builder = new TopologyBuilder(); + + builder.addSource("source", "source-topic"); + builder.addSource("sourceII", "source-topicII"); + builder.addSink("sink", "dest-topic", "source", "sourceII"); + + Map<Integer, Set<String>> nodeGroups = builder.nodeGroups(); + Set<String> nodeGroup = nodeGroups.get(0); + + assertTrue(nodeGroup.contains("sink")); + assertTrue(nodeGroup.contains("source")); + assertTrue(nodeGroup.contains("sourceII")); + + } + + @Test public void testSourceTopics() { final TopologyBuilder builder = new TopologyBuilder();
