This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c1918a2  KAFKA-6474: Rewrite tests to use new public TopologyTestDriver [part 4] (#5433)
c1918a2 is described below

commit c1918a2b9a9ba282efe92ddb867abef3d6d9b98c
Author: Filipe Agapito <filipe.agap...@gmail.com>
AuthorDate: Thu Feb 14 23:06:47 2019 +0000

    KAFKA-6474: Rewrite tests to use new public TopologyTestDriver [part 4] (#5433)
    
    Reviewer: John Roesler <j...@confluent.io>, Matthias J. Sax <matth...@confluent.io>
---
 .../internals/KTableKTableInnerJoinTest.java       | 319 +++++++---------
 .../internals/KTableKTableLeftJoinTest.java        | 417 +++++++++-----------
 .../internals/KTableKTableOuterJoinTest.java       | 425 +++++++++------------
 3 files changed, 499 insertions(+), 662 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
index 63ed53f..5d1f5c3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
@@ -16,31 +16,30 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.processor.MockProcessorContext;
 import org.apache.kafka.streams.processor.Processor;
 import 
org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.streams.test.OutputVerifier;
 import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
-import org.apache.kafka.test.TestUtils;
-import org.junit.Before;
-import org.junit.Rule;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
-import java.io.File;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.Properties;
 import java.util.Set;
 
 import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
@@ -54,22 +53,12 @@ public class KTableKTableInnerJoinTest {
 
     private final String topic1 = "topic1";
     private final String topic2 = "topic2";
+    final private String output = "output";
 
-    private final Serde<Integer> intSerde = Serdes.Integer();
-    private final Serde<String> stringSerde = Serdes.String();
-    private final Consumed<Integer, String> consumed = Consumed.with(intSerde, 
stringSerde);
-    private final Materialized<Integer, String, KeyValueStore<Bytes, byte[]>> 
materialized = Materialized.with(intSerde, stringSerde);
-
-    private File stateDir = null;
-
-    @SuppressWarnings("deprecation")
-    @Rule
-    public final org.apache.kafka.test.KStreamTestDriver driver = new 
org.apache.kafka.test.KStreamTestDriver();
-
-    @Before
-    public void setUp() {
-        stateDir = TestUtils.tempDirectory("kafka-test");
-    }
+    private final Consumed<Integer, String> consumed = 
Consumed.with(Serdes.Integer(), Serdes.String());
+    private final Materialized<Integer, String, KeyValueStore<Bytes, byte[]>> 
materialized = Materialized.with(Serdes.Integer(), Serdes.String());
+    private final ConsumerRecordFactory<Integer, String> recordFactory = new 
ConsumerRecordFactory<>(Serdes.Integer().serializer(), 
Serdes.String().serializer());
+    private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
 
     @Test
     public void testJoin() {
@@ -80,13 +69,12 @@ public class KTableKTableInnerJoinTest {
         final KTable<Integer, String> table1;
         final KTable<Integer, String> table2;
         final KTable<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> supplier = new 
MockProcessorSupplier<>();
         table1 = builder.table(topic1, consumed);
         table2 = builder.table(topic2, consumed);
         joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);
-        joined.toStream().process(supplier);
+        joined.toStream().to(output);
 
-        doTestJoin(builder, expectedKeys, supplier, joined);
+        doTestJoin(builder, expectedKeys);
     }
 
     @Test
@@ -98,13 +86,12 @@ public class KTableKTableInnerJoinTest {
         final KTable<Integer, String> table1;
         final KTable<Integer, String> table2;
         final KTable<Integer, String> table3;
-        final MockProcessorSupplier<Integer, String> processor = new 
MockProcessorSupplier<>();
         table1 = builder.table(topic1, consumed);
         table2 = builder.table(topic2, consumed);
         table3 = table1.join(table2, MockValueJoiner.TOSTRING_JOINER, 
materialized);
-        table3.toStream().process(processor);
+        table3.toStream().to(output);
 
-        doTestJoin(builder, expectedKeys, processor, table3);
+        doTestJoin(builder, expectedKeys);
     }
 
     @Test
@@ -151,8 +138,8 @@ public class KTableKTableInnerJoinTest {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final Processor<String, Change<String>> join = new 
KTableKTableInnerJoin<>(
-            (KTableImpl<String, String, String>) builder.table("left", 
Consumed.with(stringSerde, stringSerde)),
-            (KTableImpl<String, String, String>) builder.table("right", 
Consumed.with(stringSerde, stringSerde)),
+            (KTableImpl<String, String, String>) builder.table("left", 
Consumed.with(Serdes.String(), Serdes.String())),
+            (KTableImpl<String, String, String>) builder.table("right", 
Consumed.with(Serdes.String(), Serdes.String())),
             null
         ).get();
 
@@ -168,192 +155,146 @@ public class KTableKTableInnerJoinTest {
     }
 
     private void doTestNotSendingOldValues(final StreamsBuilder builder,
-                                           final int[] expectedKeys,
-                                           final KTable<Integer, String> 
table1,
-                                           final KTable<Integer, String> 
table2,
-                                           final 
MockProcessorSupplier<Integer, String> supplier,
-                                           final KTable<Integer, String> 
joined) {
+                                        final int[] expectedKeys,
+                                        final KTable<Integer, String> table1,
+                                        final KTable<Integer, String> table2,
+                                        final MockProcessorSupplier<Integer, 
String> supplier,
+                                        final KTable<Integer, String> joined) {
 
-        assertFalse(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
-        assertFalse(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
-        assertFalse(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
-
-        driver.setUp(builder, stateDir, Serdes.Integer(), Serdes.String());
-        driver.setTime(0L);
-
-        final MockProcessor<Integer, String> proc = 
supplier.theCapturedProcessor();
-
-        // push two items to the primary stream. the other table is empty
-
-        for (int i = 0; i < 2; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-        }
-        driver.flushState();
-
-        proc.checkAndClearProcessResult();
-
-        // push two items to the other stream. this should produce two items.
-
-        for (int i = 0; i < 2; i++) {
-            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
-        }
-        driver.flushState();
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props, 0L)) {
 
-        proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
+            final MockProcessor<Integer, String> proc = 
supplier.theCapturedProcessor();
 
-        // push all four items to the primary stream. this should produce two 
items.
+            assertFalse(((KTableImpl<?, ?, ?>) 
table1).sendingOldValueEnabled());
+            assertFalse(((KTableImpl<?, ?, ?>) 
table2).sendingOldValueEnabled());
+            assertFalse(((KTableImpl<?, ?, ?>) 
joined).sendingOldValueEnabled());
 
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "XX" + expectedKey);
-        }
-        driver.flushState();
-
-        proc.checkAndClearProcessResult("0:(XX0+Y0<-null)", 
"1:(XX1+Y1<-null)");
+            // push two items to the primary stream. the other table is empty
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], 
"X" + expectedKeys[i]));
+            }
+            proc.checkAndClearProcessResult();
 
-        // push all items to the other stream. this should produce four items.
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
-        }
-        driver.flushState();
-        proc.checkAndClearProcessResult("0:(XX0+YY0<-null)", 
"1:(XX1+YY1<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
+            // push two items to the other stream. this should produce two 
items.
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], 
"Y" + expectedKeys[i]));
+            }
+            proc.checkAndClearProcessResult("0:(X0+Y0<-null)", 
"1:(X1+Y1<-null)");
 
-        // push all four items to the primary stream. this should produce four 
items.
+            // push all four items to the primary stream. this should produce 
two items.
+            for (final int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, 
"XX" + expectedKey));
+            }
+            proc.checkAndClearProcessResult("0:(XX0+Y0<-null)", 
"1:(XX1+Y1<-null)");
 
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "X" + expectedKey);
-        }
-        driver.flushState();
-        proc.checkAndClearProcessResult("0:(X0+YY0<-null)", 
"1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+            // push all items to the other stream. this should produce four 
items.
+            for (final int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, 
"YY" + expectedKey));
+            }
+            proc.checkAndClearProcessResult("0:(XX0+YY0<-null)", 
"1:(XX1+YY1<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
 
-        // push two items with null to the other stream as deletes. this 
should produce two item.
+            // push all four items to the primary stream. this should produce 
four items.
 
-        for (int i = 0; i < 2; i++) {
-            driver.process(topic2, expectedKeys[i], null);
-        }
-        driver.flushState();
-        proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)");
+            for (final int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" 
+ expectedKey));
+            }
+            proc.checkAndClearProcessResult("0:(X0+YY0<-null)", 
"1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
 
-        // push all four items to the primary stream. this should produce two 
items.
+            // push two items with null to the other stream as deletes. this 
should produce two item.
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], 
null));
+            }
+            proc.checkAndClearProcessResult("0:(null<-null)", 
"1:(null<-null)");
 
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "XX" + expectedKey);
+            // push all four items to the primary stream. this should produce 
two items.
+            for (final int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, 
"XX" + expectedKey));
+            }
+            proc.checkAndClearProcessResult("2:(XX2+YY2<-null)", 
"3:(XX3+YY3<-null)");
         }
-        driver.flushState();
-        proc.checkAndClearProcessResult("2:(XX2+YY2<-null)", 
"3:(XX3+YY3<-null)");
     }
 
-    @SuppressWarnings("unchecked")
-    private void doTestJoin(final StreamsBuilder builder,
-                            final int[] expectedKeys,
-                            final MockProcessorSupplier<Integer, String> 
supplier,
-                            final KTable<Integer, String> joined) {
+    private void doTestJoin(final StreamsBuilder builder, final int[] 
expectedKeys) {
+
         final Collection<Set<String>> copartitionGroups = 
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), 
copartitionGroups.iterator().next());
 
-        final KTableValueGetterSupplier<Integer, String> getterSupplier = 
((KTableImpl<Integer, String, String>) joined).valueGetterSupplier();
-
-        driver.setUp(builder, stateDir, Serdes.Integer(), Serdes.String());
-        driver.setTime(0L);
-
-        final MockProcessor<Integer, String> processor = 
supplier.theCapturedProcessor();
-
-        final KTableValueGetter<Integer, String> getter = getterSupplier.get();
-        getter.init(driver.context());
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
 
-        // push two items to the primary stream. the other table is empty
-
-        for (int i = 0; i < 2; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-        }
-        // pass tuple with null key, it will be discarded in join process
-        driver.process(topic1, null, "SomeVal");
-        driver.flushState();
-
-        processor.checkAndClearProcessResult();
-
-        // push two items to the other stream. this should produce two items.
-
-        for (int i = 0; i < 2; i++) {
-            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
-        }
-        // pass tuple with null key, it will be discarded in join process
-        driver.process(topic2, null, "AnotherVal");
-        driver.flushState();
-
-        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
-        checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"));
-
-        // push all four items to the primary stream. this should produce two 
items.
-
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "XX" + expectedKey);
-        }
-        driver.flushState();
-
-        processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1");
-        checkJoinedValues(getter, kv(0, "XX0+Y0"), kv(1, "XX1+Y1"));
-
-        // push all items to the other stream. this should produce four items.
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
-        }
-        driver.flushState();
-
-        processor.checkAndClearProcessResult("0:XX0+YY0", "1:XX1+YY1", 
"2:XX2+YY2", "3:XX3+YY3");
-        checkJoinedValues(getter, kv(0, "XX0+YY0"), kv(1, "XX1+YY1"), kv(2, 
"XX2+YY2"), kv(3, "XX3+YY3"));
-
-        // push all four items to the primary stream. this should produce four 
items.
-
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "X" + expectedKey);
-        }
-        driver.flushState();
-
-        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", 
"2:X2+YY2", "3:X3+YY3");
-        checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, 
"X2+YY2"), kv(3, "X3+YY3"));
-
-        // push two items with null to the other stream as deletes. this 
should produce two item.
+            // push two items to the primary stream. the other table is empty
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], 
"X" + expectedKeys[i]));
+            }
+            // pass tuple with null key, it will be discarded in join process
+            driver.pipeInput(recordFactory.create(topic1, null, "SomeVal"));
+            assertNull(driver.readOutput(output));
 
-        for (int i = 0; i < 2; i++) {
-            driver.process(topic2, expectedKeys[i], null);
-        }
-        driver.flushState();
+            // push two items to the other stream. this should produce two 
items.
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], 
"Y" + expectedKeys[i]));
+            }
+            // pass tuple with null key, it will be discarded in join process
+            driver.pipeInput(recordFactory.create(topic2, null, "AnotherVal"));
+            assertOutputKeyValue(driver, 0, "X0+Y0");
+            assertOutputKeyValue(driver, 1, "X1+Y1");
+            assertNull(driver.readOutput(output));
+
+            // push all four items to the primary stream. this should produce 
two items.
+            for (final int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, 
"XX" + expectedKey));
+            }
+            assertOutputKeyValue(driver, 0, "XX0+Y0");
+            assertOutputKeyValue(driver, 1, "XX1+Y1");
+            assertNull(driver.readOutput(output));
 
-        processor.checkAndClearProcessResult("0:null", "1:null");
-        checkJoinedValues(getter, kv(0, null), kv(1, null));
+            // push all items to the other stream. this should produce four 
items.
+            for (final int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, 
"YY" + expectedKey));
+            }
+            assertOutputKeyValue(driver, 0, "XX0+YY0");
+            assertOutputKeyValue(driver, 1, "XX1+YY1");
+            assertOutputKeyValue(driver, 2, "XX2+YY2");
+            assertOutputKeyValue(driver, 3, "XX3+YY3");
+            assertNull(driver.readOutput(output));
+
+            // push all four items to the primary stream. this should produce 
four items.
+            for (final int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" 
+ expectedKey));
+            }
+            assertOutputKeyValue(driver, 0, "X0+YY0");
+            assertOutputKeyValue(driver, 1, "X1+YY1");
+            assertOutputKeyValue(driver, 2, "X2+YY2");
+            assertOutputKeyValue(driver, 3, "X3+YY3");
+            assertNull(driver.readOutput(output));
+
+            // push two items with null to the other stream as deletes. this 
should produce two item.
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], 
null));
+            }
+            assertOutputKeyValue(driver, 0, null);
+            assertOutputKeyValue(driver, 1, null);
+            assertNull(driver.readOutput(output));
 
-        // push all four items to the primary stream. this should produce two 
items.
+            // push all four items to the primary stream. this should produce 
two items.
+            for (final int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, 
"XX" + expectedKey));
+            }
+            assertOutputKeyValue(driver, 2, "XX2+YY2");
+            assertOutputKeyValue(driver, 3, "XX3+YY3");
+            assertNull(driver.readOutput(output));
 
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "XX" + expectedKey);
+            driver.pipeInput(recordFactory.create(topic1, null, "XX" + 1));
+            assertNull(driver.readOutput(output));
         }
-        driver.flushState();
-
-        processor.checkAndClearProcessResult("2:XX2+YY2", "3:XX3+YY3");
-        checkJoinedValues(getter, kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
-
-        driver.process(topic1, null, "XX" + 1);
-        checkJoinedValues(getter, kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
-
-    }
-
 
-    private KeyValue<Integer, String> kv(final Integer key, final String 
value) {
-        return new KeyValue<>(key, value);
     }
 
-    @SafeVarargs
-    private final void checkJoinedValues(final KTableValueGetter<Integer, 
String> getter, final KeyValue<Integer, String>... expected) {
-        for (final KeyValue<Integer, String> kv : expected) {
-            final String value = getter.get(kv.key);
-            if (kv.value == null) {
-                assertNull(value);
-            } else {
-                assertEquals(kv.value, value);
-            }
-        }
+    private void assertOutputKeyValue(final TopologyTestDriver driver,
+                                      final Integer expectedKey,
+                                      final String expectedValue) {
+        OutputVerifier.compareKeyValue(driver.readOutput(output, 
Serdes.Integer().deserializer(), Serdes.String().deserializer()), expectedKey, 
expectedValue);
     }
 
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index f5d74b2..609b070 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -16,12 +16,14 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyTestDriverWrapper;
 import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Grouped;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
@@ -29,20 +31,20 @@ import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.MockProcessorContext;
 import org.apache.kafka.streams.processor.Processor;
 import 
org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.streams.test.OutputVerifier;
 import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockReducer;
 import org.apache.kafka.test.MockValueJoiner;
-import org.apache.kafka.test.TestUtils;
-import org.junit.Before;
-import org.junit.Rule;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
-import java.io.File;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Locale;
+import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
 
@@ -58,20 +60,11 @@ public class KTableKTableLeftJoinTest {
 
     final private String topic1 = "topic1";
     final private String topic2 = "topic2";
+    final private String output = "output";
 
-    final private Serde<Integer> intSerde = Serdes.Integer();
-    final private Serde<String> stringSerde = Serdes.String();
-    private File stateDir = null;
-
-    @SuppressWarnings("deprecation")
-    @Rule
-    public final org.apache.kafka.test.KStreamTestDriver driver = new 
org.apache.kafka.test.KStreamTestDriver();
-    private final Consumed<Integer, String> consumed = Consumed.with(intSerde, 
stringSerde);
-
-    @Before
-    public void setUp() {
-        stateDir = TestUtils.tempDirectory("kafka-test");
-    }
+    private final Consumed<Integer, String> consumed = 
Consumed.with(Serdes.Integer(), Serdes.String());
+    private final ConsumerRecordFactory<Integer, String> recordFactory = new 
ConsumerRecordFactory<>(Serdes.Integer().serializer(), 
Serdes.String().serializer());
+    private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
 
     @Test
     public void testJoin() {
@@ -82,91 +75,83 @@ public class KTableKTableLeftJoinTest {
         final KTable<Integer, String> table1 = builder.table(topic1, consumed);
         final KTable<Integer, String> table2 = builder.table(topic2, consumed);
         final KTable<Integer, String> joined = table1.leftJoin(table2, 
MockValueJoiner.TOSTRING_JOINER);
-        final MockProcessorSupplier<Integer, String> supplier = new 
MockProcessorSupplier<>();
-        joined.toStream().process(supplier);
+        joined.toStream().to(output);
 
         final Collection<Set<String>> copartitionGroups = 
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), 
copartitionGroups.iterator().next());
 
-        @SuppressWarnings("unchecked")
-        final KTableValueGetterSupplier<Integer, String> getterSupplier = 
((KTableImpl<Integer, String, String>) joined).valueGetterSupplier();
-
-        driver.setUp(builder, stateDir);
-        driver.setTime(0L);
-
-        final MockProcessor<Integer, String> processor = 
supplier.theCapturedProcessor();
-
-        final KTableValueGetter<Integer, String> getter = getterSupplier.get();
-        getter.init(driver.context());
-
-        // push two items to the primary stream. the other table is empty
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
 
-        for (int i = 0; i < 2; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-        }
-        // pass tuple with null key, it will be discarded in join process
-        driver.process(topic1, null, "SomeVal");
-        driver.flushState();
-
-        processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
-        checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, 
null), kv(3, null));
-
-        // push two items to the other stream. this should produce two items.
-
-        for (int i = 0; i < 2; i++) {
-            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
-        }
-        // pass tuple with null key, it will be discarded in join process
-        driver.process(topic2, null, "AnotherVal");
-        driver.flushState();
-        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
-        checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), 
kv(3, null));
-
-        // push all four items to the primary stream. this should produce four 
items.
-
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "X" + expectedKey);
-        }
-        driver.flushState();
-        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", 
"2:X2+null", "3:X3+null");
-        checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, 
"X2+null"), kv(3, "X3+null"));
-
-        // push all items to the other stream. this should produce four items.
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
-        }
-        driver.flushState();
-        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", 
"2:X2+YY2", "3:X3+YY3");
-        checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, 
"X2+YY2"), kv(3, "X3+YY3"));
-
-        // push all four items to the primary stream. this should produce four 
items.
-
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "X" + expectedKey);
-        }
-        driver.flushState();
-        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", 
"2:X2+YY2", "3:X3+YY3");
-        checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, 
"X2+YY2"), kv(3, "X3+YY3"));
-
-        // push two items with null to the other stream as deletes. this 
should produce two item.
-
-        for (int i = 0; i < 2; i++) {
-            driver.process(topic2, expectedKeys[i], null);
-        }
-        driver.flushState();
-        processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
-        checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, 
"X2+YY2"), kv(3, "X3+YY3"));
-
-        // push all four items to the primary stream. this should produce four 
items.
+            // push two items to the primary stream. the other table is empty
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], 
"X" + expectedKeys[i]));
+            }
+            // pass tuple with null key, it will be discarded in join process
+            driver.pipeInput(recordFactory.create(topic1, null, "SomeVal"));
+            assertOutputKeyValue(driver, 0, "X0+null");
+            assertOutputKeyValue(driver, 1, "X1+null");
+            assertNull(driver.readOutput(output));
+
+            // push two items to the other stream. this should produce two 
items.
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], 
"Y" + expectedKeys[i]));
+            }
+            // pass tuple with null key, it will be discarded in join process
+            driver.pipeInput(recordFactory.create(topic2, null, "AnotherVal"));
+            assertOutputKeyValue(driver, 0, "X0+Y0");
+            assertOutputKeyValue(driver, 1, "X1+Y1");
+            assertNull(driver.readOutput(output));
+
+            // push all four items to the primary stream. this should produce 
four items.
+            for (final int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" 
+ expectedKey));
+            }
+            assertOutputKeyValue(driver, 0, "X0+Y0");
+            assertOutputKeyValue(driver, 1, "X1+Y1");
+            assertOutputKeyValue(driver, 2, "X2+null");
+            assertOutputKeyValue(driver, 3, "X3+null");
+            assertNull(driver.readOutput(output));
+
+            // push all items to the other stream. this should produce four 
items.
+            for (final int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, 
"YY" + expectedKey));
+            }
+            assertOutputKeyValue(driver, 0, "X0+YY0");
+            assertOutputKeyValue(driver, 1, "X1+YY1");
+            assertOutputKeyValue(driver, 2, "X2+YY2");
+            assertOutputKeyValue(driver, 3, "X3+YY3");
+            assertNull(driver.readOutput(output));
+
+            // push all four items to the primary stream. this should produce 
four items.
+            for (final int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" 
+ expectedKey));
+            }
+            assertOutputKeyValue(driver, 0, "X0+YY0");
+            assertOutputKeyValue(driver, 1, "X1+YY1");
+            assertOutputKeyValue(driver, 2, "X2+YY2");
+            assertOutputKeyValue(driver, 3, "X3+YY3");
+            assertNull(driver.readOutput(output));
+
+            // push two items with null to the other stream as deletes. this 
should produce two item.
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], 
null));
+            }
+            assertOutputKeyValue(driver, 0, "X0+null");
+            assertOutputKeyValue(driver, 1, "X1+null");
+            assertNull(driver.readOutput(output));
 
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "XX" + expectedKey);
+            // push all four items to the primary stream. this should produce 
four items.
+            for (final int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, 
"XX" + expectedKey));
+            }
+            assertOutputKeyValue(driver, 0, "XX0+null");
+            assertOutputKeyValue(driver, 1, "XX1+null");
+            assertOutputKeyValue(driver, 2, "XX2+YY2");
+            assertOutputKeyValue(driver, 3, "XX3+YY3");
+            assertNull(driver.readOutput(output));
         }
-        driver.flushState();
-        processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", 
"2:XX2+YY2", "3:XX3+YY3");
-        checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, "XX1+null"), kv(2, 
"XX2+YY2"), kv(3, "XX3+YY3"));
     }
 
     @Test
@@ -185,71 +170,58 @@ public class KTableKTableLeftJoinTest {
         joined = table1.leftJoin(table2, MockValueJoiner.TOSTRING_JOINER);
 
         supplier = new MockProcessorSupplier<>();
-        builder.build().addProcessor("proc", supplier, ((KTableImpl<?, ?, ?>) 
joined).name);
-
-        driver.setUp(builder, stateDir);
-        driver.setTime(0L);
-
-        final MockProcessor<Integer, String> proc = 
supplier.theCapturedProcessor();
-
-        assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
-        assertFalse(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
-        assertFalse(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
-
-        // push two items to the primary stream. the other table is empty
-
-        for (int i = 0; i < 2; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-        }
-        driver.flushState();
-        proc.checkAndClearProcessResult("0:(X0+null<-null)", 
"1:(X1+null<-null)");
+        final Topology topology = builder.build().addProcessor("proc", 
supplier, ((KTableImpl<?, ?, ?>) joined).name);
 
-        // push two items to the other stream. this should produce two items.
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(topology, props)) {
 
-        for (int i = 0; i < 2; i++) {
-            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
-        }
-        driver.flushState();
-        proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
+            final MockProcessor<Integer, String> proc = 
supplier.theCapturedProcessor();
 
-        // push all four items to the primary stream. this should produce four 
items.
+            assertTrue(((KTableImpl<?, ?, ?>) 
table1).sendingOldValueEnabled());
+            assertFalse(((KTableImpl<?, ?, ?>) 
table2).sendingOldValueEnabled());
+            assertFalse(((KTableImpl<?, ?, ?>) 
joined).sendingOldValueEnabled());
 
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "X" + expectedKey);
-        }
-        driver.flushState();
-        proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", 
"2:(X2+null<-null)", "3:(X3+null<-null)");
-
-        // push all items to the other stream. this should produce four items.
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
-        }
-        driver.flushState();
-        proc.checkAndClearProcessResult("0:(X0+YY0<-null)", 
"1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+            // push two items to the primary stream. the other table is empty
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], 
"X" + expectedKeys[i]));
+            }
+            proc.checkAndClearProcessResult("0:(X0+null<-null)", 
"1:(X1+null<-null)");
 
-        // push all four items to the primary stream. this should produce four 
items.
+            // push two items to the other stream. this should produce two 
items.
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], 
"Y" + expectedKeys[i]));
+            }
+            proc.checkAndClearProcessResult("0:(X0+Y0<-null)", 
"1:(X1+Y1<-null)");
 
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "X" + expectedKey);
-        }
-        driver.flushState();
-        proc.checkAndClearProcessResult("0:(X0+YY0<-null)", 
"1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+            // push all four items to the primary stream. this should produce 
four items.
+            for (final int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" 
+ expectedKey));
+            }
+            proc.checkAndClearProcessResult("0:(X0+Y0<-null)", 
"1:(X1+Y1<-null)", "2:(X2+null<-null)", "3:(X3+null<-null)");
 
-        // push two items with null to the other stream as deletes. this 
should produce two item.
+            // push all items to the other stream. this should produce four 
items.
+            for (final int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, 
"YY" + expectedKey));
+            }
+            proc.checkAndClearProcessResult("0:(X0+YY0<-null)", 
"1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
 
-        for (int i = 0; i < 2; i++) {
-            driver.process(topic2, expectedKeys[i], null);
-        }
-        driver.flushState();
-        proc.checkAndClearProcessResult("0:(X0+null<-null)", 
"1:(X1+null<-null)");
+            // push all four items to the primary stream. this should produce 
four items.
+            for (final int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" 
+ expectedKey));
+            }
+            proc.checkAndClearProcessResult("0:(X0+YY0<-null)", 
"1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
 
-        // push all four items to the primary stream. this should produce four 
items.
+            // push two items with null to the other stream as deletes. this 
should produce two items.
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], 
null));
+            }
+            proc.checkAndClearProcessResult("0:(X0+null<-null)", 
"1:(X1+null<-null)");
 
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "XX" + expectedKey);
+            // push all four items to the primary stream. this should produce 
four items.
+            for (final int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, 
"XX" + expectedKey));
+            }
+            proc.checkAndClearProcessResult("0:(XX0+null<-null)", 
"1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
         }
-        driver.flushState();
-        proc.checkAndClearProcessResult("0:(XX0+null<-null)", 
"1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
     }
 
     @Test
@@ -270,71 +242,58 @@ public class KTableKTableLeftJoinTest {
         ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
 
         supplier = new MockProcessorSupplier<>();
-        builder.build().addProcessor("proc", supplier, ((KTableImpl<?, ?, ?>) 
joined).name);
-
-        driver.setUp(builder, stateDir);
-        driver.setTime(0L);
+        final Topology topology = builder.build().addProcessor("proc", 
supplier, ((KTableImpl<?, ?, ?>) joined).name);
 
-        final MockProcessor<Integer, String> proc = 
supplier.theCapturedProcessor();
+        try (final TopologyTestDriver driver = new 
TopologyTestDriverWrapper(topology, props)) {
 
-        assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
-        assertTrue(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
-        assertTrue(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
+            final MockProcessor<Integer, String> proc = 
supplier.theCapturedProcessor();
 
-        // push two items to the primary stream. the other table is empty
-
-        for (int i = 0; i < 2; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-        }
-        driver.flushState();
-        proc.checkAndClearProcessResult("0:(X0+null<-null)", 
"1:(X1+null<-null)");
-
-        // push two items to the other stream. this should produce two items.
-
-        for (int i = 0; i < 2; i++) {
-            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
-        }
-        driver.flushState();
-        proc.checkAndClearProcessResult("0:(X0+Y0<-X0+null)", 
"1:(X1+Y1<-X1+null)");
+            assertTrue(((KTableImpl<?, ?, ?>) 
table1).sendingOldValueEnabled());
+            assertTrue(((KTableImpl<?, ?, ?>) 
table2).sendingOldValueEnabled());
+            assertTrue(((KTableImpl<?, ?, ?>) 
joined).sendingOldValueEnabled());
 
-        // push all four items to the primary stream. this should produce four 
items.
-
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "X" + expectedKey);
-        }
-        driver.flushState();
-        proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", 
"1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)");
-
-        // push all items to the other stream. this should produce four items.
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
-        }
-        driver.flushState();
-        proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", 
"1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)");
+            // push two items to the primary stream. the other table is empty
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], 
"X" + expectedKeys[i]));
+            }
+            proc.checkAndClearProcessResult("0:(X0+null<-null)", 
"1:(X1+null<-null)");
 
-        // push all four items to the primary stream. this should produce four 
items.
+            // push two items to the other stream. this should produce two 
items.
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], 
"Y" + expectedKeys[i]));
+            }
+            proc.checkAndClearProcessResult("0:(X0+Y0<-X0+null)", 
"1:(X1+Y1<-X1+null)");
 
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "X" + expectedKey);
-        }
-        driver.flushState();
-        proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", 
"1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
+            // push all four items to the primary stream. this should produce 
four items.
+            for (final int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" 
+ expectedKey));
+            }
+            proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", 
"1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)");
 
-        // push two items with null to the other stream as deletes. this 
should produce two item.
+            // push all items to the other stream. this should produce four 
items.
+            for (final int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, 
"YY" + expectedKey));
+            }
+            proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", 
"1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)");
 
-        for (int i = 0; i < 2; i++) {
-            driver.process(topic2, expectedKeys[i], null);
-        }
-        driver.flushState();
-        proc.checkAndClearProcessResult("0:(X0+null<-X0+YY0)", 
"1:(X1+null<-X1+YY1)");
+            // push all four items to the primary stream. this should produce 
four items.
+            for (final int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" 
+ expectedKey));
+            }
+            proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", 
"1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
 
-        // push all four items to the primary stream. this should produce four 
items.
+            // push two items with null to the other stream as deletes. this 
should produce two items.
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], 
null));
+            }
+            proc.checkAndClearProcessResult("0:(X0+null<-X0+YY0)", 
"1:(X1+null<-X1+YY1)");
 
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "XX" + expectedKey);
+            // push all four items to the primary stream. this should produce 
four items.
+            for (final int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, 
"XX" + expectedKey));
+            }
+            proc.checkAndClearProcessResult("0:(XX0+null<-X0+null)", 
"1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
         }
-        driver.flushState();
-        proc.checkAndClearProcessResult("0:(XX0+null<-X0+null)", 
"1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
     }
 
     /**
@@ -382,20 +341,22 @@ public class KTableKTableLeftJoinTest {
             .leftJoin(eight, MockValueJoiner.TOSTRING_JOINER)
             .mapValues(mapper);
 
-        driver.setUp(builder, stateDir, 250);
-
-        final String[] values = {
-            "a", "AA", "BBB", "CCCC", "DD", "EEEEEEEE", "F", 
"GGGGGGGGGGGGGGG", "HHH", "IIIIIIIIII",
-            "J", "KK", "LLLL", "MMMMMMMMMMMMMMMMMMMMMM", "NNNNN", "O", "P", 
"QQQQQ", "R", "SSSS",
-            "T", "UU", "VVVVVVVVVVVVVVVVVVV"
-        };
-
-        final Random random = new Random();
-        for (int i = 0; i < 1000; i++) {
-            for (final String input : inputs) {
-                final Long key = (long) random.nextInt(1000);
-                final String value = values[random.nextInt(values.length)];
-                driver.process(input, key, value);
+        final ConsumerRecordFactory<Long, String> factory = new 
ConsumerRecordFactory<>(Serdes.Long().serializer(), 
Serdes.String().serializer());
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
+
+            final String[] values = {
+                "a", "AA", "BBB", "CCCC", "DD", "EEEEEEEE", "F", 
"GGGGGGGGGGGGGGG", "HHH", "IIIIIIIIII",
+                "J", "KK", "LLLL", "MMMMMMMMMMMMMMMMMMMMMM", "NNNNN", "O", 
"P", "QQQQQ", "R", "SSSS",
+                "T", "UU", "VVVVVVVVVVVVVVVVVVV"
+            };
+
+            final Random random = new Random();
+            for (int i = 0; i < 1000; i++) {
+                for (final String input : inputs) {
+                    final Long key = (long) random.nextInt(1000);
+                    final String value = values[random.nextInt(values.length)];
+                    driver.pipeInput(factory.create(input, key, value));
+                }
             }
         }
     }
@@ -406,8 +367,8 @@ public class KTableKTableLeftJoinTest {
 
         @SuppressWarnings("unchecked")
         final Processor<String, Change<String>> join = new 
KTableKTableLeftJoin<>(
-            (KTableImpl<String, String, String>) builder.table("left", 
Consumed.with(stringSerde, stringSerde)),
-            (KTableImpl<String, String, String>) builder.table("right", 
Consumed.with(stringSerde, stringSerde)),
+            (KTableImpl<String, String, String>) builder.table("left", 
Consumed.with(Serdes.String(), Serdes.String())),
+            (KTableImpl<String, String, String>) builder.table("right", 
Consumed.with(Serdes.String(), Serdes.String())),
             null
         ).get();
 
@@ -422,19 +383,9 @@ public class KTableKTableLeftJoinTest {
         assertThat(appender.getMessages(), hasItem("Skipping record due to 
null key. change=[(new<-old)] topic=[left] partition=[-1] offset=[-2]"));
     }
 
-    private KeyValue<Integer, String> kv(final Integer key, final String 
value) {
-        return new KeyValue<>(key, value);
-    }
-
-    @SafeVarargs
-    private final void checkJoinedValues(final KTableValueGetter<Integer, 
String> getter, final KeyValue<Integer, String>... expected) {
-        for (final KeyValue<Integer, String> kv : expected) {
-            final String value = getter.get(kv.key);
-            if (kv.value == null) {
-                assertNull(value);
-            } else {
-                assertEquals(kv.value, value);
-            }
-        }
+    private void assertOutputKeyValue(final TopologyTestDriver driver,
+                                      final Integer expectedKey,
+                                      final String expectedValue) {
+        OutputVerifier.compareKeyValue(driver.readOutput(output, 
Serdes.Integer().deserializer(), Serdes.String().deserializer()), expectedKey, 
expectedValue);
     }
 }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index 1500ba3..092b5a1 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -16,28 +16,28 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.kstream.Consumed;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.TopologyWrapper;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.processor.MockProcessorContext;
 import org.apache.kafka.streams.processor.Processor;
 import 
org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
+import org.apache.kafka.streams.test.ConsumerRecordFactory;
+import org.apache.kafka.streams.test.OutputVerifier;
 import org.apache.kafka.test.MockProcessor;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
-import org.apache.kafka.test.TestUtils;
-import org.junit.Before;
-import org.junit.Rule;
+import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
-import java.io.File;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashSet;
+import java.util.Properties;
 import java.util.Set;
 
 import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
@@ -52,20 +52,11 @@ public class KTableKTableOuterJoinTest {
 
     final private String topic1 = "topic1";
     final private String topic2 = "topic2";
+    final private String output = "output";
 
-    final private Serde<Integer> intSerde = Serdes.Integer();
-    final private Serde<String> stringSerde = Serdes.String();
-    private File stateDir = null;
-
-    @SuppressWarnings("deprecation")
-    @Rule
-    public final org.apache.kafka.test.KStreamTestDriver driver = new 
org.apache.kafka.test.KStreamTestDriver();
-    private final Consumed<Integer, String> consumed = Consumed.with(intSerde, 
stringSerde);
-
-    @Before
-    public void setUp() {
-        stateDir = TestUtils.tempDirectory("kafka-test");
-    }
+    private final Consumed<Integer, String> consumed = 
Consumed.with(Serdes.Integer(), Serdes.String());
+    private final ConsumerRecordFactory<Integer, String> recordFactory = new 
ConsumerRecordFactory<>(Serdes.Integer().serializer(), 
Serdes.String().serializer());
+    private final Properties props = 
StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
 
     @Test
     public void testJoin() {
@@ -76,103 +67,95 @@ public class KTableKTableOuterJoinTest {
         final KTable<Integer, String> table1;
         final KTable<Integer, String> table2;
         final KTable<Integer, String> joined;
-        final MockProcessorSupplier<Integer, String> supplier;
 
-        supplier = new MockProcessorSupplier<>();
         table1 = builder.table(topic1, consumed);
         table2 = builder.table(topic2, consumed);
         joined = table1.outerJoin(table2, MockValueJoiner.TOSTRING_JOINER);
-        joined.toStream().process(supplier);
+        joined.toStream().to(output);
 
         final Collection<Set<String>> copartitionGroups = 
TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), 
copartitionGroups.iterator().next());
 
-        @SuppressWarnings("unchecked")
-        final KTableValueGetterSupplier<Integer, String> getterSupplier = 
((KTableImpl<Integer, String, String>) joined).valueGetterSupplier();
-
-        driver.setUp(builder, stateDir);
-
-        final MockProcessor<Integer, String> processor = 
supplier.theCapturedProcessor();
-
-        final KTableValueGetter<Integer, String> getter = getterSupplier.get();
-        getter.init(driver.context());
-
-        // push two items to the primary stream. the other table is empty
-
-        for (int i = 0; i < 2; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-        }
-        // pass tuple with null key, it will be discarded in join process
-        driver.process(topic1, null, "SomeVal");
-        driver.flushState();
-        processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
-        checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, 
null), kv(3, null));
-
-        // push two items to the other stream. this should produce two items.
-
-        for (int i = 0; i < 2; i++) {
-            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
-        }
-        // pass tuple with null key, it will be discarded in join process
-        driver.process(topic2, null, "AnotherVal");
-        driver.flushState();
-        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
-        checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), 
kv(3, null));
-
-        // push all four items to the primary stream. this should produce four 
items.
-
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "X" + expectedKey);
-        }
-        driver.flushState();
-        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", 
"2:X2+null", "3:X3+null");
-        checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, 
"X2+null"), kv(3, "X3+null"));
-
-        // push all items to the other stream. this should produce four items.
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
-        }
-        driver.flushState();
-        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", 
"2:X2+YY2", "3:X3+YY3");
-        checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, 
"X2+YY2"), kv(3, "X3+YY3"));
-
-        // push all four items to the primary stream. this should produce four 
items.
-
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "X" + expectedKey);
-        }
-        driver.flushState();
-        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", 
"2:X2+YY2", "3:X3+YY3");
-        checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, 
"X2+YY2"), kv(3, "X3+YY3"));
-
-        // push two items with null to the other stream as deletes. this 
should produce two item.
-
-        for (int i = 0; i < 2; i++) {
-            driver.process(topic2, expectedKeys[i], null);
-        }
-        driver.flushState();
-        processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
-        checkJoinedValues(getter, kv(0, "X0+null"), kv(1, "X1+null"), kv(2, 
"X2+YY2"), kv(3, "X3+YY3"));
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(builder.build(), props)) {
 
-        // push all four items to the primary stream. this should produce four 
items.
-
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "XX" + expectedKey);
-        }
-        driver.flushState();
-        processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", 
"2:XX2+YY2", "3:XX3+YY3");
-        checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, "XX1+null"), kv(2, 
"XX2+YY2"), kv(3, "XX3+YY3"));
-
-        // push middle two items to the primary stream with null. this should 
produce two items.
+            // push two items to the primary stream. the other table is empty
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], 
"X" + expectedKeys[i]));
+            }
+            // pass tuple with null key, it will be discarded in join process
+            driver.pipeInput(recordFactory.create(topic1, null, "SomeVal"));
+            assertOutputKeyValue(driver, 0, "X0+null");
+            assertOutputKeyValue(driver, 1, "X1+null");
+            assertNull(driver.readOutput(output));
+
+            // push two items to the other stream. this should produce two 
items.
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], 
"Y" + expectedKeys[i]));
+            }
+            // pass tuple with null key, it will be discarded in join process
+            driver.pipeInput(recordFactory.create(topic2, null, "AnotherVal"));
+            assertOutputKeyValue(driver, 0, "X0+Y0");
+            assertOutputKeyValue(driver, 1, "X1+Y1");
+            assertNull(driver.readOutput(output));
+
+            // push all four items to the primary stream. this should produce 
four items.
+            for (final int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" 
+ expectedKey));
+            }
+            assertOutputKeyValue(driver, 0, "X0+Y0");
+            assertOutputKeyValue(driver, 1, "X1+Y1");
+            assertOutputKeyValue(driver, 2, "X2+null");
+            assertOutputKeyValue(driver, 3, "X3+null");
+            assertNull(driver.readOutput(output));
+
+            // push all items to the other stream. this should produce four 
items.
+            for (final int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, 
"YY" + expectedKey));
+            }
+            assertOutputKeyValue(driver, 0, "X0+YY0");
+            assertOutputKeyValue(driver, 1, "X1+YY1");
+            assertOutputKeyValue(driver, 2, "X2+YY2");
+            assertOutputKeyValue(driver, 3, "X3+YY3");
+            assertNull(driver.readOutput(output));
+
+            // push all four items to the primary stream. this should produce 
four items.
+            for (final int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" 
+ expectedKey));
+            }
+            assertOutputKeyValue(driver, 0, "X0+YY0");
+            assertOutputKeyValue(driver, 1, "X1+YY1");
+            assertOutputKeyValue(driver, 2, "X2+YY2");
+            assertOutputKeyValue(driver, 3, "X3+YY3");
+            assertNull(driver.readOutput(output));
+
+            // push two items with null to the other stream as deletes. this 
should produce two items.
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], 
null));
+            }
+            assertOutputKeyValue(driver, 0, "X0+null");
+            assertOutputKeyValue(driver, 1, "X1+null");
+            assertNull(driver.readOutput(output));
 
-        for (int i = 1; i < 3; i++) {
-            driver.process(topic1, expectedKeys[i], null);
+            // push all four items to the primary stream. this should produce 
four items.
+            for (final int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, 
"XX" + expectedKey));
+            }
+            assertOutputKeyValue(driver, 0, "XX0+null");
+            assertOutputKeyValue(driver, 1, "XX1+null");
+            assertOutputKeyValue(driver, 2, "XX2+YY2");
+            assertOutputKeyValue(driver, 3, "XX3+YY3");
+            assertNull(driver.readOutput(output));
+
+            // push middle two items to the primary stream with null. this 
should produce two items.
+            for (int i = 1; i < 3; i++) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], 
null));
+            }
+            assertOutputKeyValue(driver, 1, null);
+            assertOutputKeyValue(driver, 2, "null+YY2");
+            assertNull(driver.readOutput(output));
         }
-        driver.flushState();
-        processor.checkAndClearProcessResult("1:null", "2:null+YY2");
-        checkJoinedValues(getter, kv(0, "XX0+null"), kv(1, null), kv(2, 
"null+YY2"), kv(3, "XX3+YY3"));
     }
 
     @Test
@@ -191,78 +174,63 @@ public class KTableKTableOuterJoinTest {
         joined = table1.outerJoin(table2, MockValueJoiner.TOSTRING_JOINER);
 
         supplier = new MockProcessorSupplier<>();
-        builder.build().addProcessor("proc", supplier, ((KTableImpl<?, ?, ?>) 
joined).name);
+        final Topology topology = builder.build().addProcessor("proc", 
supplier, ((KTableImpl<?, ?, ?>) joined).name);
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(topology, props)) {
 
-        driver.setUp(builder, stateDir);
-
-        final MockProcessor<Integer, String> proc = 
supplier.theCapturedProcessor();
-
-        assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
-        assertTrue(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
-        assertFalse(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
-
-        // push two items to the primary stream. the other table is empty
-
-        for (int i = 0; i < 2; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-        }
-        driver.flushState();
-        proc.checkAndClearProcessResult("0:(X0+null<-null)", 
"1:(X1+null<-null)");
+            final MockProcessor<Integer, String> proc = 
supplier.theCapturedProcessor();
 
-        // push two items to the other stream. this should produce two items.
+            assertTrue(((KTableImpl<?, ?, ?>) 
table1).sendingOldValueEnabled());
+            assertTrue(((KTableImpl<?, ?, ?>) 
table2).sendingOldValueEnabled());
+            assertFalse(((KTableImpl<?, ?, ?>) 
joined).sendingOldValueEnabled());
 
-        for (int i = 0; i < 2; i++) {
-            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
-        }
-        driver.flushState();
-        proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
-
-        // push all four items to the primary stream. this should produce four 
items.
-
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "X" + expectedKey);
-        }
-        driver.flushState();
-        proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", 
"2:(X2+null<-null)", "3:(X3+null<-null)");
-
-        // push all items to the other stream. this should produce four items.
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
-        }
-        driver.flushState();
-        proc.checkAndClearProcessResult("0:(X0+YY0<-null)", 
"1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
-
-        // push all four items to the primary stream. this should produce four 
items.
+            // push two items to the primary stream. the other table is empty
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], 
"X" + expectedKeys[i]));
+            }
+            proc.checkAndClearProcessResult("0:(X0+null<-null)", 
"1:(X1+null<-null)");
 
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "X" + expectedKey);
-        }
-        driver.flushState();
-        proc.checkAndClearProcessResult("0:(X0+YY0<-null)", 
"1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
+            // push two items to the other stream. this should produce two 
items.
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], 
"Y" + expectedKeys[i]));
+            }
+            proc.checkAndClearProcessResult("0:(X0+Y0<-null)", 
"1:(X1+Y1<-null)");
 
-        // push two items with null to the other stream as deletes. this 
should produce two item.
+            // push all four items to the primary stream. this should produce 
four items.
+            for (final int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" 
+ expectedKey));
+            }
+            proc.checkAndClearProcessResult("0:(X0+Y0<-null)", 
"1:(X1+Y1<-null)", "2:(X2+null<-null)", "3:(X3+null<-null)");
 
-        for (int i = 0; i < 2; i++) {
-            driver.process(topic2, expectedKeys[i], null);
-        }
-        driver.flushState();
-        proc.checkAndClearProcessResult("0:(X0+null<-null)", 
"1:(X1+null<-null)");
+            // push all items to the other stream. this should produce four 
items.
+            for (final int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, 
"YY" + expectedKey));
+            }
+            proc.checkAndClearProcessResult("0:(X0+YY0<-null)", 
"1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
 
-        // push all four items to the primary stream. this should produce four 
items.
+            // push all four items to the primary stream. this should produce 
four items.
+            for (final int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" 
+ expectedKey));
+            }
+            proc.checkAndClearProcessResult("0:(X0+YY0<-null)", 
"1:(X1+YY1<-null)", "2:(X2+YY2<-null)", "3:(X3+YY3<-null)");
 
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "XX" + expectedKey);
-        }
-        driver.flushState();
-        proc.checkAndClearProcessResult("0:(XX0+null<-null)", 
"1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
+            // push two items with null to the other stream as deletes. this 
should produce two items.
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], 
null));
+            }
+            proc.checkAndClearProcessResult("0:(X0+null<-null)", 
"1:(X1+null<-null)");
 
-        // push middle two items to the primary stream with null. this should 
produce two items.
+            // push all four items to the primary stream. this should produce 
four items.
+            for (final int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, 
"XX" + expectedKey));
+            }
+            proc.checkAndClearProcessResult("0:(XX0+null<-null)", 
"1:(XX1+null<-null)", "2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
 
-        for (int i = 1; i < 3; i++) {
-            driver.process(topic1, expectedKeys[i], null);
+            // push middle two items to the primary stream with null. this 
should produce two items.
+            for (int i = 1; i < 3; i++) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], 
null));
+            }
+            proc.checkAndClearProcessResult("1:(null<-null)", 
"2:(null+YY2<-null)");
         }
-        driver.flushState();
-        proc.checkAndClearProcessResult("1:(null<-null)", 
"2:(null+YY2<-null)");
     }
 
     @Test
@@ -283,78 +251,64 @@ public class KTableKTableOuterJoinTest {
         ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
 
         supplier = new MockProcessorSupplier<>();
-        builder.build().addProcessor("proc", supplier, ((KTableImpl<?, ?, ?>) 
joined).name);
-
-        driver.setUp(builder, stateDir);
-
-        final MockProcessor<Integer, String> proc = 
supplier.theCapturedProcessor();
+        final Topology topology = builder.build().addProcessor("proc", 
supplier, ((KTableImpl<?, ?, ?>) joined).name);
 
-        assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
-        assertTrue(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
-        assertTrue(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
+        try (final TopologyTestDriver driver = new 
TopologyTestDriver(topology, props)) {
 
-        // push two items to the primary stream. the other table is empty
-
-        for (int i = 0; i < 2; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
-        }
-        driver.flushState();
-        proc.checkAndClearProcessResult("0:(X0+null<-null)", 
"1:(X1+null<-null)");
+            final MockProcessor<Integer, String> proc = 
supplier.theCapturedProcessor();
 
-        // push two items to the other stream. this should produce two items.
+            assertTrue(((KTableImpl<?, ?, ?>) 
table1).sendingOldValueEnabled());
+            assertTrue(((KTableImpl<?, ?, ?>) 
table2).sendingOldValueEnabled());
+            assertTrue(((KTableImpl<?, ?, ?>) 
joined).sendingOldValueEnabled());
 
-        for (int i = 0; i < 2; i++) {
-            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
-        }
-        driver.flushState();
-        proc.checkAndClearProcessResult("0:(X0+Y0<-X0+null)", 
"1:(X1+Y1<-X1+null)");
-
-        // push all four items to the primary stream. this should produce four 
items.
-
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "X" + expectedKey);
-        }
-        driver.flushState();
-        proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", 
"1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)");
-
-        // push all items to the other stream. this should produce four items.
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic2, expectedKey, "YY" + expectedKey);
-        }
-        driver.flushState();
-        proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", 
"1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)");
-
-        // push all four items to the primary stream. this should produce four 
items.
+            // push two items to the primary stream. the other table is empty
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], 
"X" + expectedKeys[i]));
+            }
+            proc.checkAndClearProcessResult("0:(X0+null<-null)", 
"1:(X1+null<-null)");
 
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "X" + expectedKey);
-        }
-        driver.flushState();
-        proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", 
"1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
+            // push two items to the other stream. this should produce two 
items.
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], 
"Y" + expectedKeys[i]));
+            }
+            proc.checkAndClearProcessResult("0:(X0+Y0<-X0+null)", 
"1:(X1+Y1<-X1+null)");
 
-        // push two items with null to the other stream as deletes. this 
should produce two item.
+            // push all four items to the primary stream. this should produce 
four items.
+            for (final int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" 
+ expectedKey));
+            }
+            proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", 
"1:(X1+Y1<-X1+Y1)", "2:(X2+null<-null)", "3:(X3+null<-null)");
 
-        for (int i = 0; i < 2; i++) {
-            driver.process(topic2, expectedKeys[i], null);
-        }
-        driver.flushState();
-        proc.checkAndClearProcessResult("0:(X0+null<-X0+YY0)", 
"1:(X1+null<-X1+YY1)");
+            // push all items to the other stream. this should produce four 
items.
+            for (final int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKey, 
"YY" + expectedKey));
+            }
+            proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", 
"1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-X2+null)", "3:(X3+YY3<-X3+null)");
 
-        // push all four items to the primary stream. this should produce four 
items.
+            // push all four items to the primary stream. this should produce 
four items.
+            for (final int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, "X" 
+ expectedKey));
+            }
+            proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", 
"1:(X1+YY1<-X1+YY1)", "2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
 
-        for (final int expectedKey : expectedKeys) {
-            driver.process(topic1, expectedKey, "XX" + expectedKey);
-        }
-        driver.flushState();
-        proc.checkAndClearProcessResult("0:(XX0+null<-X0+null)", 
"1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
+            // push two items with null to the other stream as deletes. this 
should produce two items.
+            for (int i = 0; i < 2; i++) {
+                driver.pipeInput(recordFactory.create(topic2, expectedKeys[i], 
null));
+            }
+            proc.checkAndClearProcessResult("0:(X0+null<-X0+YY0)", 
"1:(X1+null<-X1+YY1)");
 
-        // push middle two items to the primary stream with null. this should 
produce two items.
+            // push all four items to the primary stream. this should produce 
four items.
+            for (final int expectedKey : expectedKeys) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKey, 
"XX" + expectedKey));
+            }
+            proc.checkAndClearProcessResult("0:(XX0+null<-X0+null)", 
"1:(XX1+null<-X1+null)", "2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
 
-        for (int i = 1; i < 3; i++) {
-            driver.process(topic1, expectedKeys[i], null);
+            // push middle two items to the primary stream with null. this 
should produce two items.
+            for (int i = 1; i < 3; i++) {
+                driver.pipeInput(recordFactory.create(topic1, expectedKeys[i], 
null));
+            }
+            proc.checkAndClearProcessResult("1:(null<-XX1+null)", 
"2:(null+YY2<-XX2+YY2)");
         }
-        driver.flushState();
-        proc.checkAndClearProcessResult("1:(null<-XX1+null)", 
"2:(null+YY2<-XX2+YY2)");
     }
 
     @Test
@@ -363,8 +317,8 @@ public class KTableKTableOuterJoinTest {
 
         @SuppressWarnings("unchecked")
         final Processor<String, Change<String>> join = new 
KTableKTableOuterJoin<>(
-            (KTableImpl<String, String, String>) builder.table("left", 
Consumed.with(stringSerde, stringSerde)),
-            (KTableImpl<String, String, String>) builder.table("right", 
Consumed.with(stringSerde, stringSerde)),
+            (KTableImpl<String, String, String>) builder.table("left", 
Consumed.with(Serdes.String(), Serdes.String())),
+            (KTableImpl<String, String, String>) builder.table("right", 
Consumed.with(Serdes.String(), Serdes.String())),
             null
         ).get();
 
@@ -379,19 +333,10 @@ public class KTableKTableOuterJoinTest {
         assertThat(appender.getMessages(), hasItem("Skipping record due to 
null key. change=[(new<-old)] topic=[left] partition=[-1] offset=[-2]"));
     }
 
-    private KeyValue<Integer, String> kv(final Integer key, final String 
value) {
-        return new KeyValue<>(key, value);
+    private void assertOutputKeyValue(final TopologyTestDriver driver,
+                                      final Integer expectedKey,
+                                      final String expectedValue) {
+        OutputVerifier.compareKeyValue(driver.readOutput(output, 
Serdes.Integer().deserializer(), Serdes.String().deserializer()), expectedKey, 
expectedValue);
     }
 
-    @SafeVarargs
-    private final void checkJoinedValues(final KTableValueGetter<Integer, 
String> getter, final KeyValue<Integer, String>... expected) {
-        for (final KeyValue<Integer, String> kv : expected) {
-            final String value = getter.get(kv.key);
-            if (kv.value == null) {
-                assertNull(value);
-            } else {
-                assertEquals(kv.value, value);
-            }
-        }
-    }
 }

Reply via email to