This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new c1918a2 KAFKA-6474: Rewrite tests to use new public TopologyTestDriver [part 4] (#5433) c1918a2 is described below commit c1918a2b9a9ba282efe92ddb867abef3d6d9b98c Author: Filipe Agapito <filipe.agap...@gmail.com> AuthorDate: Thu Feb 14 23:06:47 2019 +0000 KAFKA-6474: Rewrite tests to use new public TopologyTestDriver [part 4] (#5433) Reviewer: John Roesler <j...@confluent.io>, Matthias J. Sax <matth...@confluent.io> --- .../internals/KTableKTableInnerJoinTest.java | 319 +++++++--------- .../internals/KTableKTableLeftJoinTest.java | 417 +++++++++----------- .../internals/KTableKTableOuterJoinTest.java | 425 +++++++++------------ 3 files changed, 499 insertions(+), 662 deletions(-) diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java index 63ed53f..5d1f5c3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java @@ -16,31 +16,30 @@ */ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; -import org.apache.kafka.streams.kstream.Consumed; -import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.TopologyWrapper; +import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; -import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.processor.MockProcessorContext; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.test.ConsumerRecordFactory; +import org.apache.kafka.streams.test.OutputVerifier; import org.apache.kafka.test.MockProcessor; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockValueJoiner; -import org.apache.kafka.test.TestUtils; -import org.junit.Before; -import org.junit.Rule; +import org.apache.kafka.test.StreamsTestUtils; import org.junit.Test; -import java.io.File; import java.util.Arrays; import java.util.Collection; import java.util.HashSet; +import java.util.Properties; import java.util.Set; import static org.apache.kafka.test.StreamsTestUtils.getMetricByName; @@ -54,22 +53,12 @@ public class KTableKTableInnerJoinTest { private final String topic1 = "topic1"; private final String topic2 = "topic2"; + final private String output = "output"; - private final Serde<Integer> intSerde = Serdes.Integer(); - private final Serde<String> stringSerde = Serdes.String(); - private final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde); - private final Materialized<Integer, String, KeyValueStore<Bytes, byte[]>> materialized = Materialized.with(intSerde, stringSerde); - - private File stateDir = null; - - @SuppressWarnings("deprecation") - @Rule - public final org.apache.kafka.test.KStreamTestDriver driver = new org.apache.kafka.test.KStreamTestDriver(); - - @Before - public void setUp() { - stateDir = TestUtils.tempDirectory("kafka-test"); - } + private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String()); + private final Materialized<Integer, String, KeyValueStore<Bytes, byte[]>> materialized = Materialized.with(Serdes.Integer(), Serdes.String()); + private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(Serdes.Integer().serializer(), Serdes.String().serializer()); + private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String()); @Test public void testJoin() { @@ -80,13 +69,12 @@ public class KTableKTableInnerJoinTest { final KTable<Integer, String> table1; final KTable<Integer, String> table2; final KTable<Integer, String> joined; - final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>(); table1 = builder.table(topic1, consumed); table2 = builder.table(topic2, consumed); joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER); - joined.toStream().process(supplier); + joined.toStream().to(output); - doTestJoin(builder, expectedKeys, supplier, joined); + doTestJoin(builder, expectedKeys); } @Test @@ -98,13 +86,12 @@ public class KTableKTableInnerJoinTest { final KTable<Integer, String> table1; final KTable<Integer, String> table2; final KTable<Integer, String> table3; - final MockProcessorSupplier<Integer, String> processor = new MockProcessorSupplier<>(); table1 = builder.table(topic1, consumed); table2 = builder.table(topic2, consumed); table3 = table1.join(table2, MockValueJoiner.TOSTRING_JOINER, materialized); - table3.toStream().process(processor); + table3.toStream().to(output); - doTestJoin(builder, expectedKeys, processor, table3); + doTestJoin(builder, expectedKeys); } @Test @@ -151,8 +138,8 @@ public class KTableKTableInnerJoinTest { final StreamsBuilder builder = new StreamsBuilder(); final Processor<String, Change<String>> join = new KTableKTableInnerJoin<>( - (KTableImpl<String, String, String>) builder.table("left", Consumed.with(stringSerde, stringSerde)), - (KTableImpl<String, String, String>) builder.table("right", Consumed.with(stringSerde, stringSerde)), + (KTableImpl<String, String, String>) builder.table("left", Consumed.with(Serdes.String(), Serdes.String())), + (KTableImpl<String, String, String>) builder.table("right", Consumed.with(Serdes.String(), Serdes.String())), null ).get(); @@ -168,192 +155,146 @@ public class KTableKTableInnerJoinTest { } private void doTestNotSendingOldValues(final StreamsBuilder builder, - final int[] expectedKeys, - final KTable<Integer, String> table1, - final KTable<Integer, String> table2, - final MockProcessorSupplier<Integer, String> supplier, - final KTable<Integer, String> joined) { + final int[] expectedKeys, + final KTable<Integer, String> table1, + final KTable<Integer, String> table2, + final MockProcessorSupplier<Integer, String> supplier, + final KTable<Integer, String> joined) { - assertFalse(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled()); - assertFalse(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled()); - assertFalse(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled()); - - driver.setUp(builder, stateDir, Serdes.Integer(), Serdes.String()); - driver.setTime(0L); - - final MockProcessor<Integer, String> proc = supplier.theCapturedProcessor(); - - // push two items to the primary stream. the other table is empty - - for (int i = 0; i < 2; i++) { - driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); - } - driver.flushState(); - - proc.checkAndClearProcessResult(); - - // push two items to the other stream. this should produce two items. - - for (int i = 0; i < 2; i++) { - driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); - } - driver.flushState(); + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) { - proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)"); + final MockProcessor<Integer, String> proc = supplier.theCapturedProcessor(); - // push all four items to the primary stream. this should produce two items. + assertFalse(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled()); + assertFalse(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled()); + assertFalse(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled()); - for (final int expectedKey : expectedKeys) { - driver.process(topic1, expectedKey, "XX" + expectedKey); - } - driver.flushState(); - - proc.checkAndClearProcessResult("0:(XX0+Y0<-null)", "1:(XX1+Y1<-null)"); + // push two items to the primary stream. the other table is empty + for (int i = 0; i < 2; i++) { + driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i])); + } + proc.checkAndClearProcessResult(); - // push all items to the other stream. this should produce four items. - for (final int expectedKey : expectedKeys) { - driver.process(topic2, expectedKey, "YY" + expectedKey); - } - driver.flushState(); - proc.checkAndClearProcessResult("0:(XX0+YY0<-null)", "1:(XX1+YY1<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)"); + // push two items to the other stream. this should produce two items. + for (int i = 0; i < 2; i++) { + driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i])); + } + proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)"); - // push all four items to the primary stream. this should produce four items. + // push all four items to the primary stream. this should produce two items. + for (final int expectedKey : expectedKeys) { + driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey)); + } + proc.checkAndClearProcessResult("0:(XX0+Y0<-null)", "1:(XX1+Y1<-null)"); - for (final int expectedKey : expectedKeys) { - driver.process(topic1, expectedKey, "X" + expectedKey); - } - driver.flushState(); - proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)"); + // push all items to the other stream. this should produce four items. + for (final int expectedKey : expectedKeys) { + driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey)); + } + proc.checkAndClearProcessResult("0:(XX0+YY0<-null)", "1:(XX1+YY1<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)"); - // push two items with null to the other stream as deletes. this should produce two item. + // push all four items to the primary stream. this should produce four items. - for (int i = 0; i < 2; i++) { - driver.process(topic2, expectedKeys[i], null); - } - driver.flushState(); - proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)"); + for (final int expectedKey : expectedKeys) { + driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey)); + } + proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)"); - // push all four items to the primary stream. this should produce two items. + // push two items with null to the other stream as deletes. this should produce two item. + for (int i = 0; i < 2; i++) { + driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], null)); + } + proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)"); - for (final int expectedKey : expectedKeys) { - driver.process(topic1, expectedKey, "XX" + expectedKey); + // push all four items to the primary stream. this should produce two items. + for (final int expectedKey : expectedKeys) { + driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey)); + } + proc.checkAndClearProcessResult("2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)"); } - driver.flushState(); - proc.checkAndClearProcessResult("2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)"); } - @SuppressWarnings("unchecked") - private void doTestJoin(final StreamsBuilder builder, - final int[] expectedKeys, - final MockProcessorSupplier<Integer, String> supplier, - final KTable<Integer, String> joined) { + private void doTestJoin(final StreamsBuilder builder, final int[] expectedKeys) { + final Collection<Set<String>> copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups(); assertEquals(1, copartitionGroups.size()); assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); - final KTableValueGetterSupplier<Integer, String> getterSupplier = ((KTableImpl<Integer, String, String>) joined).valueGetterSupplier(); - - driver.setUp(builder, stateDir, Serdes.Integer(), Serdes.String()); - driver.setTime(0L); - - final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor(); - - final KTableValueGetter<Integer, String> getter = getterSupplier.get(); - getter.init(driver.context()); + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { - // push two items to the primary stream. the other table is empty - - for (int i = 0; i < 2; i++) { - driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); - } - // pass tuple with null key, it will be discarded in join process - driver.process(topic1, null, "SomeVal"); - driver.flushState(); - - processor.checkAndClearProcessResult(); - - // push two items to the other stream. this should produce two items. - - for (int i = 0; i < 2; i++) { - driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); - } - // pass tuple with null key, it will be discarded in join process - driver.process(topic2, null, "AnotherVal"); - driver.flushState(); - - processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1"); - checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1")); - - // push all four items to the primary stream. this should produce two items. - - for (final int expectedKey : expectedKeys) { - driver.process(topic1, expectedKey, "XX" + expectedKey); - } - driver.flushState(); - - processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1"); - checkJoinedValues(getter, kv(0, "XX0+Y0"), kv(1, "XX1+Y1")); - - // push all items to the other stream. this should produce four items. - for (final int expectedKey : expectedKeys) { - driver.process(topic2, expectedKey, "YY" + expectedKey); - } - driver.flushState(); - - processor.checkAndClearProcessResult("0:XX0+YY0", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3"); - checkJoinedValues(getter, kv(0, "XX0+YY0"), kv(1, "XX1+YY1"), kv(2, "XX2+YY2"), kv(3, "XX3+YY3")); - - // push all four items to the primary stream. this should produce four items. - - for (final int expectedKey : expectedKeys) { - driver.process(topic1, expectedKey, "X" + expectedKey); - } - driver.flushState(); - - processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); - checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3")); - - // push two items with null to the other stream as deletes. this should produce two item. + // push two items to the primary stream. the other table is empty + for (int i = 0; i < 2; i++) { + driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i])); + } + // pass tuple with null key, it will be discarded in join process + driver.pipeInput(recordFactory.create(topic1, null, "SomeVal")); + assertNull(driver.readOutput(output)); - for (int i = 0; i < 2; i++) { - driver.process(topic2, expectedKeys[i], null); - } - driver.flushState(); + // push two items to the other stream. this should produce two items. + for (int i = 0; i < 2; i++) { + driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i])); + } + // pass tuple with null key, it will be discarded in join process + driver.pipeInput(recordFactory.create(topic2, null, "AnotherVal")); + assertOutputKeyValue(driver, 0, "X0+Y0"); + assertOutputKeyValue(driver, 1, "X1+Y1"); + assertNull(driver.readOutput(output)); + + // push all four items to the primary stream. this should produce two items. + for (final int expectedKey : expectedKeys) { + driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey)); + } + assertOutputKeyValue(driver, 0, "XX0+Y0"); + assertOutputKeyValue(driver, 1, "XX1+Y1"); + assertNull(driver.readOutput(output)); - processor.checkAndClearProcessResult("0:null", "1:null"); - checkJoinedValues(getter, kv(0, null), kv(1, null)); + // push all items to the other stream. this should produce four items. + for (final int expectedKey : expectedKeys) { + driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey)); + } + assertOutputKeyValue(driver, 0, "XX0+YY0"); + assertOutputKeyValue(driver, 1, "XX1+YY1"); + assertOutputKeyValue(driver, 2, "XX2+YY2"); + assertOutputKeyValue(driver, 3, "XX3+YY3"); + assertNull(driver.readOutput(output)); + + // push all four items to the primary stream. this should produce four items. + for (final int expectedKey : expectedKeys) { + driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey)); + } + assertOutputKeyValue(driver, 0, "X0+YY0"); + assertOutputKeyValue(driver, 1, "X1+YY1"); + assertOutputKeyValue(driver, 2, "X2+YY2"); + assertOutputKeyValue(driver, 3, "X3+YY3"); + assertNull(driver.readOutput(output)); + + // push two items with null to the other stream as deletes. this should produce two item. + for (int i = 0; i < 2; i++) { + driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], null)); + } + assertOutputKeyValue(driver, 0, null); + assertOutputKeyValue(driver, 1, null); + assertNull(driver.readOutput(output)); - // push all four items to the primary stream. this should produce two items. + // push all four items to the primary stream. this should produce two items. + for (final int expectedKey : expectedKeys) { + driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey)); + } + assertOutputKeyValue(driver, 2, "XX2+YY2"); + assertOutputKeyValue(driver, 3, "XX3+YY3"); + assertNull(driver.readOutput(output)); - for (final int expectedKey : expectedKeys) { - driver.process(topic1, expectedKey, "XX" + expectedKey); + driver.pipeInput(recordFactory.create(topic1, null, "XX" + 1)); + assertNull(driver.readOutput(output)); } - driver.flushState(); - - processor.checkAndClearProcessResult("2:XX2+YY2", "3:XX3+YY3"); - checkJoinedValues(getter, kv(2, "XX2+YY2"), kv(3, "XX3+YY3")); - - driver.process(topic1, null, "XX" + 1); - checkJoinedValues(getter, kv(2, "XX2+YY2"), kv(3, "XX3+YY3")); - - } - - private KeyValue<Integer, String> kv(final Integer key, final String value) { - return new KeyValue<>(key, value); } - @SafeVarargs - private final void checkJoinedValues(final KTableValueGetter<Integer, String> getter, final KeyValue<Integer, String>... expected) { - for (final KeyValue<Integer, String> kv : expected) { - final String value = getter.get(kv.key); - if (kv.value == null) { - assertNull(value); - } else { - assertEquals(kv.value, value); - } - } + private void assertOutputKeyValue(final TopologyTestDriver driver, + final Integer expectedKey, + final String expectedValue) { + OutputVerifier.compareKeyValue(driver.readOutput(output, Serdes.Integer().deserializer(), Serdes.String().deserializer()), expectedKey, expectedValue); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java index f5d74b2..609b070 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java @@ -16,12 +16,14 @@ */ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.TopologyTestDriverWrapper; import org.apache.kafka.streams.TopologyWrapper; +import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.Grouped; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; @@ -29,20 +31,20 @@ import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.processor.MockProcessorContext; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; +import org.apache.kafka.streams.test.ConsumerRecordFactory; +import org.apache.kafka.streams.test.OutputVerifier; import org.apache.kafka.test.MockProcessor; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockReducer; import org.apache.kafka.test.MockValueJoiner; -import org.apache.kafka.test.TestUtils; -import org.junit.Before; -import org.junit.Rule; +import org.apache.kafka.test.StreamsTestUtils; import org.junit.Test; -import java.io.File; import java.util.Arrays; import java.util.Collection; import java.util.HashSet; import java.util.Locale; +import java.util.Properties; import java.util.Random; import java.util.Set; @@ -58,20 +60,11 @@ public class KTableKTableLeftJoinTest { final private String topic1 = "topic1"; final private String topic2 = "topic2"; + final private String output = "output"; - final private Serde<Integer> intSerde = Serdes.Integer(); - final private Serde<String> stringSerde = Serdes.String(); - private File stateDir = null; - - @SuppressWarnings("deprecation") - @Rule - public final org.apache.kafka.test.KStreamTestDriver driver = new org.apache.kafka.test.KStreamTestDriver(); - private final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde); - - @Before - public void setUp() { - stateDir = TestUtils.tempDirectory("kafka-test"); - } + private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String()); + private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(Serdes.Integer().serializer(), Serdes.String().serializer()); + private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String()); @Test public void testJoin() { @@ -82,91 +75,83 @@ public class KTableKTableLeftJoinTest { final KTable<Integer, String> table1 = builder.table(topic1, consumed); final KTable<Integer, String> table2 = builder.table(topic2, consumed); final KTable<Integer, String> joined = table1.leftJoin(table2, MockValueJoiner.TOSTRING_JOINER); - final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>(); - joined.toStream().process(supplier); + joined.toStream().to(output); final Collection<Set<String>> copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups(); assertEquals(1, copartitionGroups.size()); assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); - @SuppressWarnings("unchecked") - final KTableValueGetterSupplier<Integer, String> getterSupplier = ((KTableImpl<Integer, String, String>) joined).valueGetterSupplier(); - - driver.setUp(builder, stateDir); - driver.setTime(0L); - - final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor(); - - final KTableValueGetter<Integer, String> getter = getterSupplier.get(); - getter.init(driver.context()); - - // push two items to the primary stream. the other table is empty + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { - for (int i = 0; i < 2; i++) { - driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); - } - // pass tuple with null key, it will be discarded in join process - driver.process(topic1, null, "SomeVal"); - driver.flushState(); - - processor.checkAndClearProcessResult("0:X0+null", "1:X1+null"); - checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, null), kv(3, null)); - - // push two items to the other stream. this should produce two items. - - for (int i = 0; i < 2; i++) { - driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); - } - // pass tuple with null key, it will be discarded in join process - driver.process(topic2, null, "AnotherVal"); - driver.flushState(); - processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1"); - checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null)); - - // push all four items to the primary stream. this should produce four items. - - for (final int expectedKey : expectedKeys) { - driver.process(topic1, expectedKey, "X" + expectedKey); - } - driver.flushState(); - processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null"); - checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, "X2+null"), kv(3, "X3+null")); - - // push all items to the other stream. this should produce four items. - for (final int expectedKey : expectedKeys) { - driver.process(topic2, expectedKey, "YY" + expectedKey); - } - driver.flushState(); - processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); - checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3")); - - // push all four items to the primary stream. this should produce four items. - - for (final int expectedKey : expectedKeys) { - driver.process(topic1, expectedKey, "X" + expectedKey); - } - driver.flushState(); - processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); - checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3")); - - // push two items with null to the other stream as deletes. this should produce two item. - - for (int i = 0; i < 2; i++) { - driver.process(topic2, expectedKeys[i], null); - } - driver.flushState(); - processor.checkAndClearProcessResult("0:X0+null", "1:X1+null"); - checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, "X2+YY2"), kv(3, "X3+YY3")); - - // push all four items to the primary stream. this should produce four items. + // push two items to the primary stream. the other table is empty + for (int i = 0; i < 2; i++) { + driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i])); + } + // pass tuple with null key, it will be discarded in join process + driver.pipeInput(recordFactory.create(topic1, null, "SomeVal")); + assertOutputKeyValue(driver, 0, "X0+null"); + assertOutputKeyValue(driver, 1, "X1+null"); + assertNull(driver.readOutput(output)); + + // push two items to the other stream. this should produce two items. + for (int i = 0; i < 2; i++) { + driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i])); + } + // pass tuple with null key, it will be discarded in join process + driver.pipeInput(recordFactory.create(topic2, null, "AnotherVal")); + assertOutputKeyValue(driver, 0, "X0+Y0"); + assertOutputKeyValue(driver, 1, "X1+Y1"); + assertNull(driver.readOutput(output)); + + // push all four items to the primary stream. this should produce four items. + for (final int expectedKey : expectedKeys) { + driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey)); + } + assertOutputKeyValue(driver, 0, "X0+Y0"); + assertOutputKeyValue(driver, 1, "X1+Y1"); + assertOutputKeyValue(driver, 2, "X2+null"); + assertOutputKeyValue(driver, 3, "X3+null"); + assertNull(driver.readOutput(output)); + + // push all items to the other stream. this should produce four items. + for (final int expectedKey : expectedKeys) { + driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey)); + } + assertOutputKeyValue(driver, 0, "X0+YY0"); + assertOutputKeyValue(driver, 1, "X1+YY1"); + assertOutputKeyValue(driver, 2, "X2+YY2"); + assertOutputKeyValue(driver, 3, "X3+YY3"); + assertNull(driver.readOutput(output)); + + // push all four items to the primary stream. this should produce four items. + for (final int expectedKey : expectedKeys) { + driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey)); + } + assertOutputKeyValue(driver, 0, "X0+YY0"); + assertOutputKeyValue(driver, 1, "X1+YY1"); + assertOutputKeyValue(driver, 2, "X2+YY2"); + assertOutputKeyValue(driver, 3, "X3+YY3"); + assertNull(driver.readOutput(output)); + + // push two items with null to the other stream as deletes. this should produce two item. + for (int i = 0; i < 2; i++) { + driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], null)); + } + assertOutputKeyValue(driver, 0, "X0+null"); + assertOutputKeyValue(driver, 1, "X1+null"); + assertNull(driver.readOutput(output)); - for (final int expectedKey : expectedKeys) { - driver.process(topic1, expectedKey, "XX" + expectedKey); + // push all four items to the primary stream. this should produce four items. + for (final int expectedKey : expectedKeys) { + driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey)); + } + assertOutputKeyValue(driver, 0, "XX0+null"); + assertOutputKeyValue(driver, 1, "XX1+null"); + assertOutputKeyValue(driver, 2, "XX2+YY2"); + assertOutputKeyValue(driver, 3, "XX3+YY3"); + assertNull(driver.readOutput(output)); } - driver.flushState(); - processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3"); - checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, "XX1+null"), kv(2, "XX2+YY2"), kv(3, "XX3+YY3")); } @Test @@ -185,71 +170,58 @@ public class KTableKTableLeftJoinTest { joined = table1.leftJoin(table2, MockValueJoiner.TOSTRING_JOINER); supplier = new MockProcessorSupplier<>(); - builder.build().addProcessor("proc", supplier, ((KTableImpl<?, ?, ?>) joined).name); - - driver.setUp(builder, stateDir); - driver.setTime(0L); - - final MockProcessor<Integer, String> proc = supplier.theCapturedProcessor(); - - assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled()); - assertFalse(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled()); - assertFalse(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled()); - - // push two items to the primary stream. the other table is empty - - for (int i = 0; i < 2; i++) { - driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); - } - driver.flushState(); - proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)"); + final Topology topology = builder.build().addProcessor("proc", supplier, ((KTableImpl<?, ?, ?>) joined).name); - // push two items to the other stream. this should produce two items. + try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) { - for (int i = 0; i < 2; i++) { - driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); - } - driver.flushState(); - proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)"); + final MockProcessor<Integer, String> proc = supplier.theCapturedProcessor(); - // push all four items to the primary stream. this should produce four items. + assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled()); + assertFalse(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled()); + assertFalse(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled()); - for (final int expectedKey : expectedKeys) { - driver.process(topic1, expectedKey, "X" + expectedKey); - } - driver.flushState(); - proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(X2+null<-null)", "3:(X3+null<-null)"); - - // push all items to the other stream. this should produce four items. - for (final int expectedKey : expectedKeys) { - driver.process(topic2, expectedKey, "YY" + expectedKey); - } - driver.flushState(); - proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)"); + // push two items to the primary stream. the other table is empty + for (int i = 0; i < 2; i++) { + driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i])); + } + proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)"); - // push all four items to the primary stream. this should produce four items. + // push two items to the other stream. this should produce two items. + for (int i = 0; i < 2; i++) { + driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i])); + } + proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)"); - for (final int expectedKey : expectedKeys) { - driver.process(topic1, expectedKey, "X" + expectedKey); - } - driver.flushState(); - proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)"); + // push all four items to the primary stream. this should produce four items. + for (final int expectedKey : expectedKeys) { + driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey)); + } + proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(X2+null<-null)", "3:(X3+null<-null)"); - // push two items with null to the other stream as deletes. this should produce two item. + // push all items to the other stream. this should produce four items. + for (final int expectedKey : expectedKeys) { + driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey)); + } + proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)"); - for (int i = 0; i < 2; i++) { - driver.process(topic2, expectedKeys[i], null); - } - driver.flushState(); - proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)"); + // push all four items to the primary stream. this should produce four items. + for (final int expectedKey : expectedKeys) { + driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey)); + } + proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)"); - // push all four items to the primary stream. this should produce four items. + // push two items with null to the other stream as deletes. this should produce two item. + for (int i = 0; i < 2; i++) { + driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], null)); + } + proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)"); - for (final int expectedKey : expectedKeys) { - driver.process(topic1, expectedKey, "XX" + expectedKey); + // push all four items to the primary stream. this should produce four items. + for (final int expectedKey : expectedKeys) { + driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey)); + } + proc.checkAndClearProcessResult("0:(XX0+null<-null)", "1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)"); } - driver.flushState(); - proc.checkAndClearProcessResult("0:(XX0+null<-null)", "1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)"); } @Test @@ -270,71 +242,58 @@ public class KTableKTableLeftJoinTest { ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues(); supplier = new MockProcessorSupplier<>(); - builder.build().addProcessor("proc", supplier, ((KTableImpl<?, ?, ?>) joined).name); - - driver.setUp(builder, stateDir); - driver.setTime(0L); + final Topology topology = builder.build().addProcessor("proc", supplier, ((KTableImpl<?, ?, ?>) joined).name); - final MockProcessor<Integer, String> proc = supplier.theCapturedProcessor(); + try (final TopologyTestDriver driver = new TopologyTestDriverWrapper(topology, props)) { - assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled()); - assertTrue(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled()); - assertTrue(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled()); + final MockProcessor<Integer, String> proc = supplier.theCapturedProcessor(); - // push two items to the primary stream. the other table is empty - - for (int i = 0; i < 2; i++) { - driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); - } - driver.flushState(); - proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)"); - - // push two items to the other stream. this should produce two items. - - for (int i = 0; i < 2; i++) { - driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); - } - driver.flushState(); - proc.checkAndClearProcessResult("0:(X0+Y0<-X0+null)", "1:(X1+Y1<-X1+null)"); + assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled()); + assertTrue(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled()); + assertTrue(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled()); - // push all four items to the primary stream. this should produce four items. - - for (final int expectedKey : expectedKeys) { - driver.process(topic1, expectedKey, "X" + expectedKey); - } - driver.flushState(); - proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)"); - - // push all items to the other stream. this should produce four items. - for (final int expectedKey : expectedKeys) { - driver.process(topic2, expectedKey, "YY" + expectedKey); - } - driver.flushState(); - proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)"); + // push two items to the primary stream. the other table is empty + for (int i = 0; i < 2; i++) { + driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i])); + } + proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)"); - // push all four items to the primary stream. this should produce four items. + // push two items to the other stream. this should produce two items. + for (int i = 0; i < 2; i++) { + driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i])); + } + proc.checkAndClearProcessResult("0:(X0+Y0<-X0+null)", "1:(X1+Y1<-X1+null)"); - for (final int expectedKey : expectedKeys) { - driver.process(topic1, expectedKey, "X" + expectedKey); - } - driver.flushState(); - proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)"); + // push all four items to the primary stream. this should produce four items. + for (final int expectedKey : expectedKeys) { + driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey)); + } + proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)"); - // push two items with null to the other stream as deletes. this should produce two item. + // push all items to the other stream. this should produce four items. + for (final int expectedKey : expectedKeys) { + driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey)); + } + proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)"); - for (int i = 0; i < 2; i++) { - driver.process(topic2, expectedKeys[i], null); - } - driver.flushState(); - proc.checkAndClearProcessResult("0:(X0+null<-X0+YY0)", "1:(X1+null<-X1+YY1)"); + // push all four items to the primary stream. this should produce four items. + for (final int expectedKey : expectedKeys) { + driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey)); + } + proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)"); - // push all four items to the primary stream. this should produce four items. + // push two items with null to the other stream as deletes. this should produce two item. + for (int i = 0; i < 2; i++) { + driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], null)); + } + proc.checkAndClearProcessResult("0:(X0+null<-X0+YY0)", "1:(X1+null<-X1+YY1)"); - for (final int expectedKey : expectedKeys) { - driver.process(topic1, expectedKey, "XX" + expectedKey); + // push all four items to the primary stream. this should produce four items. + for (final int expectedKey : expectedKeys) { + driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey)); + } + proc.checkAndClearProcessResult("0:(XX0+null<-X0+null)", "1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)"); } - driver.flushState(); - proc.checkAndClearProcessResult("0:(XX0+null<-X0+null)", "1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)"); } /** @@ -382,20 +341,22 @@ public class KTableKTableLeftJoinTest { .leftJoin(eight, MockValueJoiner.TOSTRING_JOINER) .mapValues(mapper); - driver.setUp(builder, stateDir, 250); - - final String[] values = { - "a", "AA", "BBB", "CCCC", "DD", "EEEEEEEE", "F", "GGGGGGGGGGGGGGG", "HHH", "IIIIIIIIII", - "J", "KK", "LLLL", "MMMMMMMMMMMMMMMMMMMMMM", "NNNNN", "O", "P", "QQQQQ", "R", "SSSS", - "T", "UU", "VVVVVVVVVVVVVVVVVVV" - }; - - final Random random = new Random(); - for (int i = 0; i < 1000; i++) { - for (final String input : inputs) { - final Long key = (long) random.nextInt(1000); - final String value = values[random.nextInt(values.length)]; - driver.process(input, key, value); + final ConsumerRecordFactory<Long, String> factory = new ConsumerRecordFactory<>(Serdes.Long().serializer(), Serdes.String().serializer()); + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { + + final String[] values = { + "a", "AA", "BBB", "CCCC", "DD", "EEEEEEEE", "F", "GGGGGGGGGGGGGGG", "HHH", "IIIIIIIIII", + "J", "KK", "LLLL", "MMMMMMMMMMMMMMMMMMMMMM", "NNNNN", "O", "P", "QQQQQ", "R", "SSSS", + "T", "UU", "VVVVVVVVVVVVVVVVVVV" + }; + + final Random random = new Random(); + for (int i = 0; i < 1000; i++) { + for (final String input : inputs) { + final Long key = (long) random.nextInt(1000); + final String value = values[random.nextInt(values.length)]; + driver.pipeInput(factory.create(input, key, value)); + } } } } @@ -406,8 +367,8 @@ public class KTableKTableLeftJoinTest { @SuppressWarnings("unchecked") final Processor<String, Change<String>> join = new KTableKTableLeftJoin<>( - (KTableImpl<String, String, String>) builder.table("left", Consumed.with(stringSerde, stringSerde)), - (KTableImpl<String, String, String>) builder.table("right", Consumed.with(stringSerde, stringSerde)), + (KTableImpl<String, String, String>) builder.table("left", Consumed.with(Serdes.String(), Serdes.String())), + (KTableImpl<String, String, String>) builder.table("right", Consumed.with(Serdes.String(), Serdes.String())), null ).get(); @@ -422,19 +383,9 @@ public class KTableKTableLeftJoinTest { assertThat(appender.getMessages(), hasItem("Skipping record due to null key. change=[(new<-old)] topic=[left] partition=[-1] offset=[-2]")); } - private KeyValue<Integer, String> kv(final Integer key, final String value) { - return new KeyValue<>(key, value); - } - - @SafeVarargs - private final void checkJoinedValues(final KTableValueGetter<Integer, String> getter, final KeyValue<Integer, String>... expected) { - for (final KeyValue<Integer, String> kv : expected) { - final String value = getter.get(kv.key); - if (kv.value == null) { - assertNull(value); - } else { - assertEquals(kv.value, value); - } - } + private void assertOutputKeyValue(final TopologyTestDriver driver, + final Integer expectedKey, + final String expectedValue) { + OutputVerifier.compareKeyValue(driver.readOutput(output, Serdes.Integer().deserializer(), Serdes.String().deserializer()), expectedKey, expectedValue); } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java index 1500ba3..092b5a1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java @@ -16,28 +16,28 @@ */ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; -import org.apache.kafka.streams.kstream.Consumed; -import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.TopologyWrapper; +import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.processor.MockProcessorContext; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender; +import org.apache.kafka.streams.test.ConsumerRecordFactory; +import org.apache.kafka.streams.test.OutputVerifier; import org.apache.kafka.test.MockProcessor; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockValueJoiner; -import org.apache.kafka.test.TestUtils; -import org.junit.Before; -import org.junit.Rule; +import org.apache.kafka.test.StreamsTestUtils; import org.junit.Test; -import java.io.File; import java.util.Arrays; import java.util.Collection; import java.util.HashSet; +import java.util.Properties; import java.util.Set; import static org.apache.kafka.test.StreamsTestUtils.getMetricByName; @@ -52,20 +52,11 @@ public class KTableKTableOuterJoinTest { final private String topic1 = "topic1"; final private String topic2 = "topic2"; + final private String output = "output"; - final private Serde<Integer> intSerde = Serdes.Integer(); - final private Serde<String> stringSerde = Serdes.String(); - private File stateDir = null; - - @SuppressWarnings("deprecation") - @Rule - public final org.apache.kafka.test.KStreamTestDriver driver = new org.apache.kafka.test.KStreamTestDriver(); - private final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde); - - @Before - public void setUp() { - stateDir = TestUtils.tempDirectory("kafka-test"); - } + private final Consumed<Integer, String> consumed = Consumed.with(Serdes.Integer(), Serdes.String()); + private final ConsumerRecordFactory<Integer, String> recordFactory = new ConsumerRecordFactory<>(Serdes.Integer().serializer(), Serdes.String().serializer()); + private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String()); @Test public void testJoin() { @@ -76,103 +67,95 @@ public class KTableKTableOuterJoinTest { final KTable<Integer, String> table1; final KTable<Integer, String> table2; final KTable<Integer, String> joined; - final MockProcessorSupplier<Integer, String> supplier; - supplier = new MockProcessorSupplier<>(); table1 = builder.table(topic1, consumed); table2 = builder.table(topic2, consumed); joined = table1.outerJoin(table2, MockValueJoiner.TOSTRING_JOINER); - joined.toStream().process(supplier); + joined.toStream().to(output); final Collection<Set<String>> copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups(); assertEquals(1, copartitionGroups.size()); assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); - @SuppressWarnings("unchecked") - final KTableValueGetterSupplier<Integer, String> getterSupplier = ((KTableImpl<Integer, String, String>) joined).valueGetterSupplier(); - - driver.setUp(builder, stateDir); - - final MockProcessor<Integer, String> processor = supplier.theCapturedProcessor(); - - final KTableValueGetter<Integer, String> getter = getterSupplier.get(); - getter.init(driver.context()); - - // push two items to the primary stream. the other table is empty - - for (int i = 0; i < 2; i++) { - driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); - } - // pass tuple with null key, it will be discarded in join process - driver.process(topic1, null, "SomeVal"); - driver.flushState(); - processor.checkAndClearProcessResult("0:X0+null", "1:X1+null"); - checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, null), kv(3, null)); - - // push two items to the other stream. this should produce two items. - - for (int i = 0; i < 2; i++) { - driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); - } - // pass tuple with null key, it will be discarded in join process - driver.process(topic2, null, "AnotherVal"); - driver.flushState(); - processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1"); - checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null)); - - // push all four items to the primary stream. this should produce four items. - - for (final int expectedKey : expectedKeys) { - driver.process(topic1, expectedKey, "X" + expectedKey); - } - driver.flushState(); - processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null"); - checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, "X2+null"), kv(3, "X3+null")); - - // push all items to the other stream. this should produce four items. - for (final int expectedKey : expectedKeys) { - driver.process(topic2, expectedKey, "YY" + expectedKey); - } - driver.flushState(); - processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); - checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3")); - - // push all four items to the primary stream. this should produce four items. - - for (final int expectedKey : expectedKeys) { - driver.process(topic1, expectedKey, "X" + expectedKey); - } - driver.flushState(); - processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3"); - checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3, "X3+YY3")); - - // push two items with null to the other stream as deletes. this should produce two item. - - for (int i = 0; i < 2; i++) { - driver.process(topic2, expectedKeys[i], null); - } - driver.flushState(); - processor.checkAndClearProcessResult("0:X0+null", "1:X1+null"); - checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, "X2+YY2"), kv(3, "X3+YY3")); + try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props)) { - // push all four items to the primary stream. this should produce four items. - - for (final int expectedKey : expectedKeys) { - driver.process(topic1, expectedKey, "XX" + expectedKey); - } - driver.flushState(); - processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3"); - checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, "XX1+null"), kv(2, "XX2+YY2"), kv(3, "XX3+YY3")); - - // push middle two items to the primary stream with null. this should produce two items. + // push two items to the primary stream. the other table is empty + for (int i = 0; i < 2; i++) { + driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i])); + } + // pass tuple with null key, it will be discarded in join process + driver.pipeInput(recordFactory.create(topic1, null, "SomeVal")); + assertOutputKeyValue(driver, 0, "X0+null"); + assertOutputKeyValue(driver, 1, "X1+null"); + assertNull(driver.readOutput(output)); + + // push two items to the other stream. this should produce two items. + for (int i = 0; i < 2; i++) { + driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i])); + } + // pass tuple with null key, it will be discarded in join process + driver.pipeInput(recordFactory.create(topic2, null, "AnotherVal")); + assertOutputKeyValue(driver, 0, "X0+Y0"); + assertOutputKeyValue(driver, 1, "X1+Y1"); + assertNull(driver.readOutput(output)); + + // push all four items to the primary stream. this should produce four items. + for (final int expectedKey : expectedKeys) { + driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey)); + } + assertOutputKeyValue(driver, 0, "X0+Y0"); + assertOutputKeyValue(driver, 1, "X1+Y1"); + assertOutputKeyValue(driver, 2, "X2+null"); + assertOutputKeyValue(driver, 3, "X3+null"); + assertNull(driver.readOutput(output)); + + // push all items to the other stream. this should produce four items. + for (final int expectedKey : expectedKeys) { + driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey)); + } + assertOutputKeyValue(driver, 0, "X0+YY0"); + assertOutputKeyValue(driver, 1, "X1+YY1"); + assertOutputKeyValue(driver, 2, "X2+YY2"); + assertOutputKeyValue(driver, 3, "X3+YY3"); + assertNull(driver.readOutput(output)); + + // push all four items to the primary stream. this should produce four items. + for (final int expectedKey : expectedKeys) { + driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey)); + } + assertOutputKeyValue(driver, 0, "X0+YY0"); + assertOutputKeyValue(driver, 1, "X1+YY1"); + assertOutputKeyValue(driver, 2, "X2+YY2"); + assertOutputKeyValue(driver, 3, "X3+YY3"); + assertNull(driver.readOutput(output)); + + // push two items with null to the other stream as deletes. this should produce two item. + for (int i = 0; i < 2; i++) { + driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], null)); + } + assertOutputKeyValue(driver, 0, "X0+null"); + assertOutputKeyValue(driver, 1, "X1+null"); + assertNull(driver.readOutput(output)); - for (int i = 1; i < 3; i++) { - driver.process(topic1, expectedKeys[i], null); + // push all four items to the primary stream. this should produce four items. + for (final int expectedKey : expectedKeys) { + driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey)); + } + assertOutputKeyValue(driver, 0, "XX0+null"); + assertOutputKeyValue(driver, 1, "XX1+null"); + assertOutputKeyValue(driver, 2, "XX2+YY2"); + assertOutputKeyValue(driver, 3, "XX3+YY3"); + assertNull(driver.readOutput(output)); + + // push middle two items to the primary stream with null. this should produce two items. + for (int i = 1; i < 3; i++) { + driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], null)); + } + assertOutputKeyValue(driver, 1, null); + assertOutputKeyValue(driver, 2, "null+YY2"); + assertNull(driver.readOutput(output)); } - driver.flushState(); - processor.checkAndClearProcessResult("1:null", "2:null+YY2"); - checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, null), kv(2, "null+YY2"), kv(3, "XX3+YY3")); } @Test @@ -191,78 +174,63 @@ public class KTableKTableOuterJoinTest { joined = table1.outerJoin(table2, MockValueJoiner.TOSTRING_JOINER); supplier = new MockProcessorSupplier<>(); - builder.build().addProcessor("proc", supplier, ((KTableImpl<?, ?, ?>) joined).name); + final Topology topology = builder.build().addProcessor("proc", supplier, ((KTableImpl<?, ?, ?>) joined).name); + try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) { - driver.setUp(builder, stateDir); - - final MockProcessor<Integer, String> proc = supplier.theCapturedProcessor(); - - assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled()); - assertTrue(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled()); - assertFalse(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled()); - - // push two items to the primary stream. the other table is empty - - for (int i = 0; i < 2; i++) { - driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); - } - driver.flushState(); - proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)"); + final MockProcessor<Integer, String> proc = supplier.theCapturedProcessor(); - // push two items to the other stream. this should produce two items. + assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled()); + assertTrue(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled()); + assertFalse(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled()); - for (int i = 0; i < 2; i++) { - driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); - } - driver.flushState(); - proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)"); - - // push all four items to the primary stream. this should produce four items. - - for (final int expectedKey : expectedKeys) { - driver.process(topic1, expectedKey, "X" + expectedKey); - } - driver.flushState(); - proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(X2+null<-null)", "3:(X3+null<-null)"); - - // push all items to the other stream. this should produce four items. - for (final int expectedKey : expectedKeys) { - driver.process(topic2, expectedKey, "YY" + expectedKey); - } - driver.flushState(); - proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)"); - - // push all four items to the primary stream. this should produce four items. + // push two items to the primary stream. the other table is empty + for (int i = 0; i < 2; i++) { + driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i])); + } + proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)"); - for (final int expectedKey : expectedKeys) { - driver.process(topic1, expectedKey, "X" + expectedKey); - } - driver.flushState(); - proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)"); + // push two items to the other stream. this should produce two items. + for (int i = 0; i < 2; i++) { + driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i])); + } + proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)"); - // push two items with null to the other stream as deletes. this should produce two item. + // push all four items to the primary stream. this should produce four items. + for (final int expectedKey : expectedKeys) { + driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey)); + } + proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(X2+null<-null)", "3:(X3+null<-null)"); - for (int i = 0; i < 2; i++) { - driver.process(topic2, expectedKeys[i], null); - } - driver.flushState(); - proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)"); + // push all items to the other stream. this should produce four items. + for (final int expectedKey : expectedKeys) { + driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey)); + } + proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)"); - // push all four items to the primary stream. this should produce four items. + // push all four items to the primary stream. this should produce four items. + for (final int expectedKey : expectedKeys) { + driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey)); + } + proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)"); - for (final int expectedKey : expectedKeys) { - driver.process(topic1, expectedKey, "XX" + expectedKey); - } - driver.flushState(); - proc.checkAndClearProcessResult("0:(XX0+null<-null)", "1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)"); + // push two items with null to the other stream as deletes. this should produce two item. + for (int i = 0; i < 2; i++) { + driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], null)); + } + proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)"); - // push middle two items to the primary stream with null. this should produce two items. + // push all four items to the primary stream. this should produce four items. + for (final int expectedKey : expectedKeys) { + driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey)); + } + proc.checkAndClearProcessResult("0:(XX0+null<-null)", "1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)"); - for (int i = 1; i < 3; i++) { - driver.process(topic1, expectedKeys[i], null); + // push middle two items to the primary stream with null. this should produce two items. + for (int i = 1; i < 3; i++) { + driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], null)); + } + proc.checkAndClearProcessResult("1:(null<-null)", "2:(null+YY2<-null)"); } - driver.flushState(); - proc.checkAndClearProcessResult("1:(null<-null)", "2:(null+YY2<-null)"); } @Test @@ -283,78 +251,64 @@ public class KTableKTableOuterJoinTest { ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues(); supplier = new MockProcessorSupplier<>(); - builder.build().addProcessor("proc", supplier, ((KTableImpl<?, ?, ?>) joined).name); - - driver.setUp(builder, stateDir); - - final MockProcessor<Integer, String> proc = supplier.theCapturedProcessor(); + final Topology topology = builder.build().addProcessor("proc", supplier, ((KTableImpl<?, ?, ?>) joined).name); - assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled()); - assertTrue(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled()); - assertTrue(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled()); + try (final TopologyTestDriver driver = new TopologyTestDriver(topology, props)) { - // push two items to the primary stream. the other table is empty - - for (int i = 0; i < 2; i++) { - driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); - } - driver.flushState(); - proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)"); + final MockProcessor<Integer, String> proc = supplier.theCapturedProcessor(); - // push two items to the other stream. this should produce two items. + assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled()); + assertTrue(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled()); + assertTrue(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled()); - for (int i = 0; i < 2; i++) { - driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); - } - driver.flushState(); - proc.checkAndClearProcessResult("0:(X0+Y0<-X0+null)", "1:(X1+Y1<-X1+null)"); - - // push all four items to the primary stream. this should produce four items. - - for (final int expectedKey : expectedKeys) { - driver.process(topic1, expectedKey, "X" + expectedKey); - } - driver.flushState(); - proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)"); - - // push all items to the other stream. this should produce four items. - for (final int expectedKey : expectedKeys) { - driver.process(topic2, expectedKey, "YY" + expectedKey); - } - driver.flushState(); - proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)"); - - // push all four items to the primary stream. this should produce four items. + // push two items to the primary stream. the other table is empty + for (int i = 0; i < 2; i++) { + driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], "X" + expectedKeys[i])); + } + proc.checkAndClearProcessResult("0:(X0+null<-null)", "1:(X1+null<-null)"); - for (final int expectedKey : expectedKeys) { - driver.process(topic1, expectedKey, "X" + expectedKey); - } - driver.flushState(); - proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)"); + // push two items to the other stream. this should produce two items. + for (int i = 0; i < 2; i++) { + driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], "Y" + expectedKeys[i])); + } + proc.checkAndClearProcessResult("0:(X0+Y0<-X0+null)", "1:(X1+Y1<-X1+null)"); - // push two items with null to the other stream as deletes. this should produce two item. + // push all four items to the primary stream. this should produce four items. + for (final int expectedKey : expectedKeys) { + driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey)); + } + proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)"); - for (int i = 0; i < 2; i++) { - driver.process(topic2, expectedKeys[i], null); - } - driver.flushState(); - proc.checkAndClearProcessResult("0:(X0+null<-X0+YY0)", "1:(X1+null<-X1+YY1)"); + // push all items to the other stream. this should produce four items. + for (final int expectedKey : expectedKeys) { + driver.pipeInput(recordFactory.create(topic2, expectedKey, "YY" + expectedKey)); + } + proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)"); - // push all four items to the primary stream. this should produce four items. + // push all four items to the primary stream. this should produce four items. + for (final int expectedKey : expectedKeys) { + driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" + expectedKey)); + } + proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)"); - for (final int expectedKey : expectedKeys) { - driver.process(topic1, expectedKey, "XX" + expectedKey); - } - driver.flushState(); - proc.checkAndClearProcessResult("0:(XX0+null<-X0+null)", "1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)"); + // push two items with null to the other stream as deletes. this should produce two item. + for (int i = 0; i < 2; i++) { + driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], null)); + } + proc.checkAndClearProcessResult("0:(X0+null<-X0+YY0)", "1:(X1+null<-X1+YY1)"); - // push middle two items to the primary stream with null. this should produce two items. + // push all four items to the primary stream. this should produce four items. + for (final int expectedKey : expectedKeys) { + driver.pipeInput(recordFactory.create(topic1, expectedKey, "XX" + expectedKey)); + } + proc.checkAndClearProcessResult("0:(XX0+null<-X0+null)", "1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)"); - for (int i = 1; i < 3; i++) { - driver.process(topic1, expectedKeys[i], null); + // push middle two items to the primary stream with null. this should produce two items. + for (int i = 1; i < 3; i++) { + driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], null)); + } + proc.checkAndClearProcessResult("1:(null<-XX1+null)", "2:(null+YY2<-XX2+YY2)"); } - driver.flushState(); - proc.checkAndClearProcessResult("1:(null<-XX1+null)", "2:(null+YY2<-XX2+YY2)"); } @Test @@ -363,8 +317,8 @@ public class KTableKTableOuterJoinTest { @SuppressWarnings("unchecked") final Processor<String, Change<String>> join = new KTableKTableOuterJoin<>( - (KTableImpl<String, String, String>) builder.table("left", Consumed.with(stringSerde, stringSerde)), - (KTableImpl<String, String, String>) builder.table("right", Consumed.with(stringSerde, stringSerde)), + (KTableImpl<String, String, String>) builder.table("left", Consumed.with(Serdes.String(), Serdes.String())), + (KTableImpl<String, String, String>) builder.table("right", Consumed.with(Serdes.String(), Serdes.String())), null ).get(); @@ -379,19 +333,10 @@ public class KTableKTableOuterJoinTest { assertThat(appender.getMessages(), hasItem("Skipping record due to null key. change=[(new<-old)] topic=[left] partition=[-1] offset=[-2]")); } - private KeyValue<Integer, String> kv(final Integer key, final String value) { - return new KeyValue<>(key, value); + private void assertOutputKeyValue(final TopologyTestDriver driver, + final Integer expectedKey, + final String expectedValue) { + OutputVerifier.compareKeyValue(driver.readOutput(output, Serdes.Integer().deserializer(), Serdes.String().deserializer()), expectedKey, expectedValue); } - @SafeVarargs - private final void checkJoinedValues(final KTableValueGetter<Integer, String> getter, final KeyValue<Integer, String>... expected) { - for (final KeyValue<Integer, String> kv : expected) { - final String value = getter.get(kv.key); - if (kv.value == null) { - assertNull(value); - } else { - assertEquals(kv.value, value); - } - } - } }