Repository: kafka
Updated Branches:
  refs/heads/trunk da70316a5 -> 35cd008e5


KAFKA-4645: Improve test coverage of ProcessorTopology

The toString method prints the topology, but there were no tests verifying
that it works and does not throw exceptions.

Author: Damian Guy <damian....@gmail.com>

Reviewers: Matthias J. Sax, Eno Thereska, Guozhang Wang

Closes #2444 from dguy/KAFKA-4645


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/35cd008e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/35cd008e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/35cd008e

Branch: refs/heads/trunk
Commit: 35cd008e5a1ec89d09f956a6f9952ad834e556d7
Parents: da70316
Author: Damian Guy <damian....@gmail.com>
Authored: Wed Feb 1 20:30:04 2017 -0800
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Wed Feb 1 20:30:04 2017 -0800

----------------------------------------------------------------------
 .../internals/ProcessorTopologyTest.java        | 88 +++++++++++++++-----
 1 file changed, 67 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/35cd008e/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index 71c234e..f35a2b5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -46,6 +46,8 @@ import org.junit.Test;
 import java.io.File;
 import java.util.Properties;
 
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
@@ -55,12 +57,14 @@ public class ProcessorTopologyTest {
     private static final Serializer<String> STRING_SERIALIZER = new 
StringSerializer();
     private static final Deserializer<String> STRING_DESERIALIZER = new 
StringDeserializer();
 
-    protected static final String INPUT_TOPIC_1 = "input-topic-1";
-    protected static final String INPUT_TOPIC_2 = "input-topic-2";
-    protected static final String OUTPUT_TOPIC_1 = "output-topic-1";
-    protected static final String OUTPUT_TOPIC_2 = "output-topic-2";
+    private static final String INPUT_TOPIC_1 = "input-topic-1";
+    private static final String INPUT_TOPIC_2 = "input-topic-2";
+    private static final String OUTPUT_TOPIC_1 = "output-topic-1";
+    private static final String OUTPUT_TOPIC_2 = "output-topic-2";
 
     private static long timestamp = 1000L;
+    private final TopologyBuilder builder = new TopologyBuilder();
+    private final MockProcessorSupplier mockProcessorSupplier = new 
MockProcessorSupplier();
 
     private ProcessorTopologyTestDriver driver;
     private StreamsConfig config;
@@ -89,7 +93,7 @@ public class ProcessorTopologyTest {
 
     @Test
     public void testTopologyMetadata() {
-        final TopologyBuilder builder = new 
TopologyBuilder().setApplicationId("X");
+        builder.setApplicationId("X");
 
         builder.addSource("source-1", "topic-1");
         builder.addSource("source-2", "topic-2", "topic-3");
@@ -206,7 +210,7 @@ public class ProcessorTopologyTest {
         final String global = "global";
         final String topic = "topic";
         final KeyValueStore<String, String> globalStore = 
(KeyValueStore<String, String>) storeSupplier.get();
-        final TopologyBuilder topologyBuilder = new TopologyBuilder()
+        final TopologyBuilder topologyBuilder = this.builder
                 .addGlobalStore(globalStore, global, STRING_DESERIALIZER, 
STRING_DESERIALIZER, topic, "processor", define(new 
StatefulProcessor("my-store")));
 
         driver = new ProcessorTopologyTestDriver(config, topologyBuilder, 
"my-store");
@@ -230,9 +234,51 @@ public class ProcessorTopologyTest {
         assertNoOutputRecord(OUTPUT_TOPIC_1);
     }
 
+    @Test
+    public void shouldCreateStringWithSourceAndTopics() throws Exception {
+        builder.addSource("source", "topic1", "topic2");
+        final ProcessorTopology topology = builder.build(null);
+        final String result = topology.toString();
+        assertThat(result, containsString("source:\n\t\ttopics:\t\t[topic1, 
topic2]\n"));
+    }
+
+    @Test
+    public void shouldCreateStringWithMultipleSourcesAndTopics() throws 
Exception {
+        builder.addSource("source", "topic1", "topic2");
+        builder.addSource("source2", "t", "t1", "t2");
+        final ProcessorTopology topology = builder.build(null);
+        final String result = topology.toString();
+        assertThat(result, containsString("source:\n\t\ttopics:\t\t[topic1, 
topic2]\n"));
+        assertThat(result, containsString("source2:\n\t\ttopics:\t\t[t, t1, 
t2]\n"));
+    }
 
+    @Test
+    public void shouldCreateStringWithProcessors() throws Exception {
+        builder.addSource("source", "t")
+                .addProcessor("processor", mockProcessorSupplier, "source")
+                .addProcessor("other", mockProcessorSupplier, "source");
+        final ProcessorTopology topology = builder.build(null);
+        final String result = topology.toString();
+        assertThat(result, containsString("\t\tchildren:\t[processor, 
other]"));
+        assertThat(result, containsString("processor:\n"));
+        assertThat(result, containsString("other:\n"));
+    }
+
+    @Test
+    public void shouldRecursivelyPrintChildren() throws Exception {
+        builder.addSource("source", "t")
+                .addProcessor("processor", mockProcessorSupplier, "source")
+                .addProcessor("child-one", mockProcessorSupplier, "processor")
+                .addProcessor("child-one-one", mockProcessorSupplier, 
"child-one")
+                .addProcessor("child-two", mockProcessorSupplier, "processor")
+                .addProcessor("child-two-one", mockProcessorSupplier, 
"child-two");
+
+        final String result = builder.build(null).toString();
+        assertThat(result, 
containsString("child-one:\n\t\tchildren:\t[child-one-one]"));
+        assertThat(result, 
containsString("child-two:\n\t\tchildren:\t[child-two-one]"));
+    }
 
-    protected void assertNextOutputRecord(String topic, String key, String 
value) {
+    private void assertNextOutputRecord(String topic, String key, String 
value) {
         ProducerRecord<String, String> record = driver.readOutput(topic, 
STRING_DESERIALIZER, STRING_DESERIALIZER);
         assertEquals(topic, record.topic());
         assertEquals(key, record.key());
@@ -240,7 +286,7 @@ public class ProcessorTopologyTest {
         assertNull(record.partition());
     }
 
-    protected void assertNextOutputRecord(String topic, String key, String 
value, Integer partition) {
+    private void assertNextOutputRecord(String topic, String key, String 
value, Integer partition) {
         ProducerRecord<String, String> record = driver.readOutput(topic, 
STRING_DESERIALIZER, STRING_DESERIALIZER);
         assertEquals(topic, record.topic());
         assertEquals(key, record.key());
@@ -248,11 +294,11 @@ public class ProcessorTopologyTest {
         assertEquals(partition, record.partition());
     }
 
-    protected void assertNoOutputRecord(String topic) {
+    private void assertNoOutputRecord(String topic) {
         assertNull(driver.readOutput(topic));
     }
 
-    protected StreamPartitioner<Object, Object> constantPartitioner(final 
Integer partition) {
+    private StreamPartitioner<Object, Object> constantPartitioner(final 
Integer partition) {
         return new StreamPartitioner<Object, Object>() {
             @Override
             public Integer partition(Object key, Object value, int 
numPartitions) {
@@ -261,28 +307,28 @@ public class ProcessorTopologyTest {
         };
     }
 
-    protected TopologyBuilder createSimpleTopology(int partition) {
-        return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, 
STRING_DESERIALIZER, INPUT_TOPIC_1)
+    private TopologyBuilder createSimpleTopology(int partition) {
+        return builder.addSource("source", STRING_DESERIALIZER, 
STRING_DESERIALIZER, INPUT_TOPIC_1)
                                     .addProcessor("processor", define(new 
ForwardingProcessor()), "source")
                                     .addSink("sink", OUTPUT_TOPIC_1, 
constantPartitioner(partition), "processor");
     }
 
-    protected TopologyBuilder createMultiplexingTopology() {
-        return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, 
STRING_DESERIALIZER, INPUT_TOPIC_1)
+    private TopologyBuilder createMultiplexingTopology() {
+        return builder.addSource("source", STRING_DESERIALIZER, 
STRING_DESERIALIZER, INPUT_TOPIC_1)
                                     .addProcessor("processor", define(new 
MultiplexingProcessor(2)), "source")
                                     .addSink("sink1", OUTPUT_TOPIC_1, 
"processor")
                                     .addSink("sink2", OUTPUT_TOPIC_2, 
"processor");
     }
 
-    protected TopologyBuilder createMultiplexByNameTopology() {
-        return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, 
STRING_DESERIALIZER, INPUT_TOPIC_1)
+    private TopologyBuilder createMultiplexByNameTopology() {
+        return builder.addSource("source", STRING_DESERIALIZER, 
STRING_DESERIALIZER, INPUT_TOPIC_1)
             .addProcessor("processor", define(new 
MultiplexByNameProcessor(2)), "source")
             .addSink("sink0", OUTPUT_TOPIC_1, "processor")
             .addSink("sink1", OUTPUT_TOPIC_2, "processor");
     }
 
-    protected TopologyBuilder createStatefulTopology(String storeName) {
-        return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, 
STRING_DESERIALIZER, INPUT_TOPIC_1)
+    private TopologyBuilder createStatefulTopology(String storeName) {
+        return builder.addSource("source", STRING_DESERIALIZER, 
STRING_DESERIALIZER, INPUT_TOPIC_1)
                                     .addProcessor("processor", define(new 
StatefulProcessor(storeName)), "source")
                                     .addStateStore(
                                             
Stores.create(storeName).withStringKeys().withStringValues().inMemory().build(),
@@ -292,8 +338,8 @@ public class ProcessorTopologyTest {
     }
 
 
-    protected TopologyBuilder createSimpleMultiSourceTopology(int partition) {
-        return new TopologyBuilder().addSource("source-1", 
STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+    private TopologyBuilder createSimpleMultiSourceTopology(int partition) {
+        return builder.addSource("source-1", STRING_DESERIALIZER, 
STRING_DESERIALIZER, INPUT_TOPIC_1)
                 .addProcessor("processor-1", define(new 
ForwardingProcessor()), "source-1")
                 .addSink("sink-1", OUTPUT_TOPIC_1, 
constantPartitioner(partition), "processor-1")
                 .addSource("source-2", STRING_DESERIALIZER, 
STRING_DESERIALIZER, INPUT_TOPIC_2)
@@ -414,7 +460,7 @@ public class ProcessorTopologyTest {
         }
     }
 
-    protected <K, V> ProcessorSupplier<K, V> define(final Processor<K, V> 
processor) {
+    private <K, V> ProcessorSupplier<K, V> define(final Processor<K, V> 
processor) {
         return new ProcessorSupplier<K, V>() {
             @Override
             public Processor<K, V> get() {

Reply via email to