Repository: kafka Updated Branches: refs/heads/trunk da70316a5 -> 35cd008e5
KAFKA-4645: Improve test coverage of ProcessorTopology. The toString method prints the topology, but had no tests making sure it works and/or doesn't cause exceptions. Author: Damian Guy <damian....@gmail.com> Reviewers: Matthias J. Sax, Eno Thereska, Guozhang Wang Closes #2444 from dguy/KAFKA-4645 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/35cd008e Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/35cd008e Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/35cd008e Branch: refs/heads/trunk Commit: 35cd008e5a1ec89d09f956a6f9952ad834e556d7 Parents: da70316 Author: Damian Guy <damian....@gmail.com> Authored: Wed Feb 1 20:30:04 2017 -0800 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Wed Feb 1 20:30:04 2017 -0800 ---------------------------------------------------------------------- .../internals/ProcessorTopologyTest.java | 88 +++++++++++++++----- 1 file changed, 67 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/35cd008e/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java index 71c234e..f35a2b5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java @@ -46,6 +46,8 @@ import org.junit.Test; import java.io.File; import java.util.Properties; +import static org.hamcrest.CoreMatchers.containsString; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static 
org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; @@ -55,12 +57,14 @@ public class ProcessorTopologyTest { private static final Serializer<String> STRING_SERIALIZER = new StringSerializer(); private static final Deserializer<String> STRING_DESERIALIZER = new StringDeserializer(); - protected static final String INPUT_TOPIC_1 = "input-topic-1"; - protected static final String INPUT_TOPIC_2 = "input-topic-2"; - protected static final String OUTPUT_TOPIC_1 = "output-topic-1"; - protected static final String OUTPUT_TOPIC_2 = "output-topic-2"; + private static final String INPUT_TOPIC_1 = "input-topic-1"; + private static final String INPUT_TOPIC_2 = "input-topic-2"; + private static final String OUTPUT_TOPIC_1 = "output-topic-1"; + private static final String OUTPUT_TOPIC_2 = "output-topic-2"; private static long timestamp = 1000L; + private final TopologyBuilder builder = new TopologyBuilder(); + private final MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier(); private ProcessorTopologyTestDriver driver; private StreamsConfig config; @@ -89,7 +93,7 @@ public class ProcessorTopologyTest { @Test public void testTopologyMetadata() { - final TopologyBuilder builder = new TopologyBuilder().setApplicationId("X"); + builder.setApplicationId("X"); builder.addSource("source-1", "topic-1"); builder.addSource("source-2", "topic-2", "topic-3"); @@ -206,7 +210,7 @@ public class ProcessorTopologyTest { final String global = "global"; final String topic = "topic"; final KeyValueStore<String, String> globalStore = (KeyValueStore<String, String>) storeSupplier.get(); - final TopologyBuilder topologyBuilder = new TopologyBuilder() + final TopologyBuilder topologyBuilder = this.builder .addGlobalStore(globalStore, global, STRING_DESERIALIZER, STRING_DESERIALIZER, topic, "processor", define(new StatefulProcessor("my-store"))); driver = new ProcessorTopologyTestDriver(config, topologyBuilder, "my-store"); @@ -230,9 +234,51 @@ public class 
ProcessorTopologyTest { assertNoOutputRecord(OUTPUT_TOPIC_1); } + @Test + public void shouldCreateStringWithSourceAndTopics() throws Exception { + builder.addSource("source", "topic1", "topic2"); + final ProcessorTopology topology = builder.build(null); + final String result = topology.toString(); + assertThat(result, containsString("source:\n\t\ttopics:\t\t[topic1, topic2]\n")); + } + + @Test + public void shouldCreateStringWithMultipleSourcesAndTopics() throws Exception { + builder.addSource("source", "topic1", "topic2"); + builder.addSource("source2", "t", "t1", "t2"); + final ProcessorTopology topology = builder.build(null); + final String result = topology.toString(); + assertThat(result, containsString("source:\n\t\ttopics:\t\t[topic1, topic2]\n")); + assertThat(result, containsString("source2:\n\t\ttopics:\t\t[t, t1, t2]\n")); + } + @Test + public void shouldCreateStringWithProcessors() throws Exception { + builder.addSource("source", "t") + .addProcessor("processor", mockProcessorSupplier, "source") + .addProcessor("other", mockProcessorSupplier, "source"); + final ProcessorTopology topology = builder.build(null); + final String result = topology.toString(); + assertThat(result, containsString("\t\tchildren:\t[processor, other]")); + assertThat(result, containsString("processor:\n")); + assertThat(result, containsString("other:\n")); + } + + @Test + public void shouldRecursivelyPrintChildren() throws Exception { + builder.addSource("source", "t") + .addProcessor("processor", mockProcessorSupplier, "source") + .addProcessor("child-one", mockProcessorSupplier, "processor") + .addProcessor("child-one-one", mockProcessorSupplier, "child-one") + .addProcessor("child-two", mockProcessorSupplier, "processor") + .addProcessor("child-two-one", mockProcessorSupplier, "child-two"); + + final String result = builder.build(null).toString(); + assertThat(result, containsString("child-one:\n\t\tchildren:\t[child-one-one]")); + assertThat(result, 
containsString("child-two:\n\t\tchildren:\t[child-two-one]")); + } - protected void assertNextOutputRecord(String topic, String key, String value) { + private void assertNextOutputRecord(String topic, String key, String value) { ProducerRecord<String, String> record = driver.readOutput(topic, STRING_DESERIALIZER, STRING_DESERIALIZER); assertEquals(topic, record.topic()); assertEquals(key, record.key()); @@ -240,7 +286,7 @@ public class ProcessorTopologyTest { assertNull(record.partition()); } - protected void assertNextOutputRecord(String topic, String key, String value, Integer partition) { + private void assertNextOutputRecord(String topic, String key, String value, Integer partition) { ProducerRecord<String, String> record = driver.readOutput(topic, STRING_DESERIALIZER, STRING_DESERIALIZER); assertEquals(topic, record.topic()); assertEquals(key, record.key()); @@ -248,11 +294,11 @@ public class ProcessorTopologyTest { assertEquals(partition, record.partition()); } - protected void assertNoOutputRecord(String topic) { + private void assertNoOutputRecord(String topic) { assertNull(driver.readOutput(topic)); } - protected StreamPartitioner<Object, Object> constantPartitioner(final Integer partition) { + private StreamPartitioner<Object, Object> constantPartitioner(final Integer partition) { return new StreamPartitioner<Object, Object>() { @Override public Integer partition(Object key, Object value, int numPartitions) { @@ -261,28 +307,28 @@ public class ProcessorTopologyTest { }; } - protected TopologyBuilder createSimpleTopology(int partition) { - return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1) + private TopologyBuilder createSimpleTopology(int partition) { + return builder.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1) .addProcessor("processor", define(new ForwardingProcessor()), "source") .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor"); } - 
protected TopologyBuilder createMultiplexingTopology() { - return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1) + private TopologyBuilder createMultiplexingTopology() { + return builder.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1) .addProcessor("processor", define(new MultiplexingProcessor(2)), "source") .addSink("sink1", OUTPUT_TOPIC_1, "processor") .addSink("sink2", OUTPUT_TOPIC_2, "processor"); } - protected TopologyBuilder createMultiplexByNameTopology() { - return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1) + private TopologyBuilder createMultiplexByNameTopology() { + return builder.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1) .addProcessor("processor", define(new MultiplexByNameProcessor(2)), "source") .addSink("sink0", OUTPUT_TOPIC_1, "processor") .addSink("sink1", OUTPUT_TOPIC_2, "processor"); } - protected TopologyBuilder createStatefulTopology(String storeName) { - return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1) + private TopologyBuilder createStatefulTopology(String storeName) { + return builder.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1) .addProcessor("processor", define(new StatefulProcessor(storeName)), "source") .addStateStore( Stores.create(storeName).withStringKeys().withStringValues().inMemory().build(), @@ -292,8 +338,8 @@ public class ProcessorTopologyTest { } - protected TopologyBuilder createSimpleMultiSourceTopology(int partition) { - return new TopologyBuilder().addSource("source-1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1) + private TopologyBuilder createSimpleMultiSourceTopology(int partition) { + return builder.addSource("source-1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1) .addProcessor("processor-1", define(new ForwardingProcessor()), "source-1") 
.addSink("sink-1", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor-1") .addSource("source-2", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_2) @@ -414,7 +460,7 @@ public class ProcessorTopologyTest { } } - protected <K, V> ProcessorSupplier<K, V> define(final Processor<K, V> processor) { + private <K, V> ProcessorSupplier<K, V> define(final Processor<K, V> processor) { return new ProcessorSupplier<K, V>() { @Override public Processor<K, V> get() {