Repository: kafka Updated Branches: refs/heads/trunk 9ffa907d7 -> 22de0a8ab
MINOR: join test for windowed keys guozhangwang Author: Yasuhiro Matsuda <[email protected]> Reviewers: Guozhang Wang Closes #814 from ymatsuda/windowed_key_join_test Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/22de0a8a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/22de0a8a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/22de0a8a Branch: refs/heads/trunk Commit: 22de0a8ab5d0e84fa40016754f9a8eff8193aa89 Parents: 9ffa907 Author: Yasuhiro Matsuda <[email protected]> Authored: Tue Jan 26 23:03:42 2016 -0800 Committer: Guozhang Wang <[email protected]> Committed: Tue Jan 26 23:03:42 2016 -0800 ---------------------------------------------------------------------- .../kstream/internals/KStreamAggregate.java | 4 +- .../kstream/internals/KStreamAggregateTest.java | 155 +++++++++++++++++++ 2 files changed, 157 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/22de0a8a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java index 26002f5..49f3e71 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamAggregate.java @@ -161,10 +161,10 @@ public class KStreamAggregate<K, V, T, W extends Window> implements KTableProces K key = windowedKey.value(); W window = (W) windowedKey.window(); - // this iterator should only contain one element + // this iterator should contain at most one element Iterator<KeyValue<Long, T>> iter = windowStore.fetch(key, window.start(), window.start()); - return iter.next().value; + return iter.hasNext() ? iter.next().value : null; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/22de0a8a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java index ecc303d..8a81113 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamAggregateTest.java @@ -27,6 +27,7 @@ import org.apache.kafka.streams.kstream.HoppingWindows; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockProcessorSupplier; @@ -137,4 +138,158 @@ public class KStreamAggregateTest { Utils.delete(baseDir); } } + + @Test + public void testJoin() throws Exception { + final File baseDir = Files.createTempDirectory("test").toFile(); + + try { + final KStreamBuilder builder = new KStreamBuilder(); + String topic1 = "topic1"; + String topic2 = "topic2"; + + KStream<String, String> stream1 = builder.stream(strDeserializer, strDeserializer, topic1); + KTable<Windowed<String>, String> table1 = stream1.aggregateByKey(new StringCanonizer(), + HoppingWindows.of("topic1-Canonized").with(10L).every(5L), + strSerializer, + strSerializer, + strDeserializer, + strDeserializer); + + MockProcessorSupplier<Windowed<String>, String> proc1 = new MockProcessorSupplier<>(); + table1.toStream().process(proc1); + + KStream<String, String> stream2 = builder.stream(strDeserializer, strDeserializer, topic2); + KTable<Windowed<String>, String> table2 = stream2.aggregateByKey(new StringCanonizer(), + HoppingWindows.of("topic2-Canonized").with(10L).every(5L), + strSerializer, + strSerializer, + strDeserializer, + strDeserializer); + + MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>(); + table2.toStream().process(proc2); + + + MockProcessorSupplier<Windowed<String>, String> proc3 = new MockProcessorSupplier<>(); + table1.join(table2, new ValueJoiner<String, String, String>() { + @Override + public String apply(String p1, String p2) { + return p1 + "%" + p2; + } + }).toStream().process(proc3); + + KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir); + + driver.setTime(0L); + driver.process(topic1, "A", "1"); + driver.setTime(1L); + driver.process(topic1, "B", "2"); + driver.setTime(2L); + driver.process(topic1, "C", "3"); + driver.setTime(3L); + driver.process(topic1, "D", "4"); + driver.setTime(4L); + driver.process(topic1, "A", "1"); + + proc1.checkAndClearResult( + "[A@0]:0+1", + "[B@0]:0+2", + "[C@0]:0+3", + "[D@0]:0+4", + "[A@0]:0+1+1" + ); + proc2.checkAndClearResult(); + proc3.checkAndClearResult( + "[A@0]:null", + "[B@0]:null", + "[C@0]:null", + "[D@0]:null", + "[A@0]:null" + ); + + driver.setTime(5L); + driver.process(topic1, "A", "1"); + driver.setTime(6L); + driver.process(topic1, "B", "2"); + driver.setTime(7L); + driver.process(topic1, "D", "4"); + driver.setTime(8L); + driver.process(topic1, "B", "2"); + driver.setTime(9L); + driver.process(topic1, "C", "3"); + + proc1.checkAndClearResult( + "[A@0]:0+1+1+1", "[A@5]:0+1", + "[B@0]:0+2+2", "[B@5]:0+2", + "[D@0]:0+4+4", "[D@5]:0+4", + "[B@0]:0+2+2+2", "[B@5]:0+2+2", + "[C@0]:0+3+3", "[C@5]:0+3" + ); + proc2.checkAndClearResult(); + proc3.checkAndClearResult( + "[A@0]:null", "[A@5]:null", + "[B@0]:null", "[B@5]:null", + "[D@0]:null", "[D@5]:null", + "[B@0]:null", "[B@5]:null", + "[C@0]:null", "[C@5]:null" + ); + + driver.setTime(0L); + driver.process(topic2, "A", "a"); + driver.setTime(1L); + driver.process(topic2, "B", "b"); + driver.setTime(2L); + driver.process(topic2, "C", "c"); + driver.setTime(3L); + driver.process(topic2, "D", "d"); + driver.setTime(4L); + driver.process(topic2, "A", "a"); + + proc1.checkAndClearResult(); + proc2.checkAndClearResult( + "[A@0]:0+a", + "[B@0]:0+b", + "[C@0]:0+c", + "[D@0]:0+d", + "[A@0]:0+a+a" + ); + proc3.checkAndClearResult( + "[A@0]:0+1+1+1%0+a", + "[B@0]:0+2+2+2%0+b", + "[C@0]:0+3+3%0+c", + "[D@0]:0+4+4%0+d", + "[A@0]:0+1+1+1%0+a+a"); + + driver.setTime(5L); + driver.process(topic2, "A", "a"); + driver.setTime(6L); + driver.process(topic2, "B", "b"); + driver.setTime(7L); + driver.process(topic2, "D", "d"); + driver.setTime(8L); + driver.process(topic2, "B", "b"); + driver.setTime(9L); + driver.process(topic2, "C", "c"); + + proc1.checkAndClearResult(); + proc2.checkAndClearResult( + "[A@0]:0+a+a+a", "[A@5]:0+a", + "[B@0]:0+b+b", "[B@5]:0+b", + "[D@0]:0+d+d", "[D@5]:0+d", + "[B@0]:0+b+b+b", "[B@5]:0+b+b", + "[C@0]:0+c+c", "[C@5]:0+c" + ); + proc3.checkAndClearResult( + "[A@0]:0+1+1+1%0+a+a+a", "[A@5]:0+1%0+a", + "[B@0]:0+2+2+2%0+b+b", "[B@5]:0+2+2%0+b", + "[D@0]:0+4+4%0+d+d", "[D@5]:0+4%0+d", + "[B@0]:0+2+2+2%0+b+b+b", "[B@5]:0+2+2%0+b+b", + "[C@0]:0+3+3%0+c+c", "[C@5]:0+3%0+c" + ); + + } finally { + Utils.delete(baseDir); + } + } }
