Repository: kafka
Updated Branches:
  refs/heads/trunk 7de22453b -> 1f8a2ad2e


KAFKA-4461: Added support to ProcessorTopologyTestDriver for internal topics

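This fixes tests driven by the ProcessorTopologyTestDriver for topologies in
which `groupBy()` is invoked downstream of a processor that flags repartitioning:
records the topology writes to an internal (repartition) topic are now fed back
into the topology as input, in addition to being captured as output.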

Ticket: https://issues.apache.org/jira/browse/KAFKA-4461
Discussion: http://search-hadoop.com/m/Kafka/uyzND1wbKeY1Q8nH1

dguy guozhangwang

The contribution is my original work and I license the work to the project 
under the project's open source license.

Author: Adrian McCague <amcca...@gmail.com>

Reviewers: Damian Guy, Guozhang Wang

Closes #2499 from amccague/KAFKA-4461_ProcessorTopologyTestDriver_map_groupbykey


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1f8a2ad2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1f8a2ad2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1f8a2ad2

Branch: refs/heads/trunk
Commit: 1f8a2ad2edc5d9de4dc3a3311a0650f25d2b9114
Parents: 7de2245
Author: Adrian McCague <amcca...@gmail.com>
Authored: Mon Feb 6 11:37:48 2017 -0800
Committer: Guozhang Wang <wangg...@gmail.com>
Committed: Mon Feb 6 11:37:48 2017 -0800

----------------------------------------------------------------------
 .../internals/ProcessorTopologyTest.java         | 19 +++++++++++++++++++
 .../kafka/test/ProcessorTopologyTestDriver.java  | 15 +++++++++++++--
 2 files changed, 32 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1f8a2ad2/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index f35a2b5..a0b2b8e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -61,6 +61,7 @@ public class ProcessorTopologyTest {
     private static final String INPUT_TOPIC_2 = "input-topic-2";
     private static final String OUTPUT_TOPIC_1 = "output-topic-1";
     private static final String OUTPUT_TOPIC_2 = "output-topic-2";
+    private static final String THROUGH_TOPIC_1 = "through-topic-1";
 
     private static long timestamp = 1000L;
     private final TopologyBuilder builder = new TopologyBuilder();
@@ -235,6 +236,17 @@ public class ProcessorTopologyTest {
     }
 
     @Test
+    public void testDrivingInternalRepartitioningTopology() {
+        driver = new ProcessorTopologyTestDriver(config, 
createInternalRepartitioningTopology());
+        driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, 
STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, 
STRING_SERIALIZER);
+        driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, 
STRING_SERIALIZER);
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1");
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2");
+        assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3");
+    }
+
+    @Test
     public void shouldCreateStringWithSourceAndTopics() throws Exception {
         builder.addSource("source", "topic1", "topic2");
         final ProcessorTopology topology = builder.build(null);
@@ -337,6 +349,13 @@ public class ProcessorTopologyTest {
                                     .addSink("counts", OUTPUT_TOPIC_1, 
"processor");
     }
 
+    private TopologyBuilder createInternalRepartitioningTopology() {
+        return builder.addSource("source", INPUT_TOPIC_1)
+            .addInternalTopic(THROUGH_TOPIC_1)
+            .addSink("sink0", THROUGH_TOPIC_1, "source")
+            .addSource("source1", THROUGH_TOPIC_1)
+            .addSink("sink1", OUTPUT_TOPIC_1, "source1");
+    }
 
     private TopologyBuilder createSimpleMultiSourceTopology(int partition) {
         return builder.addSource("source-1", STRING_DESERIALIZER, 
STRING_DESERIALIZER, INPUT_TOPIC_1)

http://git-wip-us.apache.org/repos/asf/kafka/blob/1f8a2ad2/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 277f5f5..b50ff34 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -20,10 +20,12 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.MockConsumer;
@@ -150,6 +152,7 @@ public class ProcessorTopologyTestDriver {
     private final Map<String, TopicPartition> partitionsByTopic = new 
HashMap<>();
     private final Map<TopicPartition, AtomicLong> offsetsByTopicPartition = 
new HashMap<>();
     private final Map<String, Queue<ProducerRecord<byte[], byte[]>>> 
outputRecordsByTopic = new HashMap<>();
+    private final Set<String> internalTopics = new HashSet<>();
     private final ProcessorTopology globalTopology;
     private final Map<String, TopicPartition> globalPartitionsByTopic = new 
HashMap<>();
     private StreamTask task;
@@ -176,6 +179,11 @@ public class ProcessorTopologyTestDriver {
         };
         restoreStateConsumer = createRestoreConsumer(id, storeNames);
 
+        // Identify internal topics for forwarding in process ...
+        for (TopologyBuilder.TopicsInfo topicsInfo : 
builder.topicGroups().values()) {
+            internalTopics.addAll(topicsInfo.repartitionSourceTopics.keySet());
+        }
+
         // Set up all of the topic+partition information and subscribe the 
consumer to each ...
         for (String topic : topology.sourceTopics()) {
             TopicPartition tp = new TopicPartition(topic, 1);
@@ -183,8 +191,6 @@ public class ProcessorTopologyTestDriver {
             offsetsByTopicPartition.put(tp, new AtomicLong());
         }
 
-
-
         consumer.assign(offsetsByTopicPartition.keySet());
 
         final StateDirectory stateDirectory = new 
StateDirectory(applicationId, TestUtils.tempDirectory().getPath(), Time.SYSTEM);
@@ -250,6 +256,11 @@ public class ProcessorTopologyTestDriver {
                     outputRecordsByTopic.put(record.topic(), outputRecords);
                 }
                 outputRecords.add(record);
+
+                // Forward back into the topology if the produced record is to 
an internal topic ...
+                if (internalTopics.contains(record.topic())) {
+                    process(record.topic(), record.key(), record.value());
+                }
             }
         } else {
             final TopicPartition global = 
globalPartitionsByTopic.get(topicName);

Reply via email to