Repository: kafka Updated Branches: refs/heads/trunk 24067e407 -> 62c0972ef
http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java index 80512d9..58090fd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -72,10 +72,10 @@ public class KStreamKStreamLeftJoinTest { final int[] expectedKeys = new int[]{0, 1, 2, 3}; - KStream<Integer, String> stream1; - KStream<Integer, String> stream2; - KStream<Integer, String> joined; - MockProcessorSupplier<Integer, String> processor; + final KStream<Integer, String> stream1; + final KStream<Integer, String> stream2; + final KStream<Integer, String> joined; + final MockProcessorSupplier<Integer, String> processor; processor = new MockProcessorSupplier<>(); stream1 = builder.stream(intSerde, stringSerde, topic1); @@ -84,7 +84,7 @@ public class KStreamKStreamLeftJoinTest { joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of(100), intSerde, stringSerde, stringSerde); joined.process(processor); - Collection<Set<String>> copartitionGroups = builder.copartitionGroups(); + final Collection<Set<String>> copartitionGroups = builder.copartitionGroups(); assertEquals(1, copartitionGroups.size()); assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); @@ -93,8 +93,10 @@ public class KStreamKStreamLeftJoinTest { driver.setTime(0L); // push two items to the primary stream. the other window is empty - // w {} - // --> w = {} + // w1 {} + // w2 {} + // --> w1 = { 0:X0, 1:X1 } + // --> w2 = {} for (int i = 0; i < 2; i++) { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); @@ -103,38 +105,47 @@ public class KStreamKStreamLeftJoinTest { processor.checkAndClearProcessResult("0:X0+null", "1:X1+null"); // push two items to the other stream. this should produce two items. - // w {} - // --> w = { 0:Y0, 1:Y1 } + // w1 = { 0:X0, 1:X1 } + // w2 {} + // --> w1 = { 0:X0, 1:X1 } + // --> w2 = { 0:Y0, 1:Y1 } for (int i = 0; i < 2; i++) { driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); } driver.flushState(); - processor.checkAndClearProcessResult(); - // push all four items to the primary stream. this should produce four items. 
- // w = { 0:Y0, 1:Y1 } - // --> w = { 0:Y0, 1:Y1 } + processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1"); - for (int i = 0; i < expectedKeys.length; i++) { + // push three items to the primary stream. this should produce three items. + // w1 = { 0:X0, 1:X1 } + // w2 = { 0:Y0, 1:Y1 } + // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 } + // --> w2 = { 0:Y0, 1:Y1 } + + for (int i = 0; i < 3; i++) { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } driver.flushState(); - processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null"); + processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null"); - // push all items to the other stream. this should produce no items. - // w = { 0:Y0, 1:Y1 } - // --> w = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 } + // push all items to the other stream. this should produce 5 items + // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 } + // w2 = { 0:Y0, 1:Y1 } + // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 } + // --> w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3} for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); } driver.flushState(); - processor.checkAndClearProcessResult(); + processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1", "2:X2+YY2"); - // push all four items to the primary stream. this should produce four items. 
+ // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 } + // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3} + // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 0:XX0, 1:XX1, 2:XX2, 3:XX3 } + // --> w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3} for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); @@ -151,10 +162,10 @@ public class KStreamKStreamLeftJoinTest { long time = 0L; - KStream<Integer, String> stream1; - KStream<Integer, String> stream2; - KStream<Integer, String> joined; - MockProcessorSupplier<Integer, String> processor; + final KStream<Integer, String> stream1; + final KStream<Integer, String> stream2; + final KStream<Integer, String> joined; + final MockProcessorSupplier<Integer, String> processor; processor = new MockProcessorSupplier<>(); stream1 = builder.stream(intSerde, stringSerde, topic1); @@ -163,7 +174,7 @@ public class KStreamKStreamLeftJoinTest { joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of(100), intSerde, stringSerde, stringSerde); joined.process(processor); - Collection<Set<String>> copartitionGroups = builder.copartitionGroups(); + final Collection<Set<String>> copartitionGroups = builder.copartitionGroups(); assertEquals(1, copartitionGroups.size()); assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); @@ -171,8 +182,10 @@ public class KStreamKStreamLeftJoinTest { driver = new KStreamTestDriver(builder, stateDir); // push two items to the primary stream. the other window is empty. this should produce two items - // w = {} - // --> w = {} + // w1 = {} + // w2 = {} + // --> w1 = { 0:X0, 1:X1 } + // --> w2 = {} setRecordContext(time, topic1); for (int i = 0; i < 2; i++) { @@ -182,23 +195,27 @@ public class KStreamKStreamLeftJoinTest { processor.checkAndClearProcessResult("0:X0+null", "1:X1+null"); // push two items to the other stream. this should produce no items. 
- // w = {} - // --> w = { 0:Y0, 1:Y1 } + // w1 = { 0:X0, 1:X1 } + // w2 = {} + // --> w1 = { 0:X0, 1:X1 } + // --> w2 = { 0:Y0, 1:Y1 } setRecordContext(time, topic2); for (int i = 0; i < 2; i++) { driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); } driver.flushState(); - processor.checkAndClearProcessResult(); + processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1"); // clear logically time = 1000L; setRecordContext(time, topic2); // push all items to the other stream. this should produce no items. - // w = {} - // --> w = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 } + // w1 = {} + // w2 = {} + // --> w1 = {} + // --> w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 } for (int i = 0; i < expectedKeys.length; i++) { setRecordContext(time + i, topic2); driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); @@ -206,8 +223,11 @@ public class KStreamKStreamLeftJoinTest { driver.flushState(); processor.checkAndClearProcessResult(); - // gradually expire items in window. - // w = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 } + // gradually expire items in window 2. + // w1 = {} + // w2 = {} + // --> w1 = {} + // --> w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 } time = 1000L + 100L; setRecordContext(time, topic1); http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java new file mode 100644 index 0000000..742e852 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java @@ -0,0 +1,146 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.test.MockProcessorSupplier; +import org.apache.kafka.test.MockValueJoiner; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + +import static org.junit.Assert.assertEquals; + +public class KStreamKTableJoinTest { + + final private String topic1 = "topic1"; + final private String topic2 = "topic2"; + + final private Serde<Integer> intSerde = Serdes.Integer(); + final private Serde<String> stringSerde = Serdes.String(); + + private KStreamTestDriver driver = null; + private File stateDir = null; + + @After + public void tearDown() { + if (driver != null) { + driver.close(); + } + driver = null; + } + + @Before + public void setUp() throws IOException { + stateDir = TestUtils.tempDirectory("kafka-test"); + } + + @Test + public void testJoin() throws Exception { + final 
KStreamBuilder builder = new KStreamBuilder(); + + final int[] expectedKeys = new int[]{0, 1, 2, 3}; + + final KStream<Integer, String> stream; + final KTable<Integer, String> table; + final MockProcessorSupplier<Integer, String> processor; + + processor = new MockProcessorSupplier<>(); + stream = builder.stream(intSerde, stringSerde, topic1); + table = builder.table(intSerde, stringSerde, topic2, "anyStoreName"); + stream.join(table, MockValueJoiner.STRING_JOINER).process(processor); + + final Collection<Set<String>> copartitionGroups = builder.copartitionGroups(); + + assertEquals(1, copartitionGroups.size()); + assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); + + driver = new KStreamTestDriver(builder, stateDir); + driver.setTime(0L); + + // push two items to the primary stream. the other table is empty + + for (int i = 0; i < 2; i++) { + driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); + } + + processor.checkAndClearProcessResult(); + + // push two items to the other stream. this should not produce any item. + + for (int i = 0; i < 2; i++) { + driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); + } + + processor.checkAndClearProcessResult(); + + // push all four items to the primary stream. this should produce two items. + + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); + } + + processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1"); + + // push all items to the other stream. this should not produce any item + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } + + processor.checkAndClearProcessResult(); + + // push all four items to the primary stream. this should produce four items. 
+ + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); + } + + processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); + + // push two items with null to the other stream as deletes. this should not produce any item. + + for (int i = 0; i < 2; i++) { + driver.process(topic2, expectedKeys[i], null); + } + + processor.checkAndClearProcessResult(); + + // push all four items to the primary stream. this should produce two items. + + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } + + processor.checkAndClearProcessResult("2:XX2+YY2", "3:XX3+YY3"); + } + + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java index 37aac0c..2175dd5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -228,13 +228,7 @@ public class KStreamWindowAggregateTest { "[A@0]:0+1+1" ); proc2.checkAndClearProcessResult(); - proc3.checkAndClearProcessResult( - "[A@0]:null", - "[B@0]:null", - "[C@0]:null", - "[D@0]:null", - "[A@0]:null" - ); + proc3.checkAndClearProcessResult(); setRecordContext(5, topic1); driver.process(topic1, "A", "1"); @@ -260,13 +254,7 @@ public class KStreamWindowAggregateTest { "[C@0]:0+3+3", "[C@5]:0+3" ); proc2.checkAndClearProcessResult(); - proc3.checkAndClearProcessResult( - "[A@0]:null", "[A@5]:null", - "[B@0]:null", "[B@5]:null", - "[D@0]:null", "[D@5]:null", - "[B@0]:null", "[B@5]:null", - "[C@0]:null", "[C@5]:null" - ); + proc3.checkAndClearProcessResult(); setRecordContext(0, topic1); driver.process(topic2, "A", "a"); http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java index ba10668..1bd8600 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -19,9 +19,9 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.KeyValue; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockValueJoiner; @@ -74,10 +74,10 @@ public class KTableKTableJoinTest { final int[] expectedKeys = new int[]{0, 1, 2, 3}; - KTable<Integer, String> table1; - KTable<Integer, String> table2; - KTable<Integer, String> joined; - MockProcessorSupplier<Integer, String> processor; + final KTable<Integer, String> table1; + final KTable<Integer, String> table2; + final KTable<Integer, String> joined; + final MockProcessorSupplier<Integer, String> processor; processor = new MockProcessorSupplier<>(); table1 = builder.table(intSerde, stringSerde, topic1, storeName1); @@ -85,17 +85,17 @@ public class KTableKTableJoinTest { joined = table1.join(table2, MockValueJoiner.STRING_JOINER); joined.toStream().process(processor); - Collection<Set<String>> copartitionGroups = builder.copartitionGroups(); + final Collection<Set<String>> copartitionGroups = builder.copartitionGroups(); assertEquals(1, copartitionGroups.size()); assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); - KTableValueGetterSupplier<Integer, String> getterSupplier = ((KTableImpl<Integer, String, String>) joined).valueGetterSupplier(); + final KTableValueGetterSupplier<Integer, String> getterSupplier = ((KTableImpl<Integer, String, String>) joined).valueGetterSupplier(); driver = new KStreamTestDriver(builder, stateDir); driver.setTime(0L); - KTableValueGetter<Integer, String> getter = getterSupplier.get(); + final KTableValueGetter<Integer, String> getter = getterSupplier.get(); 
getter.init(driver.context()); // push two items to the primary stream. the other table is empty @@ -105,8 +105,7 @@ public class KTableKTableJoinTest { } driver.flushState(); - processor.checkAndClearProcessResult("0:null", "1:null"); - checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, null), kv(3, null)); + processor.checkAndClearProcessResult(); // push two items to the other stream. this should produce two items. @@ -116,17 +115,17 @@ public class KTableKTableJoinTest { driver.flushState(); processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1"); - checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null)); + checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1")); - // push all four items to the primary stream. this should produce four items. + // push all four items to the primary stream. this should produce two items. for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } driver.flushState(); - processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:null", "3:null"); - checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null)); + processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1"); + checkJoinedValues(getter, kv(0, "XX0+Y0"), kv(1, "XX1+Y1")); // push all items to the other stream. this should produce four items. for (int i = 0; i < expectedKeys.length; i++) { @@ -134,8 +133,8 @@ public class KTableKTableJoinTest { } driver.flushState(); - processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); - checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3")); + processor.checkAndClearProcessResult("0:XX0+YY0", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3"); + checkJoinedValues(getter, kv(0, "XX0+YY0"), kv(1, "XX1+YY1"), kv(2, "XX2+YY2"), kv(3, "XX3+YY3")); // push all four items to the primary stream. 
this should produce four items. @@ -155,17 +154,17 @@ public class KTableKTableJoinTest { driver.flushState(); processor.checkAndClearProcessResult("0:null", "1:null"); - checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, "X2+YY2"), kv(3, "X3+YY3")); + checkJoinedValues(getter, kv(0, null), kv(1, null)); - // push all four items to the primary stream. this should produce four items. + // push all four items to the primary stream. this should produce two items. for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } driver.flushState(); - processor.checkAndClearProcessResult("0:null", "1:null", "2:XX2+YY2", "3:XX3+YY3"); - checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, "XX2+YY2"), kv(3, "XX3+YY3")); + processor.checkAndClearProcessResult("2:XX2+YY2", "3:XX3+YY3"); + checkJoinedValues(getter, kv(2, "XX2+YY2"), kv(3, "XX3+YY3")); } @Test @@ -174,10 +173,10 @@ public class KTableKTableJoinTest { final int[] expectedKeys = new int[]{0, 1, 2, 3}; - KTable<Integer, String> table1; - KTable<Integer, String> table2; - KTable<Integer, String> joined; - MockProcessorSupplier<Integer, String> proc; + final KTable<Integer, String> table1; + final KTable<Integer, String> table2; + final KTable<Integer, String> joined; + final MockProcessorSupplier<Integer, String> proc; table1 = builder.table(intSerde, stringSerde, topic1, storeName1); table2 = builder.table(intSerde, stringSerde, topic2, storeName2); @@ -200,7 +199,7 @@ public class KTableKTableJoinTest { } driver.flushState(); - proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)"); + proc.checkAndClearProcessResult(); // push two items to the other stream. this should produce two items. @@ -211,21 +210,21 @@ public class KTableKTableJoinTest { proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)"); - // push all four items to the primary stream. this should produce four items. 
+ // push all four items to the primary stream. this should produce two items. for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } driver.flushState(); - proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(null<-null)", "3:(null<-null)"); + proc.checkAndClearProcessResult("0:(XX0+Y0<-null)", "1:(XX1+Y1<-null)"); // push all items to the other stream. this should produce four items. for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); } driver.flushState(); - proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)"); + proc.checkAndClearProcessResult("0:(XX0+YY0<-null)", "1:(XX1+YY1<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)"); // push all four items to the primary stream. this should produce four items. @@ -243,13 +242,13 @@ public class KTableKTableJoinTest { driver.flushState(); proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)"); - // push all four items to the primary stream. this should produce four items. + // push all four items to the primary stream. this should produce two items. 
for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } driver.flushState(); - proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)"); + proc.checkAndClearProcessResult("2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)"); } @Test @@ -258,10 +257,10 @@ public class KTableKTableJoinTest { final int[] expectedKeys = new int[]{0, 1, 2, 3}; - KTable<Integer, String> table1; - KTable<Integer, String> table2; - KTable<Integer, String> joined; - MockProcessorSupplier<Integer, String> proc; + final KTable<Integer, String> table1; + final KTable<Integer, String> table2; + final KTable<Integer, String> joined; + final MockProcessorSupplier<Integer, String> proc; table1 = builder.table(intSerde, stringSerde, topic1, storeName1); table2 = builder.table(intSerde, stringSerde, topic2, storeName2); @@ -285,7 +284,7 @@ public class KTableKTableJoinTest { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } driver.flushState(); - proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)"); + proc.checkAndClearProcessResult(); // push two items to the other stream. this should produce two items. @@ -295,20 +294,20 @@ public class KTableKTableJoinTest { driver.flushState(); proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)"); - // push all four items to the primary stream. this should produce four items. + // push all four items to the primary stream. this should produce two items. for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } driver.flushState(); - proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(null<-null)", "3:(null<-null)"); + proc.checkAndClearProcessResult("0:(XX0+Y0<-X0+Y0)", "1:(XX1+Y1<-X1+Y1)"); // push all items to the other stream. this should produce four items. 
for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); } driver.flushState(); - proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)"); + proc.checkAndClearProcessResult("0:(XX0+YY0<-XX0+Y0)", "1:(XX1+YY1<-XX1+Y1)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)"); // push all four items to the primary stream. this should produce four items. @@ -316,7 +315,7 @@ public class KTableKTableJoinTest { driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); } driver.flushState(); - proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)"); + proc.checkAndClearProcessResult("0:(X0+YY0<-XX0+YY0)", "1:(X1+YY1<-XX1+YY1)", "2:(X2+YY2<-XX2+YY2)", "3:(X3+YY3<-XX3+YY3)"); // push two items with null to the other stream as deletes. this should produce two item. @@ -326,13 +325,13 @@ public class KTableKTableJoinTest { driver.flushState(); proc.checkAndClearProcessResult("0:(null<-X0+YY0)", "1:(null<-X1+YY1)"); - // push all four items to the primary stream. this should produce four items. + // push all four items to the primary stream. this should produce two items. 
for (int i = 0; i < expectedKeys.length; i++) { driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); } driver.flushState(); - proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)"); + proc.checkAndClearProcessResult("2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)"); } private KeyValue<Integer, String> kv(Integer key, String value) { http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java index 5f84678..816979a 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -19,9 +19,9 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.KeyValue; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockValueJoiner; @@ -164,10 +164,10 @@ public class KTableKTableLeftJoinTest { final int[] expectedKeys = new int[]{0, 1, 2, 3}; - KTable<Integer, String> table1; - KTable<Integer, String> table2; - KTable<Integer, String> joined; - MockProcessorSupplier<Integer, String> proc; + final KTable<Integer, String> table1; + final KTable<Integer, String> table2; + final KTable<Integer, String> joined; + final MockProcessorSupplier<Integer, String> proc; table1 = builder.table(intSerde, stringSerde, topic1, storeName1); table2 = builder.table(intSerde, stringSerde, topic2, storeName2); @@ -179,7 +179,7 @@ public class KTableKTableLeftJoinTest { driver = new KStreamTestDriver(builder, stateDir); driver.setTime(0L); - assertFalse(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled()); + assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled()); assertFalse(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled()); assertFalse(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled()); http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java index a6249bc..8d1c70a 100644 --- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -19,9 +19,9 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.KeyValue; import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockValueJoiner; @@ -189,8 +189,8 @@ public class KTableKTableOuterJoinTest { driver = new KStreamTestDriver(builder, stateDir); - assertFalse(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled()); - assertFalse(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled()); + assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled()); + assertTrue(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled()); assertFalse(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled()); // push two items to the primary stream. the other table is empty