Repository: kafka
Updated Branches:
  refs/heads/trunk d4c379832 -> 190239441


MINOR: Fix ProcessorTopologyTestDriver to support multiple source topics

There's a minor bug in ProcessorTopologyTestDriver that prevents it from 
working with a topology that contains multiple sources.  The bug is that 
`consumer.assign()` is called once per source topic inside the loop, but 
each call to `consumer.assign()` replaces the MockConsumer's previous 
assignment, so the consumer ends up assigned only to the partition passed in 
the last call.  This patch fixes the issue by calling `consumer.assign()` 
once, after the loop, with all the TopicPartition instances.  A unit test 
(testDrivingSimpleMultiSourceTopology) is included.

This contribution is my original work and I license the work to the project 
under the project's open source license.

Author: Mathieu Fenniak <[email protected]>

Reviewers: Guozhang Wang <[email protected]>

Closes #1782 from mfenniak/ProcessorTopologyTestDriver-multiple-source-bugfix


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/19023944
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/19023944
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/19023944

Branch: refs/heads/trunk
Commit: 190239441878d73ad0c50d59c73bcc8717a12ed6
Parents: d4c3798
Author: Mathieu Fenniak <[email protected]>
Authored: Fri Aug 26 12:40:18 2016 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Fri Aug 26 12:40:18 2016 -0700

----------------------------------------------------------------------
 .../internals/ProcessorTopologyTest.java        | 72 +++++++++++++-------
 .../kafka/test/ProcessorTopologyTestDriver.java |  2 +-
 2 files changed, 49 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/19023944/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index f7ef7f7..09434c3 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -54,7 +54,8 @@ public class ProcessorTopologyTest {
     private static final Serializer<String> STRING_SERIALIZER = new 
StringSerializer();
     private static final Deserializer<String> STRING_DESERIALIZER = new 
StringDeserializer();
 
-    protected static final String INPUT_TOPIC = "input-topic";
+    protected static final String INPUT_TOPIC_1 = "input-topic-1";
+    protected static final String INPUT_TOPIC_2 = "input-topic-2";
     protected static final String OUTPUT_TOPIC_1 = "output-topic-1";
     protected static final String OUTPUT_TOPIC_2 = "output-topic-2";
 
@@ -117,17 +118,17 @@ public class ProcessorTopologyTest {
     public void testDrivingSimpleTopology() {
         int partition = 10;
         driver = new ProcessorTopologyTestDriver(config, 
createSimpleTopology(partition));
-        driver.process(INPUT_TOPIC, "key1", "value1", STRING_SERIALIZER, 
STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, 
STRING_SERIALIZER);
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition);
         assertNoOutputRecord(OUTPUT_TOPIC_2);
 
-        driver.process(INPUT_TOPIC, "key2", "value2", STRING_SERIALIZER, 
STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, 
STRING_SERIALIZER);
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2", partition);
         assertNoOutputRecord(OUTPUT_TOPIC_2);
 
-        driver.process(INPUT_TOPIC, "key3", "value3", STRING_SERIALIZER, 
STRING_SERIALIZER);
-        driver.process(INPUT_TOPIC, "key4", "value4", STRING_SERIALIZER, 
STRING_SERIALIZER);
-        driver.process(INPUT_TOPIC, "key5", "value5", STRING_SERIALIZER, 
STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, 
STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key4", "value4", STRING_SERIALIZER, 
STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key5", "value5", STRING_SERIALIZER, 
STRING_SERIALIZER);
         assertNoOutputRecord(OUTPUT_TOPIC_2);
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3", partition);
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key4", "value4", partition);
@@ -137,17 +138,17 @@ public class ProcessorTopologyTest {
     @Test
     public void testDrivingMultiplexingTopology() {
         driver = new ProcessorTopologyTestDriver(config, 
createMultiplexingTopology());
-        driver.process(INPUT_TOPIC, "key1", "value1", STRING_SERIALIZER, 
STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, 
STRING_SERIALIZER);
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)");
         assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1(2)");
 
-        driver.process(INPUT_TOPIC, "key2", "value2", STRING_SERIALIZER, 
STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, 
STRING_SERIALIZER);
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2(1)");
         assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2(2)");
 
-        driver.process(INPUT_TOPIC, "key3", "value3", STRING_SERIALIZER, 
STRING_SERIALIZER);
-        driver.process(INPUT_TOPIC, "key4", "value4", STRING_SERIALIZER, 
STRING_SERIALIZER);
-        driver.process(INPUT_TOPIC, "key5", "value5", STRING_SERIALIZER, 
STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, 
STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key4", "value4", STRING_SERIALIZER, 
STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key5", "value5", STRING_SERIALIZER, 
STRING_SERIALIZER);
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3(1)");
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key4", "value4(1)");
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key5", "value5(1)");
@@ -159,17 +160,17 @@ public class ProcessorTopologyTest {
     @Test
     public void testDrivingMultiplexByNameTopology() {
         driver = new ProcessorTopologyTestDriver(config, 
createMultiplexByNameTopology());
-        driver.process(INPUT_TOPIC, "key1", "value1", STRING_SERIALIZER, 
STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, 
STRING_SERIALIZER);
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)");
         assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1(2)");
 
-        driver.process(INPUT_TOPIC, "key2", "value2", STRING_SERIALIZER, 
STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, 
STRING_SERIALIZER);
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2(1)");
         assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2(2)");
 
-        driver.process(INPUT_TOPIC, "key3", "value3", STRING_SERIALIZER, 
STRING_SERIALIZER);
-        driver.process(INPUT_TOPIC, "key4", "value4", STRING_SERIALIZER, 
STRING_SERIALIZER);
-        driver.process(INPUT_TOPIC, "key5", "value5", STRING_SERIALIZER, 
STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, 
STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key4", "value4", STRING_SERIALIZER, 
STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key5", "value5", STRING_SERIALIZER, 
STRING_SERIALIZER);
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3(1)");
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key4", "value4(1)");
         assertNextOutputRecord(OUTPUT_TOPIC_1, "key5", "value5(1)");
@@ -182,10 +183,10 @@ public class ProcessorTopologyTest {
     public void testDrivingStatefulTopology() {
         String storeName = "entries";
         driver = new ProcessorTopologyTestDriver(config, 
createStatefulTopology(storeName), storeName);
-        driver.process(INPUT_TOPIC, "key1", "value1", STRING_SERIALIZER, 
STRING_SERIALIZER);
-        driver.process(INPUT_TOPIC, "key2", "value2", STRING_SERIALIZER, 
STRING_SERIALIZER);
-        driver.process(INPUT_TOPIC, "key3", "value3", STRING_SERIALIZER, 
STRING_SERIALIZER);
-        driver.process(INPUT_TOPIC, "key1", "value4", STRING_SERIALIZER, 
STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, 
STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, 
STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, 
STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key1", "value4", STRING_SERIALIZER, 
STRING_SERIALIZER);
         assertNoOutputRecord(OUTPUT_TOPIC_1);
 
         KeyValueStore<String, String> store = 
driver.getKeyValueStore("entries");
@@ -195,6 +196,20 @@ public class ProcessorTopologyTest {
         assertNull(store.get("key4"));
     }
 
+    @Test
+    public void testDrivingSimpleMultiSourceTopology() {
+        int partition = 10;
+        driver = new ProcessorTopologyTestDriver(config, 
createSimpleMultiSourceTopology(partition));
+
+        driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, 
STRING_SERIALIZER);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition);
+        assertNoOutputRecord(OUTPUT_TOPIC_2);
+
+        driver.process(INPUT_TOPIC_2, "key2", "value2", STRING_SERIALIZER, 
STRING_SERIALIZER);
+        assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2", partition);
+        assertNoOutputRecord(OUTPUT_TOPIC_1);
+    }
+
     protected void assertNextOutputRecord(String topic, String key, String 
value) {
         ProducerRecord<String, String> record = driver.readOutput(topic, 
STRING_DESERIALIZER, STRING_DESERIALIZER);
         assertEquals(topic, record.topic());
@@ -225,27 +240,27 @@ public class ProcessorTopologyTest {
     }
 
     protected TopologyBuilder createSimpleTopology(int partition) {
-        return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, 
STRING_DESERIALIZER, INPUT_TOPIC)
+        return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, 
STRING_DESERIALIZER, INPUT_TOPIC_1)
                                     .addProcessor("processor", define(new 
ForwardingProcessor()), "source")
                                     .addSink("sink", OUTPUT_TOPIC_1, 
constantPartitioner(partition), "processor");
     }
 
     protected TopologyBuilder createMultiplexingTopology() {
-        return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, 
STRING_DESERIALIZER, INPUT_TOPIC)
+        return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, 
STRING_DESERIALIZER, INPUT_TOPIC_1)
                                     .addProcessor("processor", define(new 
MultiplexingProcessor(2)), "source")
                                     .addSink("sink1", OUTPUT_TOPIC_1, 
"processor")
                                     .addSink("sink2", OUTPUT_TOPIC_2, 
"processor");
     }
 
     protected TopologyBuilder createMultiplexByNameTopology() {
-        return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, 
STRING_DESERIALIZER, INPUT_TOPIC)
+        return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, 
STRING_DESERIALIZER, INPUT_TOPIC_1)
             .addProcessor("processor", define(new 
MultiplexByNameProcessor(2)), "source")
             .addSink("sink0", OUTPUT_TOPIC_1, "processor")
             .addSink("sink1", OUTPUT_TOPIC_2, "processor");
     }
 
     protected TopologyBuilder createStatefulTopology(String storeName) {
-        return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, 
STRING_DESERIALIZER, INPUT_TOPIC)
+        return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, 
STRING_DESERIALIZER, INPUT_TOPIC_1)
                                     .addProcessor("processor", define(new 
StatefulProcessor(storeName)), "source")
                                     .addStateStore(
                                             
Stores.create(storeName).withStringKeys().withStringValues().inMemory().build(),
@@ -254,6 +269,15 @@ public class ProcessorTopologyTest {
                                     .addSink("counts", OUTPUT_TOPIC_1, 
"processor");
     }
 
+    protected TopologyBuilder createSimpleMultiSourceTopology(int partition) {
+        return new TopologyBuilder().addSource("source-1", 
STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1)
+                .addProcessor("processor-1", define(new 
ForwardingProcessor()), "source-1")
+                .addSink("sink-1", OUTPUT_TOPIC_1, 
constantPartitioner(partition), "processor-1")
+                .addSource("source-2", STRING_DESERIALIZER, 
STRING_DESERIALIZER, INPUT_TOPIC_2)
+                .addProcessor("processor-2", define(new 
ForwardingProcessor()), "source-2")
+                .addSink("sink-2", OUTPUT_TOPIC_2, 
constantPartitioner(partition), "processor-2");
+    }
+
     /**
      * A processor that simply forwards all messages to all children.
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/19023944/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java 
b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 6b8d969..8d2ad08 100644
--- 
a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ 
b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -163,10 +163,10 @@ public class ProcessorTopologyTestDriver {
         // Set up all of the topic+partition information and subscribe the 
consumer to each ...
         for (String topic : topology.sourceTopics()) {
             TopicPartition tp = new TopicPartition(topic, 1);
-            consumer.assign(Collections.singletonList(tp));
             partitionsByTopic.put(topic, tp);
             offsetsByTopicPartition.put(tp, new AtomicLong());
         }
+        consumer.assign(offsetsByTopicPartition.keySet());
 
         task = new StreamTask(id,
             applicationId,

Reply via email to