Repository: kafka Updated Branches: refs/heads/trunk 99d232922 -> 8dbd688b1
KAFKA-3497: Streams ProcessorContext should support forward() based on child name Author: Eno Thereska <[email protected]> Reviewers: Yuto Kawamura, Michael G. Noll, Guozhang Wang Closes #1194 from enothereska/kafka-3497-forward Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8dbd688b Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8dbd688b Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8dbd688b Branch: refs/heads/trunk Commit: 8dbd688b1617968329087317fa6bde8b8df0392e Parents: 99d2329 Author: Eno Thereska <[email protected]> Authored: Thu Apr 7 10:20:17 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Thu Apr 7 10:20:17 2016 -0700 ---------------------------------------------------------------------- .../streams/processor/ProcessorContext.java | 9 ++++ .../internals/ProcessorContextImpl.java | 5 ++ .../processor/internals/StandbyContextImpl.java | 5 ++ .../streams/processor/internals/StreamTask.java | 16 ++++++ .../internals/ProcessorTopologyTest.java | 56 ++++++++++++++++++++ .../apache/kafka/test/KStreamTestDriver.java | 16 ++++++ .../apache/kafka/test/MockProcessorContext.java | 6 +++ 7 files changed, 113 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/8dbd688b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java index 434996e..8bac3e3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java @@ -106,10 +106,19 @@ public interface ProcessorContext { * Forwards a key/value pair to one of the downstream processors designated by childIndex * @param key key * @param value value + * @param childIndex index in list of children of this node */ <K, V> void forward(K key, V value, int childIndex); /** + * Forwards a key/value pair to one of the downstream processors designated by the downstream processor name + * @param key key + * @param value value + * @param childName name of downstream processor + */ + <K, V> void forward(K key, V value, String childName); + + /** * Requests a commit */ void commit(); http://git-wip-us.apache.org/repos/asf/kafka/blob/8dbd688b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index 888b89e..5bda856 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -168,6 +168,11 @@ public class ProcessorContextImpl implements ProcessorContext, RecordCollector.S } @Override + public <K, V> void forward(K key, V value, String childName) { + task.forward(key, value, childName); + } + + @Override public void commit() { task.needCommit(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/8dbd688b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java index 3ad06e2..d5a9683 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyContextImpl.java @@ -142,6 +142,11 @@ public class StandbyContextImpl implements ProcessorContext, RecordCollector.Sup } @Override + public <K, V> void forward(K key, V value, String childName) { + throw new UnsupportedOperationException(); + } + + @Override public void commit() { throw new UnsupportedOperationException(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/8dbd688b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index 61aeced..a484980 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -342,4 +342,20 @@ public class StreamTask extends AbstractTask implements Punctuator { } } + @SuppressWarnings("unchecked") + public <K, V> void forward(K key, V value, String childName) { + ProcessorNode thisNode = currNode; + for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) { + if (childNode.name().equals(childName)) { + currNode = childNode; + try { + childNode.process(key, value); + } finally { + currNode = thisNode; + } + break; + } + } + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/8dbd688b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java index ef08176..1095fcf 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java @@ -157,6 +157,28 @@ public class ProcessorTopologyTest { } @Test + public void testDrivingMultiplexByNameTopology() { + driver = new ProcessorTopologyTestDriver(config, createMultiplexByNameTopology()); + driver.process(INPUT_TOPIC, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER); + assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)"); + assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1(2)"); + + driver.process(INPUT_TOPIC, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER); + assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2(1)"); + assertNextOutputRecord(OUTPUT_TOPIC_2, "key2", "value2(2)"); + + driver.process(INPUT_TOPIC, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER); + driver.process(INPUT_TOPIC, "key4", "value4", STRING_SERIALIZER, STRING_SERIALIZER); + driver.process(INPUT_TOPIC, "key5", "value5", STRING_SERIALIZER, STRING_SERIALIZER); + assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3(1)"); + assertNextOutputRecord(OUTPUT_TOPIC_1, "key4", "value4(1)"); + assertNextOutputRecord(OUTPUT_TOPIC_1, "key5", "value5(1)"); + assertNextOutputRecord(OUTPUT_TOPIC_2, "key3", "value3(2)"); + assertNextOutputRecord(OUTPUT_TOPIC_2, "key4", "value4(2)"); + assertNextOutputRecord(OUTPUT_TOPIC_2, "key5", "value5(2)"); + } + + @Test public void testDrivingStatefulTopology() { String storeName = "entries"; driver = new ProcessorTopologyTestDriver(config, createStatefulTopology(storeName), storeName); @@ -215,6 +237,13 @@ public class ProcessorTopologyTest { .addSink("sink2", OUTPUT_TOPIC_2, "processor"); } + protected TopologyBuilder createMultiplexByNameTopology() { + return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC) + .addProcessor("processor", define(new MultiplexByNameProcessor(2)), "source") + .addSink("sink0", OUTPUT_TOPIC_1, "processor") + .addSink("sink1", OUTPUT_TOPIC_2, "processor"); + } + protected TopologyBuilder createStatefulTopology(String storeName) { return new TopologyBuilder().addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC) .addProcessor("processor", define(new StatefulProcessor(storeName)), "source") @@ -268,6 +297,33 @@ public class ProcessorTopologyTest { } /** + * A processor that forwards slightly-modified messages to each named child. + * Note: the children are assumed to be named "sink{child number}", e.g., sink1, or sink2, etc. + */ + protected static class MultiplexByNameProcessor extends AbstractProcessor<String, String> { + + private final int numChildren; + + public MultiplexByNameProcessor(int numChildren) { + this.numChildren = numChildren; + } + + @Override + public void process(String key, String value) { + for (int i = 0; i != numChildren; ++i) { + context().forward(key, value + "(" + (i + 1) + ")", "sink" + i); + } + } + + @Override + public void punctuate(long streamTime) { + for (int i = 0; i != numChildren; ++i) { + context().forward(Long.toString(streamTime), "punctuate(" + (i + 1) + ")", "sink" + i); + } + } + } + + /** * A processor that stores each key-value pair in an in-memory key-value store registered with the context. When * {@link #punctuate(long)} is called, it outputs the total number of entries in the store. */ http://git-wip-us.apache.org/repos/asf/kafka/blob/8dbd688b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java index 0c56c26..5cfee6b 100644 --- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java @@ -120,6 +120,22 @@ public class KStreamTestDriver { } } + @SuppressWarnings("unchecked") + public <K, V> void forward(K key, V value, String childName) { + ProcessorNode thisNode = currNode; + for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) { + if (childNode.name().equals(childName)) { + currNode = childNode; + try { + childNode.process(key, value); + } finally { + currNode = thisNode; + } + break; + } + } + } + public Map<String, StateStore> allStateStores() { return context.allStateStores(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/8dbd688b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java index e57e1c7..d3b8081 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java @@ -159,6 +159,12 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S } @Override + @SuppressWarnings("unchecked") + public <K, V> void forward(K key, V value, String childName) { + driver.forward(key, value, childName); + } + + @Override public void commit() { throw new UnsupportedOperationException("commit() not supported."); }
