Repository: kafka Updated Branches: refs/heads/trunk b687c0680 -> 45394d52c
KAFKA-5819; Add Joined class and relevant KStream join overloads Add the `Joined` class and the overloads to `KStream` that use it. Deprecate existing methods that have `Serde` params Author: Damian Guy <damian....@gmail.com> Reviewers: Matthias J. Sax <matth...@confluent.io>, Guozhang Wang <wangg...@gmail.com> Closes #3776 from dguy/kip-182-stream-join Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/45394d52 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/45394d52 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/45394d52 Branch: refs/heads/trunk Commit: 45394d52c1ba566178c57897297a3ea31379f957 Parents: b687c06 Author: Damian Guy <damian....@gmail.com> Authored: Wed Sep 6 10:55:43 2017 +0100 Committer: Damian Guy <damian....@gmail.com> Committed: Wed Sep 6 10:55:43 2017 +0100 ---------------------------------------------------------------------- .../kafka/streams/kstream/JoinWindows.java | 4 +- .../apache/kafka/streams/kstream/Joined.java | 146 +++++++ .../apache/kafka/streams/kstream/KStream.java | 426 ++++++++++++++++++- .../kafka/streams/kstream/ValueJoiner.java | 10 +- .../streams/kstream/internals/KStreamImpl.java | 110 +++-- .../integration/KStreamRepartitionJoinTest.java | 25 +- .../kstream/internals/KStreamImplTest.java | 54 ++- .../internals/KStreamKStreamJoinTest.java | 28 +- .../internals/KStreamKStreamLeftJoinTest.java | 13 +- 9 files changed, 732 insertions(+), 84 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/45394d52/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java index 9d69738..ef9ed01 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/JoinWindows.java @@ -55,9 +55,9 @@ import java.util.Map; * @see UnlimitedWindows * @see SessionWindows * @see KStream#join(KStream, ValueJoiner, JoinWindows) - * @see KStream#join(KStream, ValueJoiner, JoinWindows, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde) + * @see KStream#join(KStream, ValueJoiner, JoinWindows, Joined) * @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows) - * @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde) + * @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows, Joined) * @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows) - * @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows) + * @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows, Joined) * @see TimestampExtractor http://git-wip-us.apache.org/repos/asf/kafka/blob/45394d52/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java new file mode 100644 index 0000000..8601e1c --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream; + +import org.apache.kafka.common.serialization.Serde; + +/** + * The {@code Joined} class represents optional params that can be passed to + * {@link KStream#join}, {@link KStream#leftJoin}, and {@link KStream#outerJoin} operations. + */ +public class Joined<K, V, VO> { + + private Serde<K> keySerde; + private Serde<V> valueSerde; + private Serde<VO> otherValueSerde; + + private Joined(final Serde<K> keySerde, + final Serde<V> valueSerde, + final Serde<VO> otherValueSerde) { + this.keySerde = keySerde; + this.valueSerde = valueSerde; + this.otherValueSerde = otherValueSerde; + } + + /** + * Create an instance of {@code Joined} with key, value, and otherValue {@link Serde} instances. + * {@code null} values are accepted and will be replaced by the default serdes as defined in config. + * + * @param keySerde the key serde to use. If {@code null} the default key serde from config will be used + * @param valueSerde the value serde to use. If {@code null} the default value serde from config will be used + * @param otherValueSerde the otherValue serde to use. 
If {@code null} the default value serde from config will be used + * @param <K> key type + * @param <V> value type + * @param <VO> other value type + * @return new {@code Joined} instance with the provided serdes + */ + public static <K, V, VO> Joined<K, V, VO> with(final Serde<K> keySerde, + final Serde<V> valueSerde, + final Serde<VO> otherValueSerde) { + return new Joined<>(keySerde, valueSerde, otherValueSerde); + } + + /** + * Create an instance of {@code Joined} with a key {@link Serde}. + * {@code null} values are accepted and will be replaced by the default key serde as defined in config. + * + * @param keySerde the key serde to use. If {@code null} the default key serde from config will be used + * @param <K> key type + * @param <V> value type + * @param <VO> other value type + * @return new {@code Joined} instance configured with the keySerde + */ + public static <K, V, VO> Joined<K, V, VO> keySerde(final Serde<K> keySerde) { + return with(keySerde, null, null); + } + + /** + * Create an instance of {@code Joined} with a value {@link Serde}. + * {@code null} values are accepted and will be replaced by the default value serde as defined in config. + * + * @param valueSerde the value serde to use. If {@code null} the default value serde from config will be used + * @param <K> key type + * @param <V> value type + * @param <VO> other value type + * @return new {@code Joined} instance configured with the valueSerde + */ + public static <K, V, VO> Joined<K, V, VO> valueSerde(final Serde<V> valueSerde) { + return with(null, valueSerde, null); + } + + /** + * Create an instance of {@code Joined} with an other value {@link Serde}. + * {@code null} values are accepted and will be replaced by the default value serde as defined in config. + * + * @param otherValueSerde the otherValue serde to use. 
If {@code null} the default value serde from config will be used + * @param <K> key type + * @param <V> value type + * @param <VO> other value type + * @return new {@code Joined} instance configured with the otherValueSerde + */ + public static <K, V, VO> Joined<K, V, VO> otherValueSerde(final Serde<VO> otherValueSerde) { + return with(null, null, otherValueSerde); + } + + /** + * Set the key {@link Serde} to be used. Null values are accepted and will be replaced by the default + * key serde as defined in config + * + * @param keySerde the key serde to use. If null the default key serde from config will be used + * @return this + */ + public Joined<K, V, VO> withKeySerde(final Serde<K> keySerde) { + this.keySerde = keySerde; + return this; + } + + /** + * Set the value {@link Serde} to be used. Null values are accepted and will be replaced by the default + * value serde as defined in config + * + * @param valueSerde the value serde to use. If null the default value serde from config will be used + * @return this + */ + public Joined<K, V, VO> withValueSerde(final Serde<V> valueSerde) { + this.valueSerde = valueSerde; + return this; + } + + /** + * Set the otherValue {@link Serde} to be used. Null values are accepted and will be replaced by the default + * value serde as defined in config + * + * @param otherValueSerde the otherValue serde to use. 
If null the default value serde from config will be used + * @return this + */ + public Joined<K, V, VO> withOtherValueSerde(final Serde<VO> otherValueSerde) { + this.otherValueSerde = otherValueSerde; + return this; + } + + public Serde<K> keySerde() { + return keySerde; + } + + public Serde<V> valueSerde() { + return valueSerde; + } + + public Serde<VO> otherValueSerde() { + return otherValueSerde; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/45394d52/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index f390167..8301cba 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -1237,6 +1237,84 @@ public interface KStream<K, V> { final JoinWindows windows); /** + * Join records of this stream with another {@code KStream}'s records using windowed inner equi join with the given {@link Joined} instance's + * serializers and deserializers. + * The join is computed on the records' key with join attribute {@code thisKStream.key == otherKStream.key}. + * Furthermore, two records are only joined if their timestamps are close to each other as defined by the given + * {@link JoinWindows}, i.e., the window defines an additional join predicate on the record timestamps. + * <p> + * For each pair of records meeting both join predicates the provided {@link ValueJoiner} will be called to compute + * a value (with arbitrary type) for the result record. + * The key of the result record is the same as for both joining input records. + * If an input record key or value is {@code null} the record will not be included in the join operation and thus no + * output record will be added to the resulting {@code KStream}.
+ * <p> + * Example (assuming all input records belong to the correct windows): + * <table border='1'> + * <tr> + * <th>this</th> + * <th>other</th> + * <th>result</th> + * </tr> + * <tr> + * <td><K1:A></td> + * <td></td> + * <td></td> + * </tr> + * <tr> + * <td><K2:B></td> + * <td><K2:b></td> + * <td><K2:ValueJoiner(B,b)></td> + * </tr> + * <tr> + * <td></td> + * <td><K3:c></td> + * <td></td> + * </tr> + * </table> + * Both input streams (or to be more precise, their underlying source topics) need to have the same number of + * partitions. + * If this is not the case, you would need to call {@link #through(String)} (for one input stream) before doing the + * join, using a pre-created topic with the "correct" number of partitions. + * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). + * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an + * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. + * The repartitioning topic will be named "${applicationId}-XXX-repartition", where "applicationId" is + * user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is an internally generated name, and + * "-repartition" is a fixed suffix. + * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}. + * <p> + * Repartitioning can happen for one or both of the joining {@code KStream}s. + * For this case, all data of the stream will be redistributed through the repartitioning topic by writing all + * records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned + * correctly on its key. + * <p> + * Both of the joining {@code KStream}s will be materialized in local state stores with auto-generated store names. 
+ * For failure and recovery each store will be backed by an internal changelog topic that will be created in Kafka. + * The changelog topic will be named "${applicationId}-storeName-changelog", where "applicationId" is user-specified + * in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is an + * internally generated name, and "-changelog" is a fixed suffix. + * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}. + * + * @param otherStream the {@code KStream} to be joined with this stream + * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records + * @param windows the specification of the {@link JoinWindows} + * @param joined a {@link Joined} instance that defines the serdes to + * be used to serialize/deserialize inputs and outputs of the joined streams + * @param <VO> the value type of the other stream + * @param <VR> the value type of the result stream + * @return a {@code KStream} that contains join-records for each key and values computed by the given + * {@link ValueJoiner}, one for each matched record-pair with the same key and within the joining window intervals + * @see #leftJoin(KStream, ValueJoiner, JoinWindows, Joined) + * @see #outerJoin(KStream, ValueJoiner, JoinWindows, Joined) + */ + <VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream, + final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, + final JoinWindows windows, + final Joined<K, V, VO> joined); + /** * Join records of this stream with another {@code KStream}'s records using windowed inner equi join. * The join is computed on the records' key with join attribute {@code thisKStream.key == otherKStream.key}. 
* Furthermore, two records are only joined if their timestamps are close to each other as defined by the given @@ -1309,9 +1387,11 @@ public interface KStream<K, V> { * @param <VR> the value type of the result stream * @return a {@code KStream} that contains join-records for each key and values computed by the given * {@link ValueJoiner}, one for each matched record-pair with the same key and within the joining window intervals - * @see #leftJoin(KStream, ValueJoiner, JoinWindows, Serde, Serde, Serde) - * @see #outerJoin(KStream, ValueJoiner, JoinWindows, Serde, Serde, Serde) + * @see #leftJoin(KStream, ValueJoiner, JoinWindows, Joined) + * @see #outerJoin(KStream, ValueJoiner, JoinWindows, Joined) + * @deprecated use {@link #join(KStream, ValueJoiner, JoinWindows, Joined)} */ + @Deprecated <VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows, @@ -1400,8 +1480,91 @@ public interface KStream<K, V> { final JoinWindows windows); /** + * Join records of this stream with another {@code KStream}'s records using windowed left equi join with the given {@link Joined} instance's + * serializers and deserializers. + * In contrast to {@link #join(KStream, ValueJoiner, JoinWindows) inner-join}, all records from this stream will + * produce at least one output record (cf. below). + * The join is computed on the records' key with join attribute {@code thisKStream.key == otherKStream.key}. + * Furthermore, two records are only joined if their timestamps are close to each other as defined by the given + * {@link JoinWindows}, i.e., the window defines an additional join predicate on the record timestamps. + * <p> + * For each pair of records meeting both join predicates the provided {@link ValueJoiner} will be called to compute + * a value (with arbitrary type) for the result record. + * The key of the result record is the same as for both joining input records.
+ * Furthermore, for each input record of this {@code KStream} that does not satisfy the join predicate the provided + * {@link ValueJoiner} will be called with a {@code null} value for the other stream. + * If an input record key or value is {@code null} the record will not be included in the join operation and thus no + * output record will be added to the resulting {@code KStream}. + * <p> + * Example (assuming all input records belong to the correct windows): + * <table border='1'> + * <tr> + * <th>this</th> + * <th>other</th> + * <th>result</th> + * </tr> + * <tr> + * <td><K1:A></td> + * <td></td> + * <td><K1:ValueJoiner(A,null)></td> + * </tr> + * <tr> + * <td><K2:B></td> + * <td><K2:b></td> + * <td><K2:ValueJoiner(B,b)></td> + * </tr> + * <tr> + * <td></td> + * <td><K3:c></td> + * <td></td> + * </tr> + * </table> + * Both input streams (or to be more precise, their underlying source topics) need to have the same number of + * partitions. + * If this is not the case, you would need to call {@link #through(String)} (for one input stream) before doing the + * join, using a pre-created topic with the "correct" number of partitions. + * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). + * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an + * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. + * The repartitioning topic will be named "${applicationId}-XXX-repartition", where "applicationId" is + * user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is an internally generated name, and + * "-repartition" is a fixed suffix. + * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}. + * <p> + * Repartitioning can happen for one or both of the joining {@code KStream}s. 
+ * For this case, all data of the stream will be redistributed through the repartitioning topic by writing all + * records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned + * correctly on its key. + * <p> + * Both of the joining {@code KStream}s will be materialized in local state stores with auto-generated store names. + * For failure and recovery each store will be backed by an internal changelog topic that will be created in Kafka. + * The changelog topic will be named "${applicationId}-storeName-changelog", where "applicationId" is user-specified + * in {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, + * "storeName" is an internally generated name, and "-changelog" is a fixed suffix. + * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}. + * + * @param otherStream the {@code KStream} to be joined with this stream + * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records + * @param windows the specification of the {@link JoinWindows} + * @param joined a {@link Joined} instance that defines the serdes to + * be used to serialize/deserialize inputs and outputs of the joined streams + * @param <VO> the value type of the other stream + * @param <VR> the value type of the result stream + * @return a {@code KStream} that contains join-records for each key and values computed by the given + * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of + * this {@code KStream} and within the joining window intervals + * @see #join(KStream, ValueJoiner, JoinWindows, Joined) + * @see #outerJoin(KStream, ValueJoiner, JoinWindows, Joined) + */ + <VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream, + final ValueJoiner<? super V, ? super VO, ? 
extends VR> joiner, + final JoinWindows windows, + final Joined<K, V, VO> joined); + + /** * Join records of this stream with another {@code KStream}'s records using windowed left equi join. - * In contrast to {@link #join(KStream, ValueJoiner, JoinWindows, Serde, Serde, Serde) inner-join}, all records from + * In contrast to {@link #join(KStream, ValueJoiner, JoinWindows, Joined) inner-join}, all records from * this stream will produce at least one output record (cf. below). * The join is computed on the records' key with join attribute {@code thisKStream.key == otherKStream.key}. * Furthermore, two records are only joined if their timestamps are close to each other as defined by the given @@ -1478,9 +1641,11 @@ public interface KStream<K, V> { * @return a {@code KStream} that contains join-records for each key and values computed by the given * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of * this {@code KStream} and within the joining window intervals - * @see #join(KStream, ValueJoiner, JoinWindows, Serde, Serde, Serde) - * @see #outerJoin(KStream, ValueJoiner, JoinWindows, Serde, Serde, Serde) + * @see #join(KStream, ValueJoiner, JoinWindows, Joined) + * @see #outerJoin(KStream, ValueJoiner, JoinWindows, Joined) + * @deprecated use {@link #leftJoin(KStream, ValueJoiner, JoinWindows, Joined)} */ + @Deprecated <VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> otherStream, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows, @@ -1570,9 +1735,91 @@ public interface KStream<K, V> { final JoinWindows windows); /** + * Join records of this stream with another {@code KStream}'s records using windowed outer equi join with the given {@link Joined} instance's
+ * In contrast to {@link #join(KStream, ValueJoiner, JoinWindows) inner-join} or + * {@link #leftJoin(KStream, ValueJoiner, JoinWindows) left-join}, all records from both streams will produce at + * least one output record (cf. below). + * The join is computed on the records' key with join attribute {@code thisKStream.key == otherKStream.key}. + * Furthermore, two records are only joined if their timestamps are close to each other as defined by the given + * {@link JoinWindows}, i.e., the window defines an additional join predicate on the record timestamps. + * <p> + * For each pair of records meeting both join predicates the provided {@link ValueJoiner} will be called to compute + * a value (with arbitrary type) for the result record. + * The key of the result record is the same as for both joining input records. + * Furthermore, for each input record of both {@code KStream}s that does not satisfy the join predicate the provided + * {@link ValueJoiner} will be called with a {@code null} value for the this/other stream, respectively. + * If an input record key or value is {@code null} the record will not be included in the join operation and thus no + * output record will be added to the resulting {@code KStream}. + * <p> + * Example (assuming all input records belong to the correct windows): + * <table border='1'> + * <tr> + * <th>this</th> + * <th>other</th> + * <th>result</th> + * </tr> + * <tr> + * <td><K1:A></td> + * <td></td> + * <td><K1:ValueJoiner(A,null)></td> + * </tr> + * <tr> + * <td><K2:B></td> + * <td><K2:b></td> + * <td><K2:ValueJoiner(null,b)><br /><K2:ValueJoiner(B,b)></td> + * </tr> + * <tr> + * <td></td> + * <td><K3:c></td> + * <td><K3:ValueJoiner(null,c)></td> + * </tr> + * </table> + * Both input streams (or to be more precise, their underlying source topics) need to have the same number of + * partitions. 
+ * If this is not the case, you would need to call {@link #through(String)} (for one input stream) before doing the + * join, using a pre-created topic with the "correct" number of partitions. + * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner). + * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an + * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. + * The repartitioning topic will be named "${applicationId}-XXX-repartition", where "applicationId" is + * user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is an internally generated name, and + * "-repartition" is a fixed suffix. + * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}. + * <p> + * Repartitioning can happen for one or both of the joining {@code KStream}s. + * For this case, all data of the stream will be redistributed through the repartitioning topic by writing all + * records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned + * correctly on its key. + * <p> + * Both of the joining {@code KStream}s will be materialized in local state stores with auto-generated store names. + * For failure and recovery each store will be backed by an internal changelog topic that will be created in Kafka. + * The changelog topic will be named "${applicationId}-storeName-changelog", where "applicationId" is user-specified + * in {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, + * "storeName" is an internally generated name, and "-changelog" is a fixed suffix. + * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}. 
+ * + * @param otherStream the {@code KStream} to be joined with this stream + * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records + * @param windows the specification of the {@link JoinWindows} + * @param joined a {@link Joined} instance that defines the serdes to + * be used to serialize/deserialize inputs of the joined streams + * @param <VO> the value type of the other stream + * @param <VR> the value type of the result stream + * @return a {@code KStream} that contains join-records for each key and values computed by the given + * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of + * both {@code KStream}s and within the joining window intervals + * @see #join(KStream, ValueJoiner, JoinWindows, Joined) + * @see #leftJoin(KStream, ValueJoiner, JoinWindows, Joined) + */ + <VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream, + final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, + final JoinWindows windows, + final Joined<K, V, VO> joined); + + /** * Join records of this stream with another {@code KStream}'s records using windowed outer equi join. - * In contrast to {@link #join(KStream, ValueJoiner, JoinWindows, Serde, Serde, Serde) inner-join} or - * {@link #leftJoin(KStream, ValueJoiner, JoinWindows, Serde, Serde, Serde) left-join}, all records from both + * In contrast to {@link #join(KStream, ValueJoiner, JoinWindows, Joined) inner-join} or + * {@link #leftJoin(KStream, ValueJoiner, JoinWindows, Joined) left-join}, all records from both * streams will produce at least one output record (cf. below). * The join is computed on the records' key with join attribute {@code thisKStream.key == otherKStream.key}.
* Furthermore, two records are only joined if their timestamps are close to each other as defined by the given @@ -1648,9 +1895,11 @@ public interface KStream<K, V> { * @return a {@code KStream} that contains join-records for each key and values computed by the given * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of * both {@code KStream}s and within the joining window intervals - * @see #join(KStream, ValueJoiner, JoinWindows, Serde, Serde, Serde) - * @see #leftJoin(KStream, ValueJoiner, JoinWindows, Serde, Serde, Serde) + * @see #join(KStream, ValueJoiner, JoinWindows, Joined) + * @see #leftJoin(KStream, ValueJoiner, JoinWindows, Joined) + * @deprecated use {@link #outerJoin(KStream, ValueJoiner, JoinWindows, Joined)} */ + @Deprecated <VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> otherStream, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final JoinWindows windows, @@ -1733,6 +1982,83 @@ public interface KStream<K, V> { final ValueJoiner<? super V, ? super VT, ? extends VR> joiner); /** + * Join records of this stream with {@link KTable}'s records using non-windowed inner equi join with the given {@link Joined} instance's + * serializers and deserializers. + * The join is a primary key table lookup join with join attribute {@code stream.key == table.key}. + * "Table lookup join" means, that results are only computed if {@code KStream} records are processed. + * This is done by performing a lookup for matching records in the <em>current</em> (i.e., processing time) internal + * {@link KTable} state. + * In contrast, processing {@link KTable} input records will only update the internal {@link KTable} state and + * will not produce any result records. + * <p> + * For each {@code KStream} record that finds a corresponding record in {@link KTable} the provided + * {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
+ * The key of the result record is the same as for both joining input records. + * If an {@code KStream} input record key or value is {@code null} the record will not be included in the join + * operation and thus no output record will be added to the resulting {@code KStream}. + * <p> + * Example: + * <table border='1'> + * <tr> + * <th>KStream</th> + * <th>KTable</th> + * <th>state</th> + * <th>result</th> + * </tr> + * <tr> + * <td><K1:A></td> + * <td></td> + * <td></td> + * <td></td> + * </tr> + * <tr> + * <td></td> + * <td><K1:b></td> + * <td><K1:b></td> + * <td></td> + * </tr> + * <tr> + * <td><K1:C></td> + * <td></td> + * <td><K1:b></td> + * <td><K1:ValueJoiner(C,b)></td> + * </tr> + * </table> + * Both input streams (or to be more precise, their underlying source topics) need to have the same number of + * partitions. + * If this is not the case, you would need to call {@link #through(String)} for this {@code KStream} before doing + * the join, using a pre-created topic with the same number of partitions as the given {@link KTable}. + * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner); + * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}. + * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an + * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. + * The repartitioning topic will be named "${applicationId}-XXX-repartition", where "applicationId" is + * user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is an internally generated name, and + * "-repartition" is a fixed suffix. + * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}. + * <p> + * Repartitioning can happen only for this {@code KStream} but not for the provided {@link KTable}. 
+ * For this case, all data of the stream will be redistributed through the repartitioning topic by writing all + * records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned + * correctly on its key. + * + * @param table the {@link KTable} to be joined with this stream + * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records + * @param joined a {@link Joined} instance that defines the serdes to + * be used to serialize/deserialize inputs of the joined streams + * @param <VT> the value type of the table + * @param <VR> the value type of the result stream + * @return a {@code KStream} that contains join-records for each key and values computed by the given + * {@link ValueJoiner}, one for each matched record-pair with the same key + * @see #leftJoin(KTable, ValueJoiner, Joined) + * @see #join(GlobalKTable, KeyValueMapper, ValueJoiner) + */ + <VT, VR> KStream<K, VR> join(final KTable<K, VT> table, + final ValueJoiner<? super V, ? super VT, ? extends VR> joiner, + final Joined<K, V, VT> joined); + + /** * Join records of this stream with {@link KTable}'s records using non-windowed inner equi join. * The join is a primary key table lookup join with join attribute {@code stream.key == table.key}. * "Table lookup join" means, that results are only computed if {@code KStream} records are processed. @@ -1803,9 +2129,11 @@ public interface KStream<K, V> { * @param <VR> the value type of the result stream * @return a {@code KStream} that contains join-records for each key and values computed by the given * {@link ValueJoiner}, one for each matched record-pair with the same key - * @see #leftJoin(KTable, ValueJoiner, Serde, Serde) + * @see #leftJoin(KTable, ValueJoiner, Joined) * @see #join(GlobalKTable, KeyValueMapper, ValueJoiner) + * @deprecated use {@link #join(KTable, ValueJoiner, Joined)} */ + @Deprecated <VT, VR> KStream<K, VR> join(final KTable<K, VT> table, final ValueJoiner<? 
super V, ? super VT, ? extends VR> joiner, final Serde<K> keySerde, @@ -1889,6 +2217,84 @@ public interface KStream<K, V> { final ValueJoiner<? super V, ? super VT, ? extends VR> joiner); /** + * Join records of this stream with {@link KTable}'s records using non-windowed left equi join with the + * serializers and deserializers defined by the given {@link Joined} instance. + * In contrast to {@link #join(KTable, ValueJoiner) inner-join}, all records from this stream will produce an + * output record (cf. below). + * The join is a primary key table lookup join with join attribute {@code stream.key == table.key}. + * "Table lookup join" means, that results are only computed if {@code KStream} records are processed. + * This is done by performing a lookup for matching records in the <em>current</em> (i.e., processing time) internal + * {@link KTable} state. + * In contrast, processing {@link KTable} input records will only update the internal {@link KTable} state and + * will not produce any result records. + * <p> + * For each {@code KStream} record whether or not it finds a corresponding record in {@link KTable} the provided + * {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record. + * If no {@link KTable} record was found during lookup, a {@code null} value will be provided to {@link ValueJoiner}. + * The key of the result record is the same as for both joining input records. + * If a {@code KStream} input record key or value is {@code null} the record will not be included in the join + * operation and thus no output record will be added to the resulting {@code KStream}.
+ * <p> + * Example: + * <table border='1'> + * <tr> + * <th>KStream</th> + * <th>KTable</th> + * <th>state</th> + * <th>result</th> + * </tr> + * <tr> + * <td>&lt;K1:A&gt;</td> + * <td></td> + * <td></td> + * <td>&lt;K1:ValueJoiner(A,null)&gt;</td> + * </tr> + * <tr> + * <td></td> + * <td>&lt;K1:b&gt;</td> + * <td>&lt;K1:b&gt;</td> + * <td></td> + * </tr> + * <tr> + * <td>&lt;K1:C&gt;</td> + * <td></td> + * <td>&lt;K1:b&gt;</td> + * <td>&lt;K1:ValueJoiner(C,b)&gt;</td> + * </tr> + * </table> + * Both input streams (or to be more precise, their underlying source topics) need to have the same number of + * partitions. + * If this is not the case, you would need to call {@link #through(String)} for this {@code KStream} before doing + * the join, using a pre-created topic with the same number of partitions as the given {@link KTable}. + * Furthermore, both input streams need to be co-partitioned on the join key (i.e., use the same partitioner); + * cf. {@link #join(GlobalKTable, KeyValueMapper, ValueJoiner)}. + * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an + * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join. + * The repartitioning topic will be named "${applicationId}-XXX-repartition", where "applicationId" is + * user-specified in {@link StreamsConfig} via parameter + * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is an internally generated name, and + * "-repartition" is a fixed suffix. + * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}. + * <p> + * Repartitioning can happen only for this {@code KStream} but not for the provided {@link KTable}. + * For this case, all data of the stream will be redistributed through the repartitioning topic by writing all + * records to it, and rereading all records from it, such that the join input {@code KStream} is partitioned + * correctly on its key.
+ * + * @param table the {@link KTable} to be joined with this stream + * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records + * @param joined a {@link Joined} instance that defines the key and value serdes used to serialize/deserialize + * this stream's records if it has to be repartitioned before the join + * @param <VT> the value type of the table + * @param <VR> the value type of the result stream + * @return a {@code KStream} that contains join-records for each key and values computed by the given + * {@link ValueJoiner}, one output for each input {@code KStream} record + * @see #join(KTable, ValueJoiner, Joined) + * @see #leftJoin(GlobalKTable, KeyValueMapper, ValueJoiner) + */ + <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> table, + final ValueJoiner<? super V, ? super VT, ? extends VR> joiner, + final Joined<K, V, VT> joined); + + /** * Join records of this stream with {@link KTable}'s records using non-windowed left equi join. * In contrast to {@link #join(KTable, ValueJoiner) inner-join}, all records from this stream will produce an * output record (cf. below). http://git-wip-us.apache.org/repos/asf/kafka/blob/45394d52/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java index 1ed6003..0f0a747 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java @@ -27,15 +27,15 @@ package org.apache.kafka.streams.kstream; * @param <V2> second value type * @param <VR> joined value type * @see KStream#join(KStream, ValueJoiner, JoinWindows) - * @see KStream#join(KStream, ValueJoiner, JoinWindows, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde) + * @see KStream#join(KStream, ValueJoiner, JoinWindows, Joined) * @see KStream#leftJoin(KStream, ValueJoiner,
JoinWindows) - * @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde) + * @see KStream#leftJoin(KStream, ValueJoiner, JoinWindows, Joined) * @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows) - * @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde) + * @see KStream#outerJoin(KStream, ValueJoiner, JoinWindows, Joined) * @see KStream#join(KTable, ValueJoiner) - * @see KStream#join(KTable, ValueJoiner, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde) + * @see KStream#join(KTable, ValueJoiner, Joined) * @see KStream#leftJoin(KTable, ValueJoiner) - * @see KStream#leftJoin(KTable, ValueJoiner, org.apache.kafka.common.serialization.Serde, org.apache.kafka.common.serialization.Serde) + * @see KStream#leftJoin(KTable, ValueJoiner, Joined) * @see KTable#join(KTable, ValueJoiner) * @see KTable#leftJoin(KTable, ValueJoiner) * @see KTable#outerJoin(KTable, ValueJoiner) http://git-wip-us.apache.org/repos/asf/kafka/blob/45394d52/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index f46f222..8534da8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -24,6 +24,7 @@ import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.kstream.ForeachAction; import org.apache.kafka.streams.kstream.GlobalKTable; import 
org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.kstream.Joined; import org.apache.kafka.streams.kstream.KGroupedStream; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; @@ -504,7 +505,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V final Serde<K> keySerde, final Serde<V> thisValueSerde, final Serde<V1> otherValueSerde) { - return doJoin(other, joiner, windows, keySerde, thisValueSerde, otherValueSerde, + return doJoin(other, joiner, windows, Joined.with(keySerde, thisValueSerde, otherValueSerde), new KStreamImplJoin(false, false)); } @@ -512,7 +513,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V public <V1, R> KStream<K, R> join(final KStream<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner, final JoinWindows windows) { - return join(other, joiner, windows, null, null, null); + return join(other, joiner, windows, Joined.<K, V, V1>with(null, null, null)); + } + + @Override + public <VO, VR> KStream<K, VR> join(final KStream<K, VO> otherStream, + final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, + final JoinWindows windows, + final Joined<K, V, VO> joined) { + return doJoin(otherStream, joiner, windows, joined, + new KStreamImplJoin(false, false)); } @Override @@ -522,36 +532,43 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V final Serde<K> keySerde, final Serde<V> thisValueSerde, final Serde<V1> otherValueSerde) { - return doJoin(other, joiner, windows, keySerde, thisValueSerde, otherValueSerde, new KStreamImplJoin(true, true)); + return outerJoin(other, joiner, windows, Joined.with(keySerde, thisValueSerde, otherValueSerde)); } @Override public <V1, R> KStream<K, R> outerJoin(final KStream<K, V1> other, final ValueJoiner<? super V, ? super V1, ? 
extends R> joiner, final JoinWindows windows) { - return outerJoin(other, joiner, windows, null, null, null); + return outerJoin(other, joiner, windows, Joined.<K, V, V1>with(null, null, null)); + } + + @Override + public <VO, VR> KStream<K, VR> outerJoin(final KStream<K, VO> other, + final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, + final JoinWindows windows, + final Joined<K, V, VO> joined) { + return doJoin(other, joiner, windows, joined, new KStreamImplJoin(true, true)); } private <V1, R> KStream<K, R> doJoin(final KStream<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner, final JoinWindows windows, - final Serde<K> keySerde, - final Serde<V> thisValueSerde, - final Serde<V1> otherValueSerde, + final Joined<K, V, V1> joined, final KStreamImplJoin join) { Objects.requireNonNull(other, "other KStream can't be null"); Objects.requireNonNull(joiner, "joiner can't be null"); Objects.requireNonNull(windows, "windows can't be null"); + Objects.requireNonNull(joined, "joined can't be null"); KStreamImpl<K, V> joinThis = this; KStreamImpl<K, V1> joinOther = (KStreamImpl<K, V1>) other; if (joinThis.repartitionRequired) { - joinThis = joinThis.repartitionForJoin(keySerde, thisValueSerde); + joinThis = joinThis.repartitionForJoin(joined.keySerde(), joined.valueSerde()); } if (joinOther.repartitionRequired) { - joinOther = joinOther.repartitionForJoin(keySerde, otherValueSerde); + joinOther = joinOther.repartitionForJoin(joined.keySerde(), joined.otherValueSerde()); } joinThis.ensureJoinableWith(joinOther); @@ -560,9 +577,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V joinOther, joiner, windows, - keySerde, - thisValueSerde, - otherValueSerde); + joined); } /** @@ -620,9 +635,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V return doJoin(other, joiner, windows, - keySerde, - thisValSerde, - otherValueSerde, + Joined.with(keySerde, thisValSerde, 
otherValueSerde), new KStreamImplJoin(true, false)); } @@ -630,22 +643,37 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V public <V1, R> KStream<K, R> leftJoin(final KStream<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner, final JoinWindows windows) { - return leftJoin(other, joiner, windows, null, null, null); + return leftJoin(other, joiner, windows, Joined.<K, V, V1>with(null, null, null)); + } + + @Override + public <VO, VR> KStream<K, VR> leftJoin(final KStream<K, VO> other, + final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, + final JoinWindows windows, + final Joined<K, V, VO> joined) { + Objects.requireNonNull(joined, "joined can't be null"); + return doJoin(other, + joiner, + windows, + joined, + new KStreamImplJoin(true, false)); } @Override public <V1, R> KStream<K, R> join(final KTable<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner) { - return join(other, joiner, null, null); + return join(other, joiner, Joined.<K, V, V1>with(null, null, null)); } @Override - public <V1, R> KStream<K, R> join(final KTable<K, V1> other, - final ValueJoiner<? super V, ? super V1, ? extends R> joiner, - final Serde<K> keySerde, - final Serde<V> valueSerde) { + public <VT, VR> KStream<K, VR> join(final KTable<K, VT> other, + final ValueJoiner<? super V, ? super VT, ? 
extends VR> joiner, + final Joined<K, V, VT> joined) { + Objects.requireNonNull(other, "other can't be null"); + Objects.requireNonNull(joiner, "joiner can't be null"); + Objects.requireNonNull(joined, "joined can't be null"); if (repartitionRequired) { - final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(keySerde, valueSerde); + final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(joined.keySerde(), joined.valueSerde()); return thisStreamRepartitioned.doStreamTableJoin(other, joiner, false); } else { return doStreamTableJoin(other, joiner, false); @@ -653,6 +681,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V } @Override + public <V1, R> KStream<K, R> join(final KTable<K, V1> other, + final ValueJoiner<? super V, ? super V1, ? extends R> joiner, + final Serde<K> keySerde, + final Serde<V> valueSerde) { + return join(other, joiner, Joined.<K, V, V1>with(keySerde, valueSerde, null)); + } + + @Override public <K1, V1, R> KStream<K, R> leftJoin(final GlobalKTable<K1, V1> globalTable, final KeyValueMapper<? super K, ? super V, ? extends K1> keyMapper, final ValueJoiner<? super V, ? super V1, ? extends R> joiner) { @@ -698,21 +734,31 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V @Override public <V1, R> KStream<K, R> leftJoin(final KTable<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner) { - return leftJoin(other, joiner, null, null); + return leftJoin(other, joiner, Joined.<K, V, V1>with(null, null, null)); } - public <V1, R> KStream<K, R> leftJoin(final KTable<K, V1> other, - final ValueJoiner<? super V, ? super V1, ? extends R> joiner, - final Serde<K> keySerde, - final Serde<V> valueSerde) { + @Override + public <VT, VR> KStream<K, VR> leftJoin(final KTable<K, VT> other, + final ValueJoiner<? super V, ? super VT, ? 
extends VR> joiner, + final Joined<K, V, VT> joined) { + Objects.requireNonNull(other, "other can't be null"); + Objects.requireNonNull(joiner, "joiner can't be null"); + Objects.requireNonNull(joined, "joined can't be null"); if (repartitionRequired) { - final KStreamImpl<K, V> thisStreamRepartitioned = this.repartitionForJoin(keySerde, valueSerde); + final KStreamImpl<K, V> thisStreamRepartitioned = this.repartitionForJoin(joined.keySerde(), joined.valueSerde()); return thisStreamRepartitioned.doStreamTableJoin(other, joiner, true); } else { return doStreamTableJoin(other, joiner, true); } } + public <V1, R> KStream<K, R> leftJoin(final KTable<K, V1> other, + final ValueJoiner<? super V, ? super V1, ? extends R> joiner, + final Serde<K> keySerde, + final Serde<V> valueSerde) { + return leftJoin(other, joiner, Joined.<K, V, V1>with(keySerde, valueSerde, null)); + } + @Override public <K1> KGroupedStream<K1, V> groupBy(final KeyValueMapper<? super K, ? super V, K1> selector) { return groupBy(selector, Serialized.<K1, V>with(null, null)); @@ -789,9 +835,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V final KStream<K1, V2> other, final ValueJoiner<? super V1, ? super V2, ? extends R> joiner, final JoinWindows windows, - final Serde<K1> keySerde, - final Serde<V1> lhsValueSerde, - final Serde<V2> otherValueSerde) { + final Joined joined) { String thisWindowStreamName = builder.newName(WINDOWED_NAME); String otherWindowStreamName = builder.newName(WINDOWED_NAME); String joinThisName = rightOuter ? 
builder.newName(OUTERTHIS_NAME) : builder.newName(JOINTHIS_NAME); @@ -799,10 +843,10 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V String joinMergeName = builder.newName(MERGE_NAME); StateStoreSupplier thisWindow = - createWindowedStateStore(windows, keySerde, lhsValueSerde, joinThisName + "-store"); + createWindowedStateStore(windows, joined.keySerde(), joined.valueSerde(), joinThisName + "-store"); StateStoreSupplier otherWindow = - createWindowedStateStore(windows, keySerde, otherValueSerde, joinOtherName + "-store"); + createWindowedStateStore(windows, joined.keySerde(), joined.otherValueSerde(), joinOtherName + "-store"); KStreamJoinWindow<K1, V1> thisWindowedStream = new KStreamJoinWindow<>(thisWindow.name(), http://git-wip-us.apache.org/repos/asf/kafka/blob/45394d52/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java index b653647..4a356c7 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java @@ -32,6 +32,7 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.kstream.Joined; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.Predicate; @@ -212,9 +213,7 @@ public class KStreamRepartitionJoinTest { .join(streamOne.map(keyMapper), TOSTRING_JOINER, getJoinWindow(), 
- Serdes.Integer(), - Serdes.String(), - Serdes.Integer()) + Joined.with(Serdes.Integer(), Serdes.String(), Serdes.Integer())) .to(Serdes.Integer(), Serdes.String(), output); return new ExpectedOutputOnTopic(Arrays.asList("A:1", "B:2", "C:3", "D:4", "E:5"), output); @@ -231,9 +230,7 @@ public class KStreamRepartitionJoinTest { map1.leftJoin(map2, TOSTRING_JOINER, getJoinWindow(), - Serdes.Integer(), - Serdes.Integer(), - Serdes.String()) + Joined.with(Serdes.Integer(), Serdes.Integer(), Serdes.String())) .filterNot(new Predicate<Integer, String>() { @Override public boolean test(Integer key, String value) { @@ -257,9 +254,7 @@ public class KStreamRepartitionJoinTest { final KStream<Integer, String> join = map1.join(map2, TOSTRING_JOINER, getJoinWindow(), - Serdes.Integer(), - Serdes.Integer(), - Serdes.String()); + Joined.with(Serdes.Integer(), Serdes.Integer(), Serdes.String())); final String topic = "map-join-join-" + testNo; CLUSTER.createTopic(topic); @@ -267,9 +262,7 @@ public class KStreamRepartitionJoinTest { .join(streamFour.map(kvMapper), TOSTRING_JOINER, getJoinWindow(), - Serdes.Integer(), - Serdes.String(), - Serdes.String()) + Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String())) .to(Serdes.Integer(), Serdes.String(), topic); @@ -387,11 +380,9 @@ public class KStreamRepartitionJoinTest { final String outputTopic) throws InterruptedException { CLUSTER.createTopic(outputTopic); lhs.join(rhs, - TOSTRING_JOINER, - getJoinWindow(), - Serdes.Integer(), - Serdes.Integer(), - Serdes.String()) + TOSTRING_JOINER, + getJoinWindow(), + Joined.with(Serdes.Integer(), Serdes.Integer(), Serdes.String())) .to(Serdes.Integer(), Serdes.String(), outputTopic); } http://git-wip-us.apache.org/repos/asf/kafka/blob/45394d52/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java ---------------------------------------------------------------------- diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index 3e50abb..1fed374 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -24,6 +24,7 @@ import org.apache.kafka.streams.StreamsBuilderTest; import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.kstream.GlobalKTable; import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.kstream.Joined; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; @@ -46,6 +47,7 @@ import java.util.concurrent.TimeUnit; import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; public class KStreamImplTest { @@ -127,19 +129,20 @@ public class KStreamImplTest { ); final int anyWindowSize = 1; + final Joined<String, Integer, Integer> joined = Joined.with(stringSerde, intSerde, intSerde); KStream<String, Integer> stream4 = streams2[0].join(streams3[0], new ValueJoiner<Integer, Integer, Integer>() { @Override public Integer apply(Integer value1, Integer value2) { return value1 + value2; } - }, JoinWindows.of(anyWindowSize), stringSerde, intSerde, intSerde); + }, JoinWindows.of(anyWindowSize), joined); - KStream<String, Integer> stream5 = streams2[1].join(streams3[1], new ValueJoiner<Integer, Integer, Integer>() { + streams2[1].join(streams3[1], new ValueJoiner<Integer, Integer, Integer>() { @Override public Integer apply(Integer value1, Integer value2) { return value1 + value2; } - }, JoinWindows.of(anyWindowSize), stringSerde, intSerde, intSerde); + }, JoinWindows.of(anyWindowSize), joined); 
stream4.to("topic-5"); @@ -190,11 +193,11 @@ public class KStreamImplTest { } }); stream.join(kStream, - valueJoiner, - JoinWindows.of(windowSize).until(3 * windowSize), - Serdes.String(), - Serdes.String(), - Serdes.String()) + valueJoiner, + JoinWindows.of(windowSize).until(3 * windowSize), + Joined.with(Serdes.String(), + Serdes.String(), + Serdes.String())) .to(Serdes.String(), Serdes.String(), "output-topic"); ProcessorTopology processorTopology = builder.setApplicationId("X").build(null); @@ -373,4 +376,39 @@ public class KStreamImplTest { null); } + @Test + public void shouldThrowNullPointerOnLeftJoinWithTableWhenJoinedIsNull() { + final KTable<String, String> table = builder.table(Serdes.String(), Serdes.String(), "blah"); + try { + testStream.leftJoin(table, + MockValueJoiner.TOSTRING_JOINER, + null); + fail("Should have thrown NullPointerException"); + } catch (final NullPointerException e) { + // ok + } + } + + @Test + public void shouldThrowNullPointerOnJoinWithTableWhenJoinedIsNull() { + final KTable<String, String> table = builder.table(Serdes.String(), Serdes.String(), "blah"); + try { + testStream.join(table, + MockValueJoiner.TOSTRING_JOINER, + null); + fail("Should have thrown NullPointerException"); + } catch (final NullPointerException e) { + // ok + } + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerOnJoinWithStreamWhenJoinedIsNull() { + testStream.join(testStream, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(10), null); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerOnOuterJoinJoinedIsNull() { + testStream.outerJoin(testStream, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(10), null); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/45394d52/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java ---------------------------------------------------------------------- diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java index a733fae..ab7ca53 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsBuilderTest; import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.kstream.Joined; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.test.KStreamTestDriver; @@ -72,7 +73,10 @@ public class KStreamKStreamJoinTest { processor = new MockProcessorSupplier<>(); stream1 = builder.stream(intSerde, stringSerde, topic1); stream2 = builder.stream(intSerde, stringSerde, topic2); - joined = stream1.join(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(100), intSerde, stringSerde, stringSerde); + joined = stream1.join(stream2, + MockValueJoiner.TOSTRING_JOINER, + JoinWindows.of(100), + Joined.with(intSerde, stringSerde, stringSerde)); joined.process(processor); Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder); @@ -170,7 +174,10 @@ public class KStreamKStreamJoinTest { processor = new MockProcessorSupplier<>(); stream1 = builder.stream(intSerde, stringSerde, topic1); stream2 = builder.stream(intSerde, stringSerde, topic2); - joined = stream1.outerJoin(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(100), intSerde, stringSerde, stringSerde); + joined = stream1.outerJoin(stream2, + MockValueJoiner.TOSTRING_JOINER, + JoinWindows.of(100), + Joined.with(intSerde, stringSerde, stringSerde)); joined.process(processor); 
Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder); @@ -271,7 +278,10 @@ public class KStreamKStreamJoinTest { stream1 = builder.stream(intSerde, stringSerde, topic1); stream2 = builder.stream(intSerde, stringSerde, topic2); - joined = stream1.join(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(100), intSerde, stringSerde, stringSerde); + joined = stream1.join(stream2, + MockValueJoiner.TOSTRING_JOINER, + JoinWindows.of(100), + Joined.with(intSerde, stringSerde, stringSerde)); joined.process(processor); Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder); @@ -498,7 +508,12 @@ public class KStreamKStreamJoinTest { stream1 = builder.stream(intSerde, stringSerde, topic1); stream2 = builder.stream(intSerde, stringSerde, topic2); - joined = stream1.join(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(0).after(100), intSerde, stringSerde, stringSerde); + joined = stream1.join(stream2, + MockValueJoiner.TOSTRING_JOINER, + JoinWindows.of(0).after(100), + Joined.with(intSerde, + stringSerde, + stringSerde)); joined.process(processor); Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder); @@ -607,7 +622,10 @@ public class KStreamKStreamJoinTest { stream1 = builder.stream(intSerde, stringSerde, topic1); stream2 = builder.stream(intSerde, stringSerde, topic2); - joined = stream1.join(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(0).before(100), intSerde, stringSerde, stringSerde); + joined = stream1.join(stream2, + MockValueJoiner.TOSTRING_JOINER, + JoinWindows.of(0).before(100), + Joined.with(intSerde, stringSerde, stringSerde)); joined.process(processor); Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder); http://git-wip-us.apache.org/repos/asf/kafka/blob/45394d52/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java 
---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java index e0ba9fe..7d8297c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsBuilderTest; import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.kstream.Joined; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; import org.apache.kafka.test.KStreamTestDriver; @@ -73,7 +74,10 @@ public class KStreamKStreamLeftJoinTest { stream1 = builder.stream(intSerde, stringSerde, topic1); stream2 = builder.stream(intSerde, stringSerde, topic2); - joined = stream1.leftJoin(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(100), intSerde, stringSerde, stringSerde); + joined = stream1.leftJoin(stream2, + MockValueJoiner.TOSTRING_JOINER, + JoinWindows.of(100), + Joined.with(intSerde, stringSerde, stringSerde)); joined.process(processor); final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder); @@ -149,9 +153,7 @@ public class KStreamKStreamLeftJoinTest { @Test public void testWindowing() throws Exception { final StreamsBuilder builder = new StreamsBuilder(); - final int[] expectedKeys = new int[]{0, 1, 2, 3}; - long time = 0L; final KStream<Integer, String> stream1; @@ -163,7 +165,10 @@ public class KStreamKStreamLeftJoinTest { stream1 = builder.stream(intSerde, stringSerde, topic1); stream2 = builder.stream(intSerde, stringSerde, topic2); - 
joined = stream1.leftJoin(stream2, MockValueJoiner.TOSTRING_JOINER, JoinWindows.of(100), intSerde, stringSerde, stringSerde); + joined = stream1.leftJoin(stream2, + MockValueJoiner.TOSTRING_JOINER, + JoinWindows.of(100), + Joined.with(intSerde, stringSerde, stringSerde)); joined.process(processor); final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);