Repository: kafka Updated Branches: refs/heads/trunk d4c379832 -> 190239441
MINOR: Fix ProcessorTopologyTestDriver to support multiple source topics There's a minor bug in ProcessorTopologyTestDriver that prevents it from working with a topology that contains multiple sources. The bug is that `consumer.assign()` is called while looping through all the source topics, but each call to `consumer.assign()` resets the state of the MockConsumer so that it consumes only from the topics passed in. This patch fixes the issue by calling `consumer.assign()` once with all the TopicPartition instances. Unit test (testDrivingSimpleMultiSourceTopology) included. This contribution is my original work and I license the work to the project under the project's open source license. Author: Mathieu Fenniak <[email protected]> Reviewers: Guozhang Wang <[email protected]> Closes #1782 from mfenniak/ProcessorTopologyTestDriver-multiple-source-bugfix Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/19023944 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/19023944 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/19023944 Branch: refs/heads/trunk Commit: 190239441878d73ad0c50d59c73bcc8717a12ed6 Parents: d4c3798 Author: Mathieu Fenniak <[email protected]> Authored: Fri Aug 26 12:40:18 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Fri Aug 26 12:40:18 2016 -0700 ---------------------------------------------------------------------- .../internals/ProcessorTopologyTest.java | 72 +++++++++++++------- .../kafka/test/ProcessorTopologyTestDriver.java | 2 +- 2 files changed, 49 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/19023944/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java ---------------------------------------------------------------------- diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java index f7ef7f7..09434c3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java @@ -54,7 +54,8 @@ public class ProcessorTopologyTest { private static final Serializer<String> STRING_SERIALIZER = new StringSerializer(); private static final Deserializer<String> STRING_DESERIALIZER = new StringDeserializer(); - protected static final String INPUT_TOPIC = "input-topic"; + protected static final String INPUT_TOPIC_1 = "input-topic-1"; + protected static final String INPUT_TOPIC_2 = "input-topic-2"; protected static final String OUTPUT_TOPIC_1 = "output-topic-1"; protected static final String OUTPUT_TOPIC_2 = "output-topic-2"; @@ -117,17 +118,17 @@ public class ProcessorTopologyTest { public void testDrivingSimpleTopology() { int partition = 10; driver = new ProcessorTopologyTestDriver(config, createSimpleTopology(partition)); - driver.process(INPUT_TOPIC, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER); + driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER); assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition); assertNoOutputRecord(OUTPUT_TOPIC_2); - driver.process(INPUT_TOPIC, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER); + driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER); assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2", partition); assertNoOutputRecord(OUTPUT_TOPIC_2); - driver.process(INPUT_TOPIC, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER); - driver.process(INPUT_TOPIC, "key4", "value4", STRING_SERIALIZER, STRING_SERIALIZER); - driver.process(INPUT_TOPIC, "key5", "value5", STRING_SERIALIZER, 
STRING_SERIALIZER); + driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER); + driver.process(INPUT_TOPIC_1, "key4", "value4", STRING_SERIALIZER, STRING_SERIALIZER); + driver.process(INPUT_TOPIC_1, "key5", "value5", STRING_SERIALIZER, STRING_SERIALIZER); assertNoOutputRecord(OUTPUT_TOPIC_2); assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3", partition); assertNextOutputRecord(OUTPUT_TOPIC_1, "key4", "value4", partition); @@ -137,17 +138,17 @@ public class ProcessorTopologyTest { @Test public void testDrivingMultiplexingTopology() { driver = new ProcessorTopologyTestDriver(config, createMultiplexingTopology()); - driver.process(INPUT_TOPIC, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER); + driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER); assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)"); assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1(2)"); - driver.process(INPUT_TOPIC, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER); + driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER); assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2(1)"); assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2(2)"); - driver.process(INPUT_TOPIC, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER); - driver.process(INPUT_TOPIC, "key4", "value4", STRING_SERIALIZER, STRING_SERIALIZER); - driver.process(INPUT_TOPIC, "key5", "value5", STRING_SERIALIZER, STRING_SERIALIZER); + driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER); + driver.process(INPUT_TOPIC_1, "key4", "value4", STRING_SERIALIZER, STRING_SERIALIZER); + driver.process(INPUT_TOPIC_1, "key5", "value5", STRING_SERIALIZER, STRING_SERIALIZER); assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3(1)"); assertNextOutputRecord(OUTPUT_TOPIC_1, "key4", "value4(1)"); assertNextOutputRecord(OUTPUT_TOPIC_1, "key5", "value5(1)"); @@ -159,17 +160,17 @@ public class 
ProcessorTopologyTest { @Test public void testDrivingMultiplexByNameTopology() { driver = new ProcessorTopologyTestDriver(config, createMultiplexByNameTopology()); - driver.process(INPUT_TOPIC, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER); + driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER); assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)"); assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1(2)"); - driver.process(INPUT_TOPIC, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER); + driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER); assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2(1)"); assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2(2)"); - driver.process(INPUT_TOPIC, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER); - driver.process(INPUT_TOPIC, "key4", "value4", STRING_SERIALIZER, STRING_SERIALIZER); - driver.process(INPUT_TOPIC, "key5", "value5", STRING_SERIALIZER, STRING_SERIALIZER); + driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER); + driver.process(INPUT_TOPIC_1, "key4", "value4", STRING_SERIALIZER, STRING_SERIALIZER); + driver.process(INPUT_TOPIC_1, "key5", "value5", STRING_SERIALIZER, STRING_SERIALIZER); assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3(1)"); assertNextOutputRecord(OUTPUT_TOPIC_1, "key4", "value4(1)"); assertNextOutputRecord(OUTPUT_TOPIC_1, "key5", "value5(1)"); @@ -182,10 +183,10 @@ public class ProcessorTopologyTest { public void testDrivingStatefulTopology() { String storeName = "entries"; driver = new ProcessorTopologyTestDriver(config, createStatefulTopology(storeName), storeName); - driver.process(INPUT_TOPIC, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER); - driver.process(INPUT_TOPIC, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER); - driver.process(INPUT_TOPIC, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER); - driver.process(INPUT_TOPIC, "key1", 
"value4", STRING_SERIALIZER, STRING_SERIALIZER); + driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER); + driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER); + driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER); + driver.process(INPUT_TOPIC_1, "key1", "value4", STRING_SERIALIZER, STRING_SERIALIZER); assertNoOutputRecord(OUTPUT_TOPIC_1); KeyValueStore<String, String> store = driver.getKeyValueStore("entries"); @@ -195,6 +196,20 @@ public class ProcessorTopologyTest { assertNull(store.get("key4")); } + @Test + public void testDrivingSimpleMultiSourceTopology() { + int partition = 10; + driver = new ProcessorTopologyTestDriver(config, createSimpleMultiSourceTopology(partition)); + + driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER); + assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition); + assertNoOutputRecord(OUTPUT_TOPIC_2); + + driver.process(INPUT_TOPIC_2, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER); + assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2", partition); + assertNoOutputRecord(OUTPUT_TOPIC_1); + } + protected void assertNextOutputRecord(String topic, String key, String value) { ProducerRecord<String, String> record = driver.readOutput(topic, STRING_DESERIALIZER, STRING_DESERIALIZER); assertEquals(topic, record.topic()); @@ -225,27 +240,27 @@ public class ProcessorTopologyTest { } protected TopologyBuilder createSimpleTopology(int partition) { - return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC) + return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1) .addProcessor("processor", define(new ForwardingProcessor()), "source") .addSink("sink", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor"); } protected TopologyBuilder createMultiplexingTopology() { - return new 
TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC) + return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1) .addProcessor("processor", define(new MultiplexingProcessor(2)), "source") .addSink("sink1", OUTPUT_TOPIC_1, "processor") .addSink("sink2", OUTPUT_TOPIC_2, "processor"); } protected TopologyBuilder createMultiplexByNameTopology() { - return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC) + return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1) .addProcessor("processor", define(new MultiplexByNameProcessor(2)), "source") .addSink("sink0", OUTPUT_TOPIC_1, "processor") .addSink("sink1", OUTPUT_TOPIC_2, "processor"); } protected TopologyBuilder createStatefulTopology(String storeName) { - return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC) + return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1) .addProcessor("processor", define(new StatefulProcessor(storeName)), "source") .addStateStore( Stores.create(storeName).withStringKeys().withStringValues().inMemory().build(), @@ -254,6 +269,15 @@ public class ProcessorTopologyTest { .addSink("counts", OUTPUT_TOPIC_1, "processor"); } + protected TopologyBuilder createSimpleMultiSourceTopology(int partition) { + return new TopologyBuilder().addSource("source-1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1) + .addProcessor("processor-1", define(new ForwardingProcessor()), "source-1") + .addSink("sink-1", OUTPUT_TOPIC_1, constantPartitioner(partition), "processor-1") + .addSource("source-2", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_2) + .addProcessor("processor-2", define(new ForwardingProcessor()), "source-2") + .addSink("sink-2", OUTPUT_TOPIC_2, constantPartitioner(partition), "processor-2"); + } + /** * A 
processor that simply forwards all messages to all children. */ http://git-wip-us.apache.org/repos/asf/kafka/blob/19023944/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java index 6b8d969..8d2ad08 100644 --- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java @@ -163,10 +163,10 @@ public class ProcessorTopologyTestDriver { // Set up all of the topic+partition information and subscribe the consumer to each ... for (String topic : topology.sourceTopics()) { TopicPartition tp = new TopicPartition(topic, 1); - consumer.assign(Collections.singletonList(tp)); partitionsByTopic.put(topic, tp); offsetsByTopicPartition.put(tp, new AtomicLong()); } + consumer.assign(offsetsByTopicPartition.keySet()); task = new StreamTask(id, applicationId,
