Repository: kafka
Updated Branches:
  refs/heads/trunk b687c0680 -> 45394d52c


KAFKA-5819; Add Joined class and relevant KStream join overloads

Add the `Joined` class and the overloads to `KStream` that use it.
Deprecate existing methods that have `Serde` params

Author: Damian Guy <damian....@gmail.com>

Reviewers: Matthias J. Sax <matth...@confluent.io>, Guozhang Wang 
<wangg...@gmail.com>

Closes #3776 from dguy/kip-182-stream-join


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/45394d52
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/45394d52
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/45394d52

Branch: refs/heads/trunk
Commit: 45394d52c1ba566178c57897297a3ea31379f957
Parents: b687c06
Author: Damian Guy <damian....@gmail.com>
Authored: Wed Sep 6 10:55:43 2017 +0100
Committer: Damian Guy <damian....@gmail.com>
Committed: Wed Sep 6 10:55:43 2017 +0100

----------------------------------------------------------------------
 .../kafka/streams/kstream/JoinWindows.java      |   4 +-
 .../apache/kafka/streams/kstream/Joined.java    | 146 +++++++
 .../apache/kafka/streams/kstream/KStream.java   | 426 ++++++++++++++++++-
 .../kafka/streams/kstream/ValueJoiner.java      |  10 +-
 .../streams/kstream/internals/KStreamImpl.java  | 110 +++--
 .../integration/KStreamRepartitionJoinTest.java |  25 +-
 .../kstream/internals/KStreamImplTest.java      |  54 ++-
 .../internals/KStreamKStreamJoinTest.java       |  28 +-
 .../internals/KStreamKStreamLeftJoinTest.java   |  13 +-
 9 files changed, 732 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/45394d52/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
index 9d69738..ef9ed01 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java
@@ -55,9 +55,9 @@ import java.util.Map;
  * @see UnlimitedWindows
  * @see SessionWindows
  * @see KStream#join(KStream, ValueJoiner, JoinWindows)
- * @see KStream#join(KStream, ValueJoiner, JoinWindows, 
org.apache.kafka.common.serialization.Serde, 
org.apache.kafka.common.serialization.Serde, 
org.apache.kafka.common.serialization.Serde)
+ * @see KStream#join(KStream, ValueJoiner, JoinWindows, Joined)
  * @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows)
- * @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows, 
org.apache.kafka.common.serialization.Serde, 
org.apache.kafka.common.serialization.Serde, 
org.apache.kafka.common.serialization.Serde)
+ * @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows, Joined)
  * @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows)
  * @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows)
  * @see TimestampExtractor

http://git-wip-us.apache.org/repos/asf/kafka/blob/45394d52/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java
new file mode 100644
index 0000000..8601e1c
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream;
+
+import org.apache.kafka.common.serialization.Serde;
+
+/**
+ * The {@code Joined} class represents optional params that can be passed to
+ * {@link KStream#join}, {@link KStream#leftJoin}, and  {@link 
KStream#outerJoin} operations.
+ */
+public class Joined<K, V, VO> {
+
+    private Serde<K> keySerde;
+    private Serde<V> valueSerde;
+    private Serde<VO> otherValueSerde;
+
+    private Joined(final Serde<K> keySerde,
+                   final Serde<V> valueSerde,
+                   final Serde<VO> otherValueSerde) {
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
+        this.otherValueSerde = otherValueSerde;
+    }
+
+    /**
+     * Create an instance of {@code Joined} with key, value, and otherValue 
{@link Serde} instances.
+     * {@code null} values are accepted and will be replaced by the default 
serdes as defined in config.
+     *
+     * @param keySerde        the key serde to use. If {@code null} the 
default key serde from config will be used
+     * @param valueSerde      the value serde to use. If {@code null} the 
default value serde from config will be used
+     * @param otherValueSerde the otherValue serde to use. If {@code null} the 
default value serde from config will be used
+     * @param <K>             key type
+     * @param <V>             value type
+     * @param <VO>            other value type
+     * @return new {@code Joined} instance with the provided serdes
+     */
+    public static <K, V, VO> Joined<K, V, VO> with(final Serde<K> keySerde,
+                                                   final Serde<V> valueSerde,
+                                                   final Serde<VO> 
otherValueSerde) {
+        return new Joined<>(keySerde, valueSerde, otherValueSerde);
+    }
+
+    /**
+     * Create an instance of {@code Joined} with  a key {@link Serde}.
+     * {@code null} values are accepted and will be replaced by the default 
key serde as defined in config.
+     *
+     * @param keySerde the key serde to use. If {@code null} the default key 
serde from config will be used
+     * @param <K>      key type
+     * @param <V>      value type
+     * @param <VO>     other value type
+     * @return new {@code Joined} instance configured with the keySerde
+     */
+    public static <K, V, VO> Joined<K, V, VO> keySerde(final Serde<K> 
keySerde) {
+        return with(keySerde, null, null);
+    }
+
+    /**
+     * Create an instance of {@code Joined} with a value {@link Serde}.
+     * {@code null} values are accepted and will be replaced by the default 
value serde as defined in config.
+     *
+     * @param valueSerde the value serde to use. If {@code null} the default 
value serde from config will be used
+     * @param <K>        key type
+     * @param <V>        value type
+     * @param <VO>       other value type
+     * @return new {@code Joined} instance configured with the valueSerde
+     */
+    public static <K, V, VO> Joined<K, V, VO> valueSerde(final Serde<V> 
valueSerde) {
+        return with(null, valueSerde, null);
+    }
+
+    /**
+     * Create an instance of {@code Joined} with an other value {@link Serde}.
+     * {@code null} values are accepted and will be replaced by the default 
value serde as defined in config.
+     *
+     * @param otherValueSerde the otherValue serde to use. If {@code null} the 
default value serde from config will be used
+     * @param <K>             key type
+     * @param <V>             value type
+     * @param <VO>            other value type
+     * @return new {@code Joined} instance configured with the otherValueSerde
+     */
+    public static <K, V, VO> Joined<K, V, VO> otherValueSerde(final Serde<VO> 
otherValueSerde) {
+        return with(null, null, otherValueSerde);
+    }
+
+    /**
+     * Set the key {@link Serde} to be used. Null values are accepted and will 
be replaced by the default
+     * key serde as defined in config
+     *
+     * @param keySerde the key serde to use. If null the default key serde 
from config will be used
+     * @return this
+     */
+    public Joined<K, V, VO> withKeySerde(final Serde<K> keySerde) {
+        this.keySerde = keySerde;
+        return this;
+    }
+
+    /**
+     * Set the value {@link Serde} to be used. Null values are accepted and 
will be replaced by the default
+     * value serde as defined in config
+     *
+     * @param valueSerde the value serde to use. If null the default value 
serde from config will be used
+     * @return this
+     */
+    public Joined<K, V, VO> withValueSerde(final Serde<V> valueSerde) {
+        this.valueSerde = valueSerde;
+        return this;
+    }
+
+    /**
+     * Set the otherValue {@link Serde} to be used. Null values are accepted 
and will be replaced by the default
+     * value serde as defined in config
+     *
+     * @param otherValueSerde the otherValue serde to use. If null the default 
value serde from config will be used
+     * @return this
+     */
+    public Joined<K, V, VO> withOtherValueSerde(final Serde<VO> 
otherValueSerde) {
+        this.otherValueSerde = otherValueSerde;
+        return this;
+    }
+
+    public Serde<K> keySerde() {
+        return keySerde;
+    }
+
+    public Serde<V> valueSerde() {
+        return valueSerde;
+    }
+
+    public Serde<VO> otherValueSerde() {
+        return otherValueSerde;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/45394d52/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index f390167..8301cba 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -1237,6 +1237,84 @@ public interface KStream<K, V> {
                                  final JoinWindows windows);
 
     /**
+     * Join records of this stream with another {@code KStream}'s records 
using windowed inner equi join with default
+     * serializers and deserializers.
+     * The join is computed on the records' key with join attribute {@code 
thisKStream.key == otherKStream.key}.
+     * Furthermore, two records are only joined if their timestamps are close 
to each other as defined by the given
+     * {@link JoinWindows}, i.e., the window defines an additional join 
predicate on the record timestamps.
+     * <p>
+     * For each pair of records meeting both join predicates the provided 
{@link ValueJoiner} will be called to compute
+     * a value (with arbitrary type) for the result record.
+     * The key of the result record is the same as for both joining input 
records.
+     * If an input record key or value is {@code null} the record will not be 
included in the join operation and thus no
+     * output record will be added to the resulting {@code KStream}.
+     * <p>
+     * Example (assuming all input records belong to the correct windows):
+     * <table border='1'>
+     * <tr>
+     * <th>this</th>
+     * <th>other</th>
+     * <th>result</th>
+     * </tr>
+     * <tr>
+     * <td>&lt;K1:A&gt;</td>
+     * <td></td>
+     * <td></td>
+     * </tr>
+     * <tr>
+     * <td>&lt;K2:B&gt;</td>
+     * <td>&lt;K2:b&gt;</td>
+     * <td>&lt;K2:ValueJoiner(B,b)&gt;</td>
+     * </tr>
+     * <tr>
+     * <td></td>
+     * <td>&lt;K3:c&gt;</td>
+     * <td></td>
+     * </tr>
+     * </table>
+     * Both input streams (or to be more precise, their underlying source 
topics) need to have the same number of
+     * partitions.
+     * If this is not the case, you would need to call {@link 
#through(String)} (for one input stream) before doing the
+     * join, using a pre-created topic with the "correct" number of partitions.
+     * Furthermore, both input streams need to be co-partitioned on the join 
key (i.e., use the same partitioner).
+     * If this requirement is not met, Kafka Streams will automatically 
repartition the data, i.e., it will create an
+     * internal repartitioning topic in Kafka and write and re-read the data 
via this topic before the actual join.
+     * The repartitioning topic will be named 
"${applicationId}-XXX-repartition", where "applicationId" is
+     * user-specified in {@link  StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, 
"XXX" is an internally generated name, and
+     * "-repartition" is a fixed suffix.
+     * You can retrieve all generated internal topic names via {@link 
KafkaStreams#toString()}.
+     * <p>
+     * Repartitioning can happen for one or both of the joining {@code 
KStream}s.
+     * For this case, all data of the stream will be redistributed through the 
repartitioning topic by writing all
+     * records to it, and rereading all records from it, such that the join 
input {@code KStream} is partitioned
+     * correctly on its key.
+     * <p>
+     * Both of the joining {@code KStream}s will be materialized in local 
state stores with auto-generated store names.
+     * For failure and recovery each store will be backed by an internal 
changelog topic that will be created in Kafka.
+     * The changelog topic will be named 
"${applicationId}-storeName-changelog", where "applicationId" is user-specified
+     * in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, 
"storeName" is an
+     * internally generated name, and "-changelog" is a fixed suffix.
+     * You can retrieve all generated internal topic names via {@link 
KafkaStreams#toString()}.
+     *
+     * @param otherStream the {@code KStream} to be joined with this stream
+     * @param joiner      a {@link ValueJoiner} that computes the join result 
for a pair of matching records
+     * @param windows     the specification of the {@link JoinWindows}
+     * @param joined      a {@link Joined} instance that defines the serdes to
+     *                    be used to serialize/deserialize inputs and outputs 
of the joined streams
+     * @param <VO>        the value type of the other stream
+     * @param <VR>        the value type of the result stream
+     * @return a {@code KStream} that contains join-records for each key and 
values computed by the given
+     * {@link ValueJoiner}, one for each matched record-pair with the same key 
and within the joining window intervals
+     * @see #leftJoin(KStream, ValueJoiner, JoinWindows, Joined)
+     * @see #outerJoin(KStream, ValueJoiner, JoinWindows, Joined)
+     */
+    <VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
+                                 final ValueJoiner<? super V, ? super VO, ? 
extends VR> joiner,
+                                 final JoinWindows windows,
+                                 final Joined<K, V, VO> joined);
+    /**
      * Join records of this stream with another {@code KStream}'s records 
using windowed inner equi join.
      * The join is computed on the records' key with join attribute {@code 
thisKStream.key == otherKStream.key}.
      * Furthermore, two records are only joined if their timestamps are close 
to each other as defined by the given
@@ -1309,9 +1387,11 @@ public interface KStream<K, V> {
      * @param <VR>            the value type of the result stream
      * @return a {@code KStream} that contains join-records for each key and 
values computed by the given
      * {@link ValueJoiner}, one for each matched record-pair with the same key 
and within the joining window intervals
-     * @see #leftJoin(KStream, ValueJoiner, JoinWindows, Serde, Serde, Serde)
-     * @see #outerJoin(KStream, ValueJoiner, JoinWindows, Serde, Serde, Serde)
+     * @see #leftJoin(KStream, ValueJoiner, JoinWindows, Joined)
+     * @see #outerJoin(KStream, ValueJoiner, JoinWindows, Joined)
+     * @deprecated use {@link #join(KStream, ValueJoiner, JoinWindows, Joined)}
      */
+    @Deprecated
     <VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
                                  final ValueJoiner<? super V, ? super VO, ? 
extends VR> joiner,
                                  final JoinWindows windows,
@@ -1400,8 +1480,91 @@ public interface KStream<K, V> {
                                      final JoinWindows windows);
 
     /**
+     * Join records of this stream with another {@code KStream}'s records 
using windowed left equi join with default
+     * serializers and deserializers.
+     * In contrast to {@link #join(KStream, ValueJoiner, JoinWindows) 
inner-join}, all records from this stream will
+     * produce at least one output record (cf. below).
+     * The join is computed on the records' key with join attribute {@code 
thisKStream.key == otherKStream.key}.
+     * Furthermore, two records are only joined if their timestamps are close 
to each other as defined by the given
+     * {@link JoinWindows}, i.e., the window defines an additional join 
predicate on the record timestamps.
+     * <p>
+     * For each pair of records meeting both join predicates the provided 
{@link ValueJoiner} will be called to compute
+     * a value (with arbitrary type) for the result record.
+     * The key of the result record is the same as for both joining input 
records.
+     * Furthermore, for each input record of this {@code KStream} that does 
not satisfy the join predicate the provided
+     * {@link ValueJoiner} will be called with a {@code null} value for the 
other stream.
+     * If an input record key or value is {@code null} the record will not be 
included in the join operation and thus no
+     * output record will be added to the resulting {@code KStream}.
+     * <p>
+     * Example (assuming all input records belong to the correct windows):
+     * <table border='1'>
+     * <tr>
+     * <th>this</th>
+     * <th>other</th>
+     * <th>result</th>
+     * </tr>
+     * <tr>
+     * <td>&lt;K1:A&gt;</td>
+     * <td></td>
+     * <td>&lt;K1:ValueJoiner(A,null)&gt;</td>
+     * </tr>
+     * <tr>
+     * <td>&lt;K2:B&gt;</td>
+     * <td>&lt;K2:b&gt;</td>
+     * <td>&lt;K2:ValueJoiner(B,b)&gt;</td>
+     * </tr>
+     * <tr>
+     * <td></td>
+     * <td>&lt;K3:c&gt;</td>
+     * <td></td>
+     * </tr>
+     * </table>
+     * Both input streams (or to be more precise, their underlying source 
topics) need to have the same number of
+     * partitions.
+     * If this is not the case, you would need to call {@link 
#through(String)} (for one input stream) before doing the
+     * join, using a pre-created topic with the "correct" number of partitions.
+     * Furthermore, both input streams need to be co-partitioned on the join 
key (i.e., use the same partitioner).
+     * If this requirement is not met, Kafka Streams will automatically 
repartition the data, i.e., it will create an
+     * internal repartitioning topic in Kafka and write and re-read the data 
via this topic before the actual join.
+     * The repartitioning topic will be named 
"${applicationId}-XXX-repartition", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, 
"XXX" is an internally generated name, and
+     * "-repartition" is a fixed suffix.
+     * You can retrieve all generated internal topic names via {@link 
KafkaStreams#toString()}.
+     * <p>
+     * Repartitioning can happen for one or both of the joining {@code 
KStream}s.
+     * For this case, all data of the stream will be redistributed through the 
repartitioning topic by writing all
+     * records to it, and rereading all records from it, such that the join 
input {@code KStream} is partitioned
+     * correctly on its key.
+     * <p>
+     * Both of the joining {@code KStream}s will be materialized in local 
state stores with auto-generated store names.
+     * For failure and recovery each store will be backed by an internal 
changelog topic that will be created in Kafka.
+     * The changelog topic will be named 
"${applicationId}-storeName-changelog", where "applicationId" is user-specified
+     * in {@link StreamsConfig} via parameter {@link 
StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
+     * "storeName" is an internally generated name, and "-changelog" is a 
fixed suffix.
+     * You can retrieve all generated internal topic names via {@link 
KafkaStreams#toString()}.
+     *
+     * @param otherStream the {@code KStream} to be joined with this stream
+     * @param joiner      a {@link ValueJoiner} that computes the join result 
for a pair of matching records
+     * @param windows     the specification of the {@link JoinWindows}
+     * @param joined      a {@link Joined} instance that defines the serdes to
+     *                    be used to serialize/deserialize inputs and outputs 
of the joined streams
+     * @param <VO>        the value type of the other stream
+     * @param <VR>        the value type of the result stream
+     * @return a {@code KStream} that contains join-records for each key and 
values computed by the given
+     * {@link ValueJoiner}, one for each matched record-pair with the same key 
plus one for each non-matching record of
+     * this {@code KStream} and within the joining window intervals
+     * @see #join(KStream, ValueJoiner, JoinWindows, Joined)
+     * @see #outerJoin(KStream, ValueJoiner, JoinWindows, Joined)
+     */
+    <VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
+                                     final ValueJoiner<? super V, ? super VO, 
? extends VR> joiner,
+                                     final JoinWindows windows,
+                                     final Joined<K, V, VO> joined);
+
+    /**
      * Join records of this stream with another {@code KStream}'s records 
using windowed left equi join.
-     * In contrast to {@link #join(KStream, ValueJoiner, JoinWindows, Serde, 
Serde, Serde) inner-join}, all records from
+     * In contrast to {@link #join(KStream, ValueJoiner, JoinWindows, Joined) 
inner-join}, all records from
      * this stream will produce at least one output record (cf. below).
      * The join is computed on the records' key with join attribute {@code 
thisKStream.key == otherKStream.key}.
      * Furthermore, two records are only joined if their timestamps are close 
to each other as defined by the given
@@ -1478,9 +1641,11 @@ public interface KStream<K, V> {
      * @return a {@code KStream} that contains join-records for each key and 
values computed by the given
      * {@link ValueJoiner}, one for each matched record-pair with the same key 
plus one for each non-matching record of
      * this {@code KStream} and within the joining window intervals
-     * @see #join(KStream, ValueJoiner, JoinWindows, Serde, Serde, Serde)
-     * @see #outerJoin(KStream, ValueJoiner, JoinWindows, Serde, Serde, Serde)
+     * @see #join(KStream, ValueJoiner, JoinWindows, Joined)
+     * @see #outerJoin(KStream, ValueJoiner, JoinWindows, Joined)
+     * @deprecated use {@link #leftJoin(KStream, ValueJoiner, JoinWindows, 
Joined}
      */
+    @Deprecated
     <VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream,
                                      final ValueJoiner<? super V, ? super VO, 
? extends VR> joiner,
                                      final JoinWindows windows,
@@ -1570,9 +1735,91 @@ public interface KStream<K, V> {
                                       final JoinWindows windows);
 
     /**
+     * Join records of this stream with another {@code KStream}'s records 
using windowed outer equi join with default
+     * serializers and deserializers.
+     * In contrast to {@link #join(KStream, ValueJoiner, JoinWindows) 
inner-join} or
+     * {@link #leftJoin(KStream, ValueJoiner, JoinWindows) left-join}, all 
records from both streams will produce at
+     * least one output record (cf. below).
+     * The join is computed on the records' key with join attribute {@code 
thisKStream.key == otherKStream.key}.
+     * Furthermore, two records are only joined if their timestamps are close 
to each other as defined by the given
+     * {@link JoinWindows}, i.e., the window defines an additional join 
predicate on the record timestamps.
+     * <p>
+     * For each pair of records meeting both join predicates the provided 
{@link ValueJoiner} will be called to compute
+     * a value (with arbitrary type) for the result record.
+     * The key of the result record is the same as for both joining input 
records.
+     * Furthermore, for each input record of both {@code KStream}s that does 
not satisfy the join predicate the provided
+     * {@link ValueJoiner} will be called with a {@code null} value for the 
this/other stream, respectively.
+     * If an input record key or value is {@code null} the record will not be 
included in the join operation and thus no
+     * output record will be added to the resulting {@code KStream}.
+     * <p>
+     * Example (assuming all input records belong to the correct windows):
+     * <table border='1'>
+     * <tr>
+     * <th>this</th>
+     * <th>other</th>
+     * <th>result</th>
+     * </tr>
+     * <tr>
+     * <td>&lt;K1:A&gt;</td>
+     * <td></td>
+     * <td>&lt;K1:ValueJoiner(A,null)&gt;</td>
+     * </tr>
+     * <tr>
+     * <td>&lt;K2:B&gt;</td>
+     * <td>&lt;K2:b&gt;</td>
+     * <td>&lt;K2:ValueJoiner(null,b)&gt;<br />&lt;K2:ValueJoiner(B,b)&gt;</td>
+     * </tr>
+     * <tr>
+     * <td></td>
+     * <td>&lt;K3:c&gt;</td>
+     * <td>&lt;K3:ValueJoiner(null,c)&gt;</td>
+     * </tr>
+     * </table>
+     * Both input streams (or to be more precise, their underlying source 
topics) need to have the same number of
+     * partitions.
+     * If this is not the case, you would need to call {@link 
#through(String)} (for one input stream) before doing the
+     * join, using a pre-created topic with the "correct" number of partitions.
+     * Furthermore, both input streams need to be co-partitioned on the join 
key (i.e., use the same partitioner).
+     * If this requirement is not met, Kafka Streams will automatically 
repartition the data, i.e., it will create an
+     * internal repartitioning topic in Kafka and write and re-read the data 
via this topic before the actual join.
+     * The repartitioning topic will be named 
"${applicationId}-XXX-repartition", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, 
"XXX" is an internally generated name, and
+     * "-repartition" is a fixed suffix.
+     * You can retrieve all generated internal topic names via {@link 
KafkaStreams#toString()}.
+     * <p>
+     * Repartitioning can happen for one or both of the joining {@code 
KStream}s.
+     * For this case, all data of the stream will be redistributed through the 
repartitioning topic by writing all
+     * records to it, and rereading all records from it, such that the join 
input {@code KStream} is partitioned
+     * correctly on its key.
+     * <p>
+     * Both of the joining {@code KStream}s will be materialized in local 
state stores with auto-generated store names.
+     * For failure and recovery each store will be backed by an internal 
changelog topic that will be created in Kafka.
+     * The changelog topic will be named 
"${applicationId}-storeName-changelog", where "applicationId" is user-specified
+     * in {@link StreamsConfig} via parameter {@link 
StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG},
+     * "storeName" is an internally generated name, and "-changelog" is a 
fixed suffix.
+     * You can retrieve all generated internal topic names via {@link 
KafkaStreams#toString()}.
+     *
+     * @param otherStream the {@code KStream} to be joined with this stream
+     * @param joiner      a {@link ValueJoiner} that computes the join result 
for a pair of matching records
+     * @param windows     the specification of the {@link JoinWindows}
+     * @param <VO>        the value type of the other stream
+     * @param <VR>        the value type of the result stream
+     * @return a {@code KStream} that contains join-records for each key and 
values computed by the given
+     * {@link ValueJoiner}, one for each matched record-pair with the same key 
plus one for each non-matching record of
+     * both {@code KStream} and within the joining window intervals
+     * @see #join(KStream, ValueJoiner, JoinWindows, Joined)
+     * @see #leftJoin(KStream, ValueJoiner, JoinWindows, Joined)
+     */
+    <VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
+                                      final ValueJoiner<? super V, ? super VO, 
? extends VR> joiner,
+                                      final JoinWindows windows,
+                                      final Joined<K, V, VO> joined);
+
+    /**
      * Join records of this stream with another {@code KStream}'s records 
using windowed outer equi join.
-     * In contrast to {@link #join(KStream, ValueJoiner, JoinWindows, Serde, 
Serde, Serde) inner-join} or
-     * {@link #leftJoin(KStream, ValueJoiner, JoinWindows, Serde, Serde, 
Serde) left-join}, all records from both
+     * In contrast to {@link #join(KStream, ValueJoiner, JoinWindows, Joined) 
inner-join} or
+     * {@link #leftJoin(KStream, ValueJoiner, JoinWindows, Joined) left-join}, 
all records from both
      * streams will produce at least one output record (cf. below).
      * The join is computed on the records' key with join attribute {@code 
thisKStream.key == otherKStream.key}.
      * Furthermore, two records are only joined if their timestamps are close 
to each other as defined by the given
@@ -1648,9 +1895,11 @@ public interface KStream<K, V> {
      * @return a {@code KStream} that contains join-records for each key and 
values computed by the given
      * {@link ValueJoiner}, one for each matched record-pair with the same key 
plus one for each non-matching record of
      * both {@code KStream}s and within the joining window intervals
-     * @see #join(KStream, ValueJoiner, JoinWindows, Serde, Serde, Serde)
-     * @see #leftJoin(KStream, ValueJoiner, JoinWindows, Serde, Serde, Serde)
+     * @see #join(KStream, ValueJoiner, JoinWindows, Joined)
+     * @see #leftJoin(KStream, ValueJoiner, JoinWindows, Joined)
+     * @deprecated use {@link #outerJoin(KStream, ValueJoiner, JoinWindows, 
Joined)}
      */
+    @Deprecated
     <VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream,
                                       final ValueJoiner<? super V, ? super VO, 
? extends VR> joiner,
                                       final JoinWindows windows,
@@ -1733,6 +1982,83 @@ public interface KStream<K, V> {
                                  final ValueJoiner<? super V, ? super VT, ? 
extends VR> joiner);
 
     /**
+     * Join records of this stream with {@link KTable}'s records using 
non-windowed inner equi join with default
+     * serializers and deserializers.
+     * The join is a primary key table lookup join with join attribute {@code 
stream.key == table.key}.
+     * "Table lookup join" means, that results are only computed if {@code 
KStream} records are processed.
+     * This is done by performing a lookup for matching records in the 
<em>current</em> (i.e., processing time) internal
+     * {@link KTable} state.
+     * In contrast, processing {@link KTable} input records will only update 
the internal {@link KTable} state and
+     * will not produce any result records.
+     * <p>
+     * For each {@code KStream} record that finds a corresponding record in 
{@link KTable} the provided
+     * {@link ValueJoiner} will be called to compute a value (with arbitrary 
type) for the result record.
+     * The key of the result record is the same as for both joining input 
records.
+     * If an {@code KStream} input record key or value is {@code null} the 
record will not be included in the join
+     * operation and thus no output record will be added to the resulting 
{@code KStream}.
+     * <p>
+     * Example:
+     * <table border='1'>
+     * <tr>
+     * <th>KStream</th>
+     * <th>KTable</th>
+     * <th>state</th>
+     * <th>result</th>
+     * </tr>
+     * <tr>
+     * <td>&lt;K1:A&gt;</td>
+     * <td></td>
+     * <td></td>
+     * <td></td>
+     * </tr>
+     * <tr>
+     * <td></td>
+     * <td>&lt;K1:b&gt;</td>
+     * <td>&lt;K1:b&gt;</td>
+     * <td></td>
+     * </tr>
+     * <tr>
+     * <td>&lt;K1:C&gt;</td>
+     * <td></td>
+     * <td>&lt;K1:b&gt;</td>
+     * <td>&lt;K1:ValueJoiner(C,b)&gt;</td>
+     * </tr>
+     * </table>
+     * Both input streams (or to be more precise, their underlying source 
topics) need to have the same number of
+     * partitions.
+     * If this is not the case, you would need to call {@link 
#through(String)} for this {@code KStream} before doing
+     * the join, using a pre-created topic with the same number of partitions 
as the given {@link KTable}.
+     * Furthermore, both input streams need to be co-partitioned on the join 
key (i.e., use the same partitioner);
+     * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}.
+     * If this requirement is not met, Kafka Streams will automatically 
repartition the data, i.e., it will create an
+     * internal repartitioning topic in Kafka and write and re-read the data 
via this topic before the actual join.
+     * The repartitioning topic will be named 
"${applicationId}-XXX-repartition", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, 
"XXX" is an internally generated name, and
+     * "-repartition" is a fixed suffix.
+     * You can retrieve all generated internal topic names via {@link 
KafkaStreams#toString()}.
+     * <p>
+     * Repartitioning can happen only for this {@code KStream} but not for the 
provided {@link KTable}.
+     * For this case, all data of the stream will be redistributed through the 
repartitioning topic by writing all
+     * records to it, and rereading all records from it, such that the join 
input {@code KStream} is partitioned
+     * correctly on its key.
+     *
+     * @param table  the {@link KTable} to be joined with this stream
+     * @param joiner a {@link ValueJoiner} that computes the join result for a 
pair of matching records
+     * @param joined      a {@link Joined} instance that defines the serdes to
+     *                    be used to serialize/deserialize inputs of the 
joined streams
+     * @param <VT>   the value type of the table
+     * @param <VR>   the value type of the result stream
+     * @return a {@code KStream} that contains join-records for each key and 
values computed by the given
+     * {@link ValueJoiner}, one for each matched record-pair with the same key
+     * @see #leftJoin(KTable, ValueJoiner, Joined)
+     * @see #join(GlobalKTable, KeyValueMapper, ValueJoiner)
+     */
+    <VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
+                                 final ValueJoiner<? super V, ? super VT, ? 
extends VR> joiner,
+                                 final Joined<K, V, VT> joined);
+
+    /**
      * Join records of this stream with {@link KTable}'s records using 
non-windowed inner equi join.
      * The join is a primary key table lookup join with join attribute {@code 
stream.key == table.key}.
      * "Table lookup join" means, that results are only computed if {@code 
KStream} records are processed.
@@ -1803,9 +2129,11 @@ public interface KStream<K, V> {
      * @param <VR>     the value type of the result stream
      * @return a {@code KStream} that contains join-records for each key and 
values computed by the given
      * {@link ValueJoiner}, one for each matched record-pair with the same key
-     * @see #leftJoin(KTable, ValueJoiner, Serde, Serde)
+     * @see #leftJoin(KTable, ValueJoiner, Joined)
      * @see #join(GlobalKTable, KeyValueMapper, ValueJoiner)
+     * @deprecated use {@link #join(KTable, ValueJoiner, Joined)}
      */
+    @Deprecated
     <VT, VR> KStream<K, VR> join(final KTable<K, VT> table,
                                  final ValueJoiner<? super V, ? super VT, ? 
extends VR> joiner,
                                  final Serde<K> keySerde,
@@ -1889,6 +2217,84 @@ public interface KStream<K, V> {
                                      final ValueJoiner<? super V, ? super VT, 
? extends VR> joiner);
 
     /**
+     * Join records of this stream with {@link KTable}'s records using 
non-windowed left equi join with default
+     * serializers and deserializers.
+     * In contrast to {@link #join(KTable, ValueJoiner) inner-join}, all 
records from this stream will produce an
+     * output record (cf. below).
+     * The join is a primary key table lookup join with join attribute {@code 
stream.key == table.key}.
+     * "Table lookup join" means, that results are only computed if {@code 
KStream} records are processed.
+     * This is done by performing a lookup for matching records in the 
<em>current</em> (i.e., processing time) internal
+     * {@link KTable} state.
+     * In contrast, processing {@link KTable} input records will only update 
the internal {@link KTable} state and
+     * will not produce any result records.
+     * <p>
+     * For each {@code KStream} record whether or not it finds a corresponding 
record in {@link KTable} the provided
+     * {@link ValueJoiner} will be called to compute a value (with arbitrary 
type) for the result record.
+     * If no {@link KTable} record was found during lookup, a {@code null} 
value will be provided to {@link ValueJoiner}.
+     * The key of the result record is the same as for both joining input 
records.
+     * If an {@code KStream} input record key or value is {@code null} the 
record will not be included in the join
+     * operation and thus no output record will be added to the resulting 
{@code KStream}.
+     * <p>
+     * Example:
+     * <table border='1'>
+     * <tr>
+     * <th>KStream</th>
+     * <th>KTable</th>
+     * <th>state</th>
+     * <th>result</th>
+     * </tr>
+     * <tr>
+     * <td>&lt;K1:A&gt;</td>
+     * <td></td>
+     * <td></td>
+     * <td>&lt;K1:ValueJoiner(A,null)&gt;</td>
+     * </tr>
+     * <tr>
+     * <td></td>
+     * <td>&lt;K1:b&gt;</td>
+     * <td>&lt;K1:b&gt;</td>
+     * <td></td>
+     * </tr>
+     * <tr>
+     * <td>&lt;K1:C&gt;</td>
+     * <td></td>
+     * <td>&lt;K1:b&gt;</td>
+     * <td>&lt;K1:ValueJoiner(C,b)&gt;</td>
+     * </tr>
+     * </table>
+     * Both input streams (or to be more precise, their underlying source 
topics) need to have the same number of
+     * partitions.
+     * If this is not the case, you would need to call {@link 
#through(String)} for this {@code KStream} before doing
+     * the join, using a pre-created topic with the same number of partitions 
as the given {@link KTable}.
+     * Furthermore, both input streams need to be co-partitioned on the join 
key (i.e., use the same partitioner);
+     * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}.
+     * If this requirement is not met, Kafka Streams will automatically 
repartition the data, i.e., it will create an
+     * internal repartitioning topic in Kafka and write and re-read the data 
via this topic before the actual join.
+     * The repartitioning topic will be named 
"${applicationId}-XXX-repartition", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, 
"XXX" is an internally generated name, and
+     * "-repartition" is a fixed suffix.
+     * You can retrieve all generated internal topic names via {@link 
KafkaStreams#toString()}.
+     * <p>
+     * Repartitioning can happen only for this {@code KStream} but not for the 
provided {@link KTable}.
+     * For this case, all data of the stream will be redistributed through the 
repartitioning topic by writing all
+     * records to it, and rereading all records from it, such that the join 
input {@code KStream} is partitioned
+     * correctly on its key.
+     *
+     * @param table  the {@link KTable} to be joined with this stream
+     * @param joiner a {@link ValueJoiner} that computes the join result for a 
pair of matching records
+     * @param <VT>   the value type of the table
+     * @param <VR>   the value type of the result stream
+     * @return a {@code KStream} that contains join-records for each key and 
values computed by the given
+     * {@link ValueJoiner}, one output for each input {@code KStream} record
+     * @see #join(KTable, ValueJoiner, Joined)
+     * @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner)
+     */
+    <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table,
+                                     final ValueJoiner<? super V, ? super VT, 
? extends VR> joiner,
+                                     final Joined<K, V, VT> joined);
+
+    /**
      * Join records of this stream with {@link KTable}'s records using 
non-windowed left equi join.
      * In contrast to {@link #join(KTable, ValueJoiner) inner-join}, all 
records from this stream will produce an
      * output record (cf. below).

http://git-wip-us.apache.org/repos/asf/kafka/blob/45394d52/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java 
b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
index 1ed6003..0f0a747 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java
@@ -27,15 +27,15 @@ package org.apache.kafka.streams.kstream;
  * @param <V2> second value type
  * @param <VR> joined value type
  * @see KStream#join(KStream, ValueJoiner, JoinWindows)
- * @see KStream#join(KStream, ValueJoiner, JoinWindows, 
org.apache.kafka.common.serialization.Serde, 
org.apache.kafka.common.serialization.Serde, 
org.apache.kafka.common.serialization.Serde)
+ * @see KStream#join(KStream, ValueJoiner, JoinWindows, Joined)
  * @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows)
- * @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows, 
org.apache.kafka.common.serialization.Serde, 
org.apache.kafka.common.serialization.Serde, 
org.apache.kafka.common.serialization.Serde)
+ * @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows, Joined)
  * @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows)
- * @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows, 
org.apache.kafka.common.serialization.Serde, 
org.apache.kafka.common.serialization.Serde, 
org.apache.kafka.common.serialization.Serde)
+ * @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows, Joined)
  * @see KStream#join(KTable, ValueJoiner)
- * @see KStream#join(KTable, ValueJoiner, 
org.apache.kafka.common.serialization.Serde, 
org.apache.kafka.common.serialization.Serde)
+ * @see KStream#join(KTable, ValueJoiner, Joined)
  * @see KStream#leftJoin(KTable, ValueJoiner)
- * @see KStream#leftJoin(KTable, ValueJoiner, 
org.apache.kafka.common.serialization.Serde, 
org.apache.kafka.common.serialization.Serde)
+ * @see KStream#leftJoin(KTable, ValueJoiner, Joined)
  * @see KTable#join(KTable, ValueJoiner)
  * @see KTable#leftJoin(KTable, ValueJoiner)
  * @see KTable#outerJoin(KTable, ValueJoiner)

http://git-wip-us.apache.org/repos/asf/kafka/blob/45394d52/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index f46f222..8534da8 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
@@ -504,7 +505,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> 
implements KStream<K, V
                                       final Serde<K> keySerde,
                                       final Serde<V> thisValueSerde,
                                       final Serde<V1> otherValueSerde) {
-        return doJoin(other, joiner, windows, keySerde, thisValueSerde, 
otherValueSerde,
+        return doJoin(other, joiner, windows, Joined.with(keySerde, 
thisValueSerde, otherValueSerde),
             new KStreamImplJoin(false, false));
     }
 
@@ -512,7 +513,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> 
implements KStream<K, V
     public <V1, R> KStream<K, R> join(final KStream<K, V1> other,
                                       final ValueJoiner<? super V, ? super V1, 
? extends R> joiner,
                                       final JoinWindows windows) {
-        return join(other, joiner, windows, null, null, null);
+        return join(other, joiner, windows, Joined.<K, V, V1>with(null, null, 
null));
+    }
+
+    @Override
+    public <VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream,
+                                        final ValueJoiner<? super V, ? super 
VO, ? extends VR> joiner,
+                                        final JoinWindows windows,
+                                        final Joined<K, V, VO> joined) {
+        return doJoin(otherStream, joiner, windows, joined,
+                             new KStreamImplJoin(false, false));
     }
 
     @Override
@@ -522,36 +532,43 @@ public class KStreamImpl<K, V> extends AbstractStream<K> 
implements KStream<K, V
                                            final Serde<K> keySerde,
                                            final Serde<V> thisValueSerde,
                                            final Serde<V1> otherValueSerde) {
-        return doJoin(other, joiner, windows, keySerde, thisValueSerde, 
otherValueSerde, new KStreamImplJoin(true, true));
+        return outerJoin(other, joiner, windows, Joined.with(keySerde, 
thisValueSerde, otherValueSerde));
     }
 
     @Override
     public <V1, R> KStream<K, R> outerJoin(final KStream<K, V1> other,
                                            final ValueJoiner<? super V, ? 
super V1, ? extends R> joiner,
                                            final JoinWindows windows) {
-        return outerJoin(other, joiner, windows, null, null, null);
+        return outerJoin(other, joiner, windows, Joined.<K, V, V1>with(null, 
null, null));
+    }
+
+    @Override
+    public <VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> other,
+                                             final ValueJoiner<? super V, ? 
super VO, ? extends VR> joiner,
+                                             final JoinWindows windows,
+                                             final Joined<K, V, VO> joined) {
+        return doJoin(other, joiner, windows, joined, new 
KStreamImplJoin(true, true));
     }
 
     private <V1, R> KStream<K, R> doJoin(final KStream<K, V1> other,
                                          final ValueJoiner<? super V, ? super 
V1, ? extends R> joiner,
                                          final JoinWindows windows,
-                                         final Serde<K> keySerde,
-                                         final Serde<V> thisValueSerde,
-                                         final Serde<V1> otherValueSerde,
+                                         final Joined<K, V, V1> joined,
                                          final KStreamImplJoin join) {
         Objects.requireNonNull(other, "other KStream can't be null");
         Objects.requireNonNull(joiner, "joiner can't be null");
         Objects.requireNonNull(windows, "windows can't be null");
+        Objects.requireNonNull(joined, "joined can't be null");
 
         KStreamImpl<K, V> joinThis = this;
         KStreamImpl<K, V1> joinOther = (KStreamImpl<K, V1>) other;
 
         if (joinThis.repartitionRequired) {
-            joinThis = joinThis.repartitionForJoin(keySerde, thisValueSerde);
+            joinThis = joinThis.repartitionForJoin(joined.keySerde(), 
joined.valueSerde());
         }
 
         if (joinOther.repartitionRequired) {
-            joinOther = joinOther.repartitionForJoin(keySerde, 
otherValueSerde);
+            joinOther = joinOther.repartitionForJoin(joined.keySerde(), 
joined.otherValueSerde());
         }
 
         joinThis.ensureJoinableWith(joinOther);
@@ -560,9 +577,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> 
implements KStream<K, V
             joinOther,
             joiner,
             windows,
-            keySerde,
-            thisValueSerde,
-            otherValueSerde);
+            joined);
     }
 
     /**
@@ -620,9 +635,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> 
implements KStream<K, V
         return doJoin(other,
             joiner,
             windows,
-            keySerde,
-            thisValSerde,
-            otherValueSerde,
+            Joined.with(keySerde, thisValSerde, otherValueSerde),
             new KStreamImplJoin(true, false));
     }
 
@@ -630,22 +643,37 @@ public class KStreamImpl<K, V> extends AbstractStream<K> 
implements KStream<K, V
     public <V1, R> KStream<K, R> leftJoin(final KStream<K, V1> other,
                                           final ValueJoiner<? super V, ? super 
V1, ? extends R> joiner,
                                           final JoinWindows windows) {
-        return leftJoin(other, joiner, windows, null, null, null);
+        return leftJoin(other, joiner, windows, Joined.<K, V, V1>with(null, 
null, null));
+    }
+
+    @Override
+    public <VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> other,
+                                            final ValueJoiner<? super V, ? 
super VO, ? extends VR> joiner,
+                                            final JoinWindows windows,
+                                            final Joined<K, V, VO> joined) {
+        Objects.requireNonNull(joined, "joined can't be null");
+        return doJoin(other,
+                      joiner,
+                      windows,
+                      joined,
+                      new KStreamImplJoin(true, false));
     }
 
     @Override
     public <V1, R> KStream<K, R> join(final KTable<K, V1> other,
                                       final ValueJoiner<? super V, ? super V1, 
? extends R> joiner) {
-        return join(other, joiner, null, null);
+        return join(other, joiner, Joined.<K, V, V1>with(null, null, null));
     }
 
     @Override
-    public <V1, R> KStream<K, R> join(final KTable<K, V1> other,
-                                      final ValueJoiner<? super V, ? super V1, 
? extends R> joiner,
-                                      final Serde<K> keySerde,
-                                      final Serde<V> valueSerde) {
+    public <VT, VR> KStream<K, VR> join(final KTable<K, VT> other,
+                                        final ValueJoiner<? super V, ? super 
VT, ? extends VR> joiner,
+                                        final Joined<K, V, VT> joined) {
+        Objects.requireNonNull(other, "other can't be null");
+        Objects.requireNonNull(joiner, "joiner can't be null");
+        Objects.requireNonNull(joined, "joined can't be null");
         if (repartitionRequired) {
-            final KStreamImpl<K, V> thisStreamRepartitioned = 
repartitionForJoin(keySerde, valueSerde);
+            final KStreamImpl<K, V> thisStreamRepartitioned = 
repartitionForJoin(joined.keySerde(), joined.valueSerde());
             return thisStreamRepartitioned.doStreamTableJoin(other, joiner, 
false);
         } else {
             return doStreamTableJoin(other, joiner, false);
@@ -653,6 +681,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K> 
implements KStream<K, V
     }
 
     @Override
+    public <V1, R> KStream<K, R> join(final KTable<K, V1> other,
+                                      final ValueJoiner<? super V, ? super V1, 
? extends R> joiner,
+                                      final Serde<K> keySerde,
+                                      final Serde<V> valueSerde) {
+        return join(other, joiner, Joined.<K, V, V1>with(keySerde, valueSerde, 
null));
+    }
+
+    @Override
     public <K1, V1, R> KStream<K, R> leftJoin(final GlobalKTable<K1, V1> 
globalTable,
                                               final KeyValueMapper<? super K, 
? super V, ? extends K1> keyMapper,
                                               final ValueJoiner<? super V, ? 
super V1, ? extends R> joiner) {
@@ -698,21 +734,31 @@ public class KStreamImpl<K, V> extends AbstractStream<K> 
implements KStream<K, V
 
     @Override
     public <V1, R> KStream<K, R> leftJoin(final KTable<K, V1> other, final 
ValueJoiner<? super V, ? super V1, ? extends R> joiner) {
-        return leftJoin(other, joiner, null, null);
+        return leftJoin(other, joiner, Joined.<K, V, V1>with(null, null, 
null));
     }
 
-    public <V1, R> KStream<K, R> leftJoin(final KTable<K, V1> other,
-                                          final ValueJoiner<? super V, ? super 
V1, ? extends R> joiner,
-                                          final Serde<K> keySerde,
-                                          final Serde<V> valueSerde) {
+    @Override
+    public <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> other,
+                                            final ValueJoiner<? super V, ? 
super VT, ? extends VR> joiner,
+                                            final Joined<K, V, VT> joined) {
+        Objects.requireNonNull(other, "other can't be null");
+        Objects.requireNonNull(joiner, "joiner can't be null");
+        Objects.requireNonNull(joined, "joined can't be null");
         if (repartitionRequired) {
-            final KStreamImpl<K, V> thisStreamRepartitioned = 
this.repartitionForJoin(keySerde, valueSerde);
+            final KStreamImpl<K, V> thisStreamRepartitioned = 
this.repartitionForJoin(joined.keySerde(), joined.valueSerde());
             return thisStreamRepartitioned.doStreamTableJoin(other, joiner, 
true);
         } else {
             return doStreamTableJoin(other, joiner, true);
         }
     }
 
+    public <V1, R> KStream<K, R> leftJoin(final KTable<K, V1> other,
+                                          final ValueJoiner<? super V, ? super 
V1, ? extends R> joiner,
+                                          final Serde<K> keySerde,
+                                          final Serde<V> valueSerde) {
+        return leftJoin(other, joiner, Joined.<K, V, V1>with(keySerde, 
valueSerde, null));
+    }
+
     @Override
     public <K1> KGroupedStream<K1, V> groupBy(final KeyValueMapper<? super K, 
? super V, K1> selector) {
         return groupBy(selector, Serialized.<K1, V>with(null, null));
@@ -789,9 +835,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> 
implements KStream<K, V
                                                    final KStream<K1, V2> other,
                                                    final ValueJoiner<? super 
V1, ? super V2, ? extends R> joiner,
                                                    final JoinWindows windows,
-                                                   final Serde<K1> keySerde,
-                                                   final Serde<V1> 
lhsValueSerde,
-                                                   final Serde<V2> 
otherValueSerde) {
+                                                   final Joined joined) {
             String thisWindowStreamName = builder.newName(WINDOWED_NAME);
             String otherWindowStreamName = builder.newName(WINDOWED_NAME);
             String joinThisName = rightOuter ? builder.newName(OUTERTHIS_NAME) 
: builder.newName(JOINTHIS_NAME);
@@ -799,10 +843,10 @@ public class KStreamImpl<K, V> extends AbstractStream<K> 
implements KStream<K, V
             String joinMergeName = builder.newName(MERGE_NAME);
 
             StateStoreSupplier thisWindow =
-                createWindowedStateStore(windows, keySerde, lhsValueSerde, 
joinThisName + "-store");
+                createWindowedStateStore(windows, joined.keySerde(), 
joined.valueSerde(), joinThisName + "-store");
 
             StateStoreSupplier otherWindow =
-                createWindowedStateStore(windows, keySerde, otherValueSerde, 
joinOtherName + "-store");
+                createWindowedStateStore(windows, joined.keySerde(), 
joined.otherValueSerde(), joinOtherName + "-store");
 
 
             KStreamJoinWindow<K1, V1> thisWindowedStream = new 
KStreamJoinWindow<>(thisWindow.name(),

http://git-wip-us.apache.org/repos/asf/kafka/blob/45394d52/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
index b653647..4a356c7 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Predicate;
@@ -212,9 +213,7 @@ public class KStreamRepartitionJoinTest {
             .join(streamOne.map(keyMapper),
                 TOSTRING_JOINER,
                 getJoinWindow(),
-                Serdes.Integer(),
-                Serdes.String(),
-                Serdes.Integer())
+                Joined.with(Serdes.Integer(), Serdes.String(), 
Serdes.Integer()))
             .to(Serdes.Integer(), Serdes.String(), output);
 
         return new ExpectedOutputOnTopic(Arrays.asList("A:1", "B:2", "C:3", 
"D:4", "E:5"), output);
@@ -231,9 +230,7 @@ public class KStreamRepartitionJoinTest {
         map1.leftJoin(map2,
             TOSTRING_JOINER,
             getJoinWindow(),
-            Serdes.Integer(),
-            Serdes.Integer(),
-            Serdes.String())
+            Joined.with(Serdes.Integer(), Serdes.Integer(), Serdes.String()))
             .filterNot(new Predicate<Integer, String>() {
                 @Override
                 public boolean test(Integer key, String value) {
@@ -257,9 +254,7 @@ public class KStreamRepartitionJoinTest {
         final KStream<Integer, String> join = map1.join(map2,
             TOSTRING_JOINER,
             getJoinWindow(),
-            Serdes.Integer(),
-            Serdes.Integer(),
-            Serdes.String());
+            Joined.with(Serdes.Integer(), Serdes.Integer(), Serdes.String()));
 
         final String topic = "map-join-join-" + testNo;
         CLUSTER.createTopic(topic);
@@ -267,9 +262,7 @@ public class KStreamRepartitionJoinTest {
             .join(streamFour.map(kvMapper),
                 TOSTRING_JOINER,
                 getJoinWindow(),
-                Serdes.Integer(),
-                Serdes.String(),
-                Serdes.String())
+                Joined.with(Serdes.Integer(), Serdes.String(), 
Serdes.String()))
             .to(Serdes.Integer(), Serdes.String(), topic);
 
 
@@ -387,11 +380,9 @@ public class KStreamRepartitionJoinTest {
                         final String outputTopic) throws InterruptedException {
         CLUSTER.createTopic(outputTopic);
         lhs.join(rhs,
-            TOSTRING_JOINER,
-            getJoinWindow(),
-            Serdes.Integer(),
-            Serdes.Integer(),
-            Serdes.String())
+                 TOSTRING_JOINER,
+                 getJoinWindow(),
+                 Joined.with(Serdes.Integer(), Serdes.Integer(), 
Serdes.String()))
             .to(Serdes.Integer(), Serdes.String(), outputTopic);
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/45394d52/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 3e50abb..1fed374 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
@@ -46,6 +47,7 @@ import java.util.concurrent.TimeUnit;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 
 
 public class KStreamImplTest {
@@ -127,19 +129,20 @@ public class KStreamImplTest {
         );
 
         final int anyWindowSize = 1;
+        final Joined<String, Integer, Integer> joined = 
Joined.with(stringSerde, intSerde, intSerde);
         KStream<String, Integer> stream4 = streams2[0].join(streams3[0], new 
ValueJoiner<Integer, Integer, Integer>() {
             @Override
             public Integer apply(Integer value1, Integer value2) {
                 return value1 + value2;
             }
-        }, JoinWindows.of(anyWindowSize), stringSerde, intSerde, intSerde);
+        }, JoinWindows.of(anyWindowSize), joined);
 
-        KStream<String, Integer> stream5 = streams2[1].join(streams3[1], new 
ValueJoiner<Integer, Integer, Integer>() {
+        streams2[1].join(streams3[1], new ValueJoiner<Integer, Integer, 
Integer>() {
             @Override
             public Integer apply(Integer value1, Integer value2) {
                 return value1 + value2;
             }
-        }, JoinWindows.of(anyWindowSize), stringSerde, intSerde, intSerde);
+        }, JoinWindows.of(anyWindowSize), joined);
 
         stream4.to("topic-5");
 
@@ -190,11 +193,11 @@ public class KStreamImplTest {
                             }
                         });
         stream.join(kStream,
-                valueJoiner,
-                JoinWindows.of(windowSize).until(3 * windowSize),
-                Serdes.String(),
-                Serdes.String(),
-                Serdes.String())
+                    valueJoiner,
+                    JoinWindows.of(windowSize).until(3 * windowSize),
+                    Joined.with(Serdes.String(),
+                                Serdes.String(),
+                                Serdes.String()))
                 .to(Serdes.String(), Serdes.String(), "output-topic");
 
         ProcessorTopology processorTopology = 
builder.setApplicationId("X").build(null);
@@ -373,4 +376,39 @@ public class KStreamImplTest {
                         null);
     }
 
+    @Test
+    public void shouldThrowNullPointerOnLeftJoinWithTableWhenJoinedIsNull() {
+        final KTable<String, String> table = builder.table(Serdes.String(), 
Serdes.String(), "blah");
+        try {
+            testStream.leftJoin(table,
+                                MockValueJoiner.TOSTRING_JOINER,
+                                null);
+            fail("Should have thrown NullPointerException");
+        } catch (final NullPointerException e) {
+            // ok
+        }
+    }
+
+    @Test
+    public void shouldThrowNullPointerOnJoinWithTableWhenJoinedIsNull() {
+        final KTable<String, String> table = builder.table(Serdes.String(), 
Serdes.String(), "blah");
+        try {
+            testStream.join(table,
+                            MockValueJoiner.TOSTRING_JOINER,
+                            null);
+            fail("Should have thrown NullPointerException");
+        } catch (final NullPointerException e) {
+            // ok
+        }
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnJoinWithStreamWhenJoinedIsNull() {
+        testStream.join(testStream, MockValueJoiner.TOSTRING_JOINER, 
JoinWindows.of(10), null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnOuterJoinJoinedIsNull() {
+        testStream.outerJoin(testStream, MockValueJoiner.TOSTRING_JOINER, 
JoinWindows.of(10), null);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/45394d52/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index a733fae..ab7ca53 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.test.KStreamTestDriver;
@@ -72,7 +73,10 @@ public class KStreamKStreamJoinTest {
         processor = new MockProcessorSupplier<>();
         stream1 = builder.stream(intSerde, stringSerde, topic1);
         stream2 = builder.stream(intSerde, stringSerde, topic2);
-        joined = stream1.join(stream2, MockValueJoiner.TOSTRING_JOINER, 
JoinWindows.of(100), intSerde, stringSerde, stringSerde);
+        joined = stream1.join(stream2,
+                              MockValueJoiner.TOSTRING_JOINER,
+                              JoinWindows.of(100),
+                              Joined.with(intSerde, stringSerde, stringSerde));
         joined.process(processor);
 
         Collection<Set<String>> copartitionGroups = 
StreamsBuilderTest.getCopartitionedGroups(builder);
@@ -170,7 +174,10 @@ public class KStreamKStreamJoinTest {
         processor = new MockProcessorSupplier<>();
         stream1 = builder.stream(intSerde, stringSerde, topic1);
         stream2 = builder.stream(intSerde, stringSerde, topic2);
-        joined = stream1.outerJoin(stream2, MockValueJoiner.TOSTRING_JOINER, 
JoinWindows.of(100), intSerde, stringSerde, stringSerde);
+        joined = stream1.outerJoin(stream2,
+                                   MockValueJoiner.TOSTRING_JOINER,
+                                   JoinWindows.of(100),
+                                   Joined.with(intSerde, stringSerde, 
stringSerde));
         joined.process(processor);
 
         Collection<Set<String>> copartitionGroups = 
StreamsBuilderTest.getCopartitionedGroups(builder);
@@ -271,7 +278,10 @@ public class KStreamKStreamJoinTest {
         stream1 = builder.stream(intSerde, stringSerde, topic1);
         stream2 = builder.stream(intSerde, stringSerde, topic2);
 
-        joined = stream1.join(stream2, MockValueJoiner.TOSTRING_JOINER, 
JoinWindows.of(100), intSerde, stringSerde, stringSerde);
+        joined = stream1.join(stream2,
+                              MockValueJoiner.TOSTRING_JOINER,
+                              JoinWindows.of(100),
+                              Joined.with(intSerde, stringSerde, stringSerde));
         joined.process(processor);
 
         Collection<Set<String>> copartitionGroups = 
StreamsBuilderTest.getCopartitionedGroups(builder);
@@ -498,7 +508,12 @@ public class KStreamKStreamJoinTest {
         stream1 = builder.stream(intSerde, stringSerde, topic1);
         stream2 = builder.stream(intSerde, stringSerde, topic2);
 
-        joined = stream1.join(stream2, MockValueJoiner.TOSTRING_JOINER, 
JoinWindows.of(0).after(100), intSerde, stringSerde, stringSerde);
+        joined = stream1.join(stream2,
+                              MockValueJoiner.TOSTRING_JOINER,
+                              JoinWindows.of(0).after(100),
+                              Joined.with(intSerde,
+                                          stringSerde,
+                                          stringSerde));
         joined.process(processor);
 
         Collection<Set<String>> copartitionGroups = 
StreamsBuilderTest.getCopartitionedGroups(builder);
@@ -607,7 +622,10 @@ public class KStreamKStreamJoinTest {
         stream1 = builder.stream(intSerde, stringSerde, topic1);
         stream2 = builder.stream(intSerde, stringSerde, topic2);
 
-        joined = stream1.join(stream2, MockValueJoiner.TOSTRING_JOINER, 
JoinWindows.of(0).before(100), intSerde, stringSerde, stringSerde);
+        joined = stream1.join(stream2,
+                              MockValueJoiner.TOSTRING_JOINER,
+                              JoinWindows.of(0).before(100),
+                              Joined.with(intSerde, stringSerde, stringSerde));
         joined.process(processor);
 
         Collection<Set<String>> copartitionGroups = 
StreamsBuilderTest.getCopartitionedGroups(builder);

http://git-wip-us.apache.org/repos/asf/kafka/blob/45394d52/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
----------------------------------------------------------------------
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index e0ba9fe..7d8297c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.test.KStreamTestDriver;
@@ -73,7 +74,10 @@ public class KStreamKStreamLeftJoinTest {
         stream1 = builder.stream(intSerde, stringSerde, topic1);
         stream2 = builder.stream(intSerde, stringSerde, topic2);
 
-        joined = stream1.leftJoin(stream2, MockValueJoiner.TOSTRING_JOINER, 
JoinWindows.of(100), intSerde, stringSerde, stringSerde);
+        joined = stream1.leftJoin(stream2,
+                                  MockValueJoiner.TOSTRING_JOINER,
+                                  JoinWindows.of(100),
+                                  Joined.with(intSerde, stringSerde, 
stringSerde));
         joined.process(processor);
 
         final Collection<Set<String>> copartitionGroups = 
StreamsBuilderTest.getCopartitionedGroups(builder);
@@ -149,9 +153,7 @@ public class KStreamKStreamLeftJoinTest {
     @Test
     public void testWindowing() throws Exception {
         final StreamsBuilder builder = new StreamsBuilder();
-
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
-
         long time = 0L;
 
         final KStream<Integer, String> stream1;
@@ -163,7 +165,10 @@ public class KStreamKStreamLeftJoinTest {
         stream1 = builder.stream(intSerde, stringSerde, topic1);
         stream2 = builder.stream(intSerde, stringSerde, topic2);
 
-        joined = stream1.leftJoin(stream2, MockValueJoiner.TOSTRING_JOINER, 
JoinWindows.of(100), intSerde, stringSerde, stringSerde);
+        joined = stream1.leftJoin(stream2,
+                                  MockValueJoiner.TOSTRING_JOINER,
+                                  JoinWindows.of(100),
+                                  Joined.with(intSerde, stringSerde, 
stringSerde));
         joined.process(processor);
 
         final Collection<Set<String>> copartitionGroups = 
StreamsBuilderTest.getCopartitionedGroups(builder);

Reply via email to