KAFKA-4001: Improve Kafka Streams Join Semantics (KIP-77) - fixed leftJoin -> outerJoin test bug - simplified to only use values - fixed inner KTable-KTable join - fixed left KTable-KTable join - fixed outer KTable-KTable join - fixed inner, left, and outer KStream-KStream joins - added inner KStream-KTable join - fixed left KStream-KTable join
Author: Matthias J. Sax <matth...@confluent.io> Reviewers: Damian Guy <damian....@gmail.com>, Guozhang Wang <wangg...@gmail.com> Closes #1777 from mjsax/kafka-4001-joins Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/62c0972e Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/62c0972e Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/62c0972e Branch: refs/heads/trunk Commit: 62c0972efc525cc0677bd3fd470bd9fbbd70b004 Parents: 24067e4 Author: Matthias J. Sax <matth...@confluent.io> Authored: Thu Oct 20 13:06:25 2016 -0700 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Thu Oct 20 13:06:25 2016 -0700 ---------------------------------------------------------------------- .../apache/kafka/streams/kstream/KStream.java | 44 +- .../streams/kstream/internals/KStreamImpl.java | 272 +++++------- .../kstream/internals/KStreamKStreamJoin.java | 27 +- .../kstream/internals/KStreamKTableJoin.java | 75 ++++ .../internals/KStreamKTableLeftJoin.java | 66 --- .../internals/KStreamWindowAggregate.java | 2 +- .../streams/kstream/internals/KTableImpl.java | 103 ++--- .../kstream/internals/KTableKTableJoin.java | 21 +- .../kstream/internals/KTableKTableLeftJoin.java | 19 +- .../internals/KTableKTableOuterJoin.java | 16 +- .../internals/KTableKTableRightJoin.java | 24 +- .../internals/ProcessorContextImpl.java | 4 - .../integration/JoinIntegrationTest.java | 433 +++++++++++++++++++ .../KTableKTableJoinIntegrationTest.java | 74 ++-- .../internals/KStreamKStreamLeftJoinTest.java | 98 +++-- .../internals/KStreamKTableJoinTest.java | 146 +++++++ .../internals/KStreamWindowAggregateTest.java | 22 +- .../kstream/internals/KTableKTableJoinTest.java | 93 ++-- .../internals/KTableKTableLeftJoinTest.java | 18 +- .../internals/KTableKTableOuterJoinTest.java | 12 +- 20 files changed, 1075 insertions(+), 494 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java index 032efb5..4483e9f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -533,6 +533,39 @@ public interface KStream<K, V> { JoinWindows windows); /** + * Combine values of this stream with {@link KTable}'s elements of the same key using non-windowed Inner Join. + * If a record key or value is {@code null} it will not included in the resulting {@link KStream} + * + * @param table the instance of {@link KTable} joined with this stream + * @param joiner the instance of {@link ValueJoiner} + * @param <V1> the value type of the table + * @param <V2> the value type of the new stream + * @return a {@link KStream} that contains join-records for each key and values computed by the given {@link ValueJoiner}, + * one for each matched record-pair with the same key + */ + <V1, V2> KStream<K, V2> join(KTable<K, V1> table, ValueJoiner<V, V1, V2> joiner); + + /** + * Combine values of this stream with {@link KTable}'s elements of the same key using non-windowed Inner Join. 
+ * If a record key or value is {@code null} it will not included in the resulting {@link KStream} + * + * @param table the instance of {@link KTable} joined with this stream + * @param valueJoiner the instance of {@link ValueJoiner} + * @param keySerde key serdes for materializing this stream. + * If not specified the default serdes defined in the configs will be used + * @param valSerde value serdes for materializing this stream, + * if not specified the default serdes defined in the configs will be used + * @param <V1> the value type of the table + * @param <V2> the value type of the new stream + * @return a {@link KStream} that contains join-records for each key and values computed by the given {@link ValueJoiner}, + * one for each matched record-pair with the same key and within the joining window intervals + */ + <V1, V2> KStream<K, V2> join(KTable<K, V1> table, + ValueJoiner<V, V1, V2> valueJoiner, + Serde<K> keySerde, + Serde<V> valSerde); + + /** * Combine values of this stream with {@link KTable}'s elements of the same key using non-windowed Left Join. * If a record key is null it will not included in the resulting {@link KStream} * @@ -566,6 +599,7 @@ public interface KStream<K, V> { ValueJoiner<V, V1, V2> valueJoiner, Serde<K> keySerde, Serde<V> valSerde); + /** * Group the records of this {@link KStream} using the provided {@link KeyValueMapper} and * default serializers and deserializers. 
If a record key is null it will not included in @@ -592,8 +626,8 @@ public interface KStream<K, V> { * @return a {@link KGroupedStream} that contains the grouped records of the original {@link KStream} */ <K1> KGroupedStream<K1, V> groupBy(KeyValueMapper<K, V, K1> selector, - Serde<K1> keySerde, - Serde<V> valSerde); + Serde<K1> keySerde, + Serde<V> valSerde); /** * Group the records with the same key into a {@link KGroupedStream} while preserving the http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index bb77e96..b67fca5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -20,24 +20,25 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.TopologyBuilderException; +import org.apache.kafka.streams.kstream.ForeachAction; import org.apache.kafka.streams.kstream.JoinWindows; import org.apache.kafka.streams.kstream.KGroupedStream; +import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.kstream.ForeachAction; -import org.apache.kafka.streams.kstream.TransformerSupplier; -import org.apache.kafka.streams.kstream.ValueJoiner; -import org.apache.kafka.streams.kstream.ValueTransformerSupplier; -import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KeyValueMapper; import org.apache.kafka.streams.kstream.Predicate; +import org.apache.kafka.streams.kstream.TransformerSupplier; +import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueMapper; +import org.apache.kafka.streams.kstream.ValueTransformerSupplier; import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.state.Stores; + import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.PrintStream; @@ -63,6 +64,8 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V public static final String JOINOTHER_NAME = "KSTREAM-JOINOTHER-"; + public static final String JOIN_NAME = "KSTREAM-JOIN-"; + public static final String LEFTJOIN_NAME = "KSTREAM-LEFTJOIN-"; private static final String MAP_NAME 
= "KSTREAM-MAP-"; @@ -345,7 +348,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V Serializer<K> keySerializer = keySerde == null ? null : keySerde.serializer(); Serializer<V> valSerializer = valSerde == null ? null : valSerde.serializer(); - + if (partitioner == null && keySerializer != null && keySerializer instanceof WindowedSerializer) { WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer; partitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(windowedSerializer); @@ -386,78 +389,59 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V @Override public <V1, R> KStream<K, R> join( - KStream<K, V1> other, - ValueJoiner<V, V1, R> joiner, - JoinWindows windows, - Serde<K> keySerde, - Serde<V> thisValueSerde, - Serde<V1> otherValueSerde) { + final KStream<K, V1> other, + final ValueJoiner<V, V1, R> joiner, + final JoinWindows windows, + final Serde<K> keySerde, + final Serde<V> thisValueSerde, + final Serde<V1> otherValueSerde) { - return join(other, joiner, windows, keySerde, thisValueSerde, otherValueSerde, false); + return doJoin(other, joiner, windows, keySerde, thisValueSerde, otherValueSerde, new KStreamImplJoin(false, false)); } @Override public <V1, R> KStream<K, R> join( - KStream<K, V1> other, - ValueJoiner<V, V1, R> joiner, - JoinWindows windows) { + final KStream<K, V1> other, + final ValueJoiner<V, V1, R> joiner, + final JoinWindows windows) { - return join(other, joiner, windows, null, null, null, false); + return join(other, joiner, windows, null, null, null); } @Override public <V1, R> KStream<K, R> outerJoin( - KStream<K, V1> other, - ValueJoiner<V, V1, R> joiner, - JoinWindows windows, - Serde<K> keySerde, - Serde<V> thisValueSerde, - Serde<V1> otherValueSerde) { + final KStream<K, V1> other, + final ValueJoiner<V, V1, R> joiner, + final JoinWindows windows, + final Serde<K> keySerde, + final Serde<V> thisValueSerde, + 
final Serde<V1> otherValueSerde) { - return join(other, joiner, windows, keySerde, thisValueSerde, otherValueSerde, true); + return doJoin(other, joiner, windows, keySerde, thisValueSerde, otherValueSerde, new KStreamImplJoin(true, true)); } @Override public <V1, R> KStream<K, R> outerJoin( - KStream<K, V1> other, - ValueJoiner<V, V1, R> joiner, - JoinWindows windows) { + final KStream<K, V1> other, + final ValueJoiner<V, V1, R> joiner, + final JoinWindows windows) { - return join(other, joiner, windows, null, null, null, true); + return outerJoin(other, joiner, windows, null, null, null); } - @SuppressWarnings("unchecked") - private <V1, R> KStream<K, R> join( - KStream<K, V1> other, - ValueJoiner<V, V1, R> joiner, - JoinWindows windows, - Serde<K> keySerde, - Serde<V> thisValueSerde, - Serde<V1> otherValueSerde, - boolean outer) { - - return doJoin(other, - joiner, - windows, - keySerde, - thisValueSerde, - otherValueSerde, - new DefaultJoin(outer)); - } - - private <V1, R> KStream<K, R> doJoin(KStream<K, V1> other, - ValueJoiner<V, V1, R> joiner, - JoinWindows windows, - Serde<K> keySerde, - Serde<V> thisValueSerde, - Serde<V1> otherValueSerde, - KStreamImplJoin join) { + private <V1, R> KStream<K, R> doJoin(final KStream<K, V1> other, + final ValueJoiner<V, V1, R> joiner, + final JoinWindows windows, + final Serde<K> keySerde, + final Serde<V> thisValueSerde, + final Serde<V1> otherValueSerde, + final KStreamImplJoin join) { Objects.requireNonNull(other, "other KStream can't be null"); Objects.requireNonNull(joiner, "joiner can't be null"); Objects.requireNonNull(windows, "windows can't be null"); KStreamImpl<K, V> joinThis = this; - KStreamImpl<K, V1> joinOther = (KStreamImpl) other; + KStreamImpl<K, V1> joinOther = (KStreamImpl<K, V1>) other; if (joinThis.repartitionRequired) { joinThis = joinThis.repartitionForJoin(keySerde, thisValueSerde, null); @@ -531,20 +515,20 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V 
@SuppressWarnings("unchecked") @Override public <V1, R> KStream<K, R> leftJoin( - KStream<K, V1> other, - ValueJoiner<V, V1, R> joiner, - JoinWindows windows, - Serde<K> keySerde, - Serde<V> thisValSerde, - Serde<V1> otherValueSerde) { + final KStream<K, V1> other, + final ValueJoiner<V, V1, R> joiner, + final JoinWindows windows, + final Serde<K> keySerde, + final Serde<V> thisValSerde, + final Serde<V1> otherValueSerde) { return doJoin(other, - joiner, - windows, - keySerde, - thisValSerde, - otherValueSerde, - new LeftJoin()); + joiner, + windows, + keySerde, + thisValSerde, + otherValueSerde, + new KStreamImplJoin(true, false)); } @Override @@ -558,50 +542,69 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V @SuppressWarnings("unchecked") @Override - public <V1, R> KStream<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) { - return leftJoin(other, joiner, null, null); + public <V1, R> KStream<K, R> join(final KTable<K, V1> other, final ValueJoiner<V, V1, R> joiner) { + return join(other, joiner, null, null); } - public <V1, R> KStream<K, R> leftJoin(KTable<K, V1> other, - ValueJoiner<V, V1, R> joiner, - Serde<K> keySerde, - Serde<V> valueSerde) { - Objects.requireNonNull(other, "other KTable can't be null"); - Objects.requireNonNull(joiner, "joiner can't be null"); - + @Override + public <V1, R> KStream<K, R> join(final KTable<K, V1> other, + final ValueJoiner<V, V1, R> joiner, + final Serde<K> keySerde, + final Serde<V> valueSerde) { if (repartitionRequired) { - KStreamImpl<K, V> thisStreamRepartitioned = this.repartitionForJoin(keySerde, - valueSerde, null); - return thisStreamRepartitioned.doStreamTableLeftJoin(other, joiner); + final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin(keySerde, + valueSerde, null); + return thisStreamRepartitioned.doStreamTableJoin(other, joiner, false); } else { - return doStreamTableLeftJoin(other, joiner); + return doStreamTableJoin(other, joiner, false); } - 
} - private <V1, R> KStream<K, R> doStreamTableLeftJoin(final KTable<K, V1> other, - final ValueJoiner<V, V1, R> joiner) { - Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other); + private <V1, R> KStream<K, R> doStreamTableJoin(final KTable<K, V1> other, + final ValueJoiner<V, V1, R> joiner, + final boolean leftJoin) { + Objects.requireNonNull(other, "other KTable can't be null"); + Objects.requireNonNull(joiner, "joiner can't be null"); + + final Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other); - String name = topology.newName(LEFTJOIN_NAME); + final String name = topology.newName(leftJoin ? LEFTJOIN_NAME : JOIN_NAME); - topology.addProcessor(name, new KStreamKTableLeftJoin<>((KTableImpl<K, ?, V1>) other, joiner), this.name); - topology.connectProcessorAndStateStores(name, ((KTableImpl<K, ?, V1>) other).valueGetterSupplier().storeNames()); + topology.addProcessor(name, new KStreamKTableJoin<>((KTableImpl<K, ?, V1>) other, joiner, leftJoin), this.name); + topology.connectProcessorAndStateStores(name, other.getStoreName()); topology.connectProcessors(this.name, ((KTableImpl<K, ?, V1>) other).name); return new KStreamImpl<>(topology, name, allSourceNodes, false); } @Override + public <V1, R> KStream<K, R> leftJoin(final KTable<K, V1> other, final ValueJoiner<V, V1, R> joiner) { + return leftJoin(other, joiner, null, null); + } + + public <V1, R> KStream<K, R> leftJoin(final KTable<K, V1> other, + final ValueJoiner<V, V1, R> joiner, + final Serde<K> keySerde, + final Serde<V> valueSerde) { + if (repartitionRequired) { + final KStreamImpl<K, V> thisStreamRepartitioned = this.repartitionForJoin(keySerde, + valueSerde, null); + return thisStreamRepartitioned.doStreamTableJoin(other, joiner, true); + } else { + return doStreamTableJoin(other, joiner, true); + } + } + + @Override public <K1> KGroupedStream<K1, V> groupBy(KeyValueMapper<K, V, K1> selector) { return groupBy(selector, null, null); } @Override public <K1> 
KGroupedStream<K1, V> groupBy(KeyValueMapper<K, V, K1> selector, - Serde<K1> keySerde, - Serde<V> valSerde) { + Serde<K1> keySerde, + Serde<V> valSerde) { Objects.requireNonNull(selector, "selector can't be null"); String selectName = internalSelectKey(selector); @@ -641,26 +644,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V .build(); } - private interface KStreamImplJoin { + private class KStreamImplJoin { - <K1, R, V1, V2> KStream<K1, R> join(KStream<K1, V1> lhs, - KStream<K1, V2> other, - ValueJoiner<V1, V2, R> joiner, - JoinWindows windows, - Serde<K1> keySerde, - Serde<V1> lhsValueSerde, - Serde<V2> otherValueSerde); - } - - private class DefaultJoin implements KStreamImplJoin { - - private final boolean outer; + private final boolean leftOuter; + private final boolean rightOuter; - DefaultJoin(final boolean outer) { - this.outer = outer; + KStreamImplJoin(final boolean leftOuter, final boolean rightOuter) { + this.leftOuter = leftOuter; + this.rightOuter = rightOuter; } - @Override public <K1, R, V1, V2> KStream<K1, R> join(KStream<K1, V1> lhs, KStream<K1, V2> other, ValueJoiner<V1, V2, R> joiner, @@ -670,12 +663,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V Serde<V2> otherValueSerde) { String thisWindowStreamName = topology.newName(WINDOWED_NAME); String otherWindowStreamName = topology.newName(WINDOWED_NAME); - String joinThisName = outer ? topology.newName(OUTERTHIS_NAME) : topology.newName(JOINTHIS_NAME); - String joinOtherName = outer ? topology.newName(OUTEROTHER_NAME) : topology.newName(JOINOTHER_NAME); + String joinThisName = rightOuter ? topology.newName(OUTERTHIS_NAME) : topology.newName(JOINTHIS_NAME); + String joinOtherName = leftOuter ? 
topology.newName(OUTEROTHER_NAME) : topology.newName(JOINOTHER_NAME); String joinMergeName = topology.newName(MERGE_NAME); StateStoreSupplier thisWindow = - createWindowedStateStore(windows, keySerde, lhsValueSerde, joinThisName + "-store"); + createWindowedStateStore(windows, keySerde, lhsValueSerde, joinThisName + "-store"); StateStoreSupplier otherWindow = createWindowedStateStore(windows, keySerde, otherValueSerde, joinOtherName + "-store"); @@ -688,16 +681,16 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V windows.before + windows.after + 1, windows.maintainMs()); - KStreamKStreamJoin<K1, R, V1, V2> joinThis = new KStreamKStreamJoin<>(otherWindow.name(), - windows.before, - windows.after, - joiner, - outer); - KStreamKStreamJoin<K1, R, V2, V1> joinOther = new KStreamKStreamJoin<>(thisWindow.name(), - windows.after, - windows.before, - reverseJoiner(joiner), - outer); + final KStreamKStreamJoin<K1, R, V1, V2> joinThis = new KStreamKStreamJoin<>(otherWindow.name(), + windows.before, + windows.after, + joiner, + leftOuter); + final KStreamKStreamJoin<K1, R, V2, V1> joinOther = new KStreamKStreamJoin<>(thisWindow.name(), + windows.after, + windows.before, + reverseJoiner(joiner), + rightOuter); KStreamPassThrough<K1, R> joinMerge = new KStreamPassThrough<>(); @@ -716,39 +709,4 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V } } - - private class LeftJoin implements KStreamImplJoin { - - @Override - public <K1, R, V1, V2> KStream<K1, R> join(KStream<K1, V1> lhs, - KStream<K1, V2> other, - ValueJoiner<V1, V2, R> joiner, - JoinWindows windows, - Serde<K1> keySerde, - Serde<V1> lhsValueSerde, - Serde<V2> otherValueSerde) { - String otherWindowStreamName = topology.newName(WINDOWED_NAME); - String joinThisName = topology.newName(LEFTJOIN_NAME); - - StateStoreSupplier otherWindow = - createWindowedStateStore(windows, keySerde, otherValueSerde, joinThisName + "-store"); - - KStreamJoinWindow<K1, V1> 
- otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name(), windows.before + windows.after + 1, windows.maintainMs()); - KStreamKStreamJoin<K1, R, V1, V2> - joinThis = new KStreamKStreamJoin<>(otherWindow.name(), windows.before, windows.after, joiner, true); - - - - topology.addProcessor(otherWindowStreamName, otherWindowedStream, ((AbstractStream) other).name); - topology.addProcessor(joinThisName, joinThis, ((AbstractStream) lhs).name); - topology.addStateStore(otherWindow, joinThisName, otherWindowStreamName); - - Set<String> allSourceNodes = new HashSet<>(((AbstractStream) lhs).sourceNodes); - allSourceNodes.addAll(((KStreamImpl<K1, V2>) other).sourceNodes); - return new KStreamImpl<>(topology, joinThisName, allSourceNodes, false); - } - } - - } http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java index edde009..41547b9 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -52,7 +52,6 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> { private WindowStore<K, V2> otherWindow; - @SuppressWarnings("unchecked") @Override public void init(ProcessorContext context) { super.init(context); @@ -62,14 +61,21 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> { @Override - public void process(K key, V1 value) { - if (key == null) + public void process(final K key, final V1 value) { + // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record + // + // we also ignore the record if value is null, because in a key-value data model a null-value indicates + // an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics + // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record -- + // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored + if (key == null || value == null) { return; + } - boolean needOuterJoin = KStreamKStreamJoin.this.outer; + boolean needOuterJoin = outer; - long timeFrom = Math.max(0L, context().timestamp() - joinBeforeMs); - long timeTo = Math.max(0L, context().timestamp() + joinAfterMs); + final long timeFrom = Math.max(0L, context().timestamp() - joinBeforeMs); + final long timeTo = Math.max(0L, context().timestamp() + joinAfterMs); try (WindowStoreIterator<V2> iter = otherWindow.fetch(key, timeFrom, timeTo)) { while (iter.hasNext()) { @@ -77,8 +83,9 @@ class KStreamKStreamJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> { context().forward(key, joiner.apply(value, iter.next().value)); } - if (needOuterJoin) + if (needOuterJoin) { context().forward(key, joiner.apply(value, null)); + } } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java ---------------------------------------------------------------------- diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java new file mode 100644 index 0000000..1027b96 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoin.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.streams.kstream.ValueJoiner; +import org.apache.kafka.streams.processor.AbstractProcessor; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorSupplier; + +class KStreamKTableJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> { + + private final KTableValueGetterSupplier<K, V2> valueGetterSupplier; + private final ValueJoiner<V1, V2, R> joiner; + private final boolean leftJoin; + + KStreamKTableJoin(final KTableImpl<K, ?, V2> table, final ValueJoiner<V1, V2, R> joiner, final boolean leftJoin) { + valueGetterSupplier = table.valueGetterSupplier(); + this.joiner = joiner; + this.leftJoin = leftJoin; + } + + @Override + public Processor<K, V1> get() { + return new KStreamKTableJoinProcessor(valueGetterSupplier.get(), leftJoin); + } + + private class KStreamKTableJoinProcessor extends AbstractProcessor<K, V1> { + + private final KTableValueGetter<K, V2> valueGetter; + private final boolean leftJoin; + + KStreamKTableJoinProcessor(final KTableValueGetter<K, V2> valueGetter, final boolean leftJoin) { + this.valueGetter = valueGetter; + this.leftJoin = leftJoin; + } + + @Override + public void init(final ProcessorContext context) { + super.init(context); + valueGetter.init(context); + } + + @Override + public void process(final K key, final V1 value) { + // we do join iff keys are equal, thus, if key is null we cannot join and just ignore the record + // + // we also ignore the record if value is null, because in a key-value data model a null-value indicates + // an empty message (ie, there is nothing to be joined) -- this contrast SQL NULL semantics + // furthermore, on left/outer joins 'null' in ValueJoiner#apply() indicates a missing record -- + // thus, to be consistent and to avoid ambiguous null semantics, null values are ignored + if (key != null && value != 
null) { + final V2 value2 = valueGetter.get(key); + if (leftJoin || value2 != null) { + context().forward(key, joiner.apply(value, value2)); + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java deleted file mode 100644 index 92b9b59..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoin.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.streams.kstream.ValueJoiner; -import org.apache.kafka.streams.processor.AbstractProcessor; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.ProcessorSupplier; - -class KStreamKTableLeftJoin<K, R, V1, V2> implements ProcessorSupplier<K, V1> { - - private final KTableValueGetterSupplier<K, V2> valueGetterSupplier; - private final ValueJoiner<V1, V2, R> joiner; - - KStreamKTableLeftJoin(KTableImpl<K, ?, V2> table, ValueJoiner<V1, V2, R> joiner) { - this.valueGetterSupplier = table.valueGetterSupplier(); - this.joiner = joiner; - } - - @Override - public Processor<K, V1> get() { - return new KStreamKTableLeftJoinProcessor(valueGetterSupplier.get()); - } - - private class KStreamKTableLeftJoinProcessor extends AbstractProcessor<K, V1> { - - private final KTableValueGetter<K, V2> valueGetter; - - public KStreamKTableLeftJoinProcessor(KTableValueGetter<K, V2> valueGetter) { - this.valueGetter = valueGetter; - } - - @SuppressWarnings("unchecked") - @Override - public void init(ProcessorContext context) { - super.init(context); - valueGetter.init(context); - } - - @Override - public void process(K key, V1 value) { - // if the key is null, we do not need proceed joining - // the record with the table - if (key != null) { - context().forward(key, joiner.apply(value, valueGetter.get(key))); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java index 718e52b..55b0916 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregate.java @@ -17,8 +17,8 @@ package org.apache.kafka.streams.kstream.internals; -import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Aggregator; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.Window; import org.apache.kafka.streams.kstream.Windowed; http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 6423cff..683dc00 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -44,6 +44,7 @@ import java.util.Set; /** * The implementation class of {@link KTable}. 
+ * * @param <K> the key type * @param <S> the source's (parent's) value type * @param <V> the value type @@ -283,77 +284,55 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, return toStream().selectKey(mapper); } - @SuppressWarnings("unchecked") @Override - public <V1, R> KTable<K, R> join(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) { - Objects.requireNonNull(other, "other can't be null"); - Objects.requireNonNull(joiner, "joiner can't be null"); - - Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other); - - String joinThisName = topology.newName(JOINTHIS_NAME); - String joinOtherName = topology.newName(JOINOTHER_NAME); - String joinMergeName = topology.newName(MERGE_NAME); - - KTableKTableJoin<K, R, V, V1> joinThis = new KTableKTableJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner); - KTableKTableJoin<K, R, V1, V> joinOther = new KTableKTableJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner)); - KTableKTableJoinMerger<K, R> joinMerge = new KTableKTableJoinMerger<>( - new KTableImpl<K, V, R>(topology, joinThisName, joinThis, this.sourceNodes, this.storeName), - new KTableImpl<K, V1, R>(topology, joinOtherName, joinOther, ((KTableImpl<K, ?, ?>) other).sourceNodes, other.getStoreName()) - ); - - topology.addProcessor(joinThisName, joinThis, this.name); - topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name); - topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName); - topology.connectProcessorAndStateStores(joinThisName, ((KTableImpl) other).valueGetterSupplier().storeNames()); - topology.connectProcessorAndStateStores(joinOtherName, valueGetterSupplier().storeNames()); - - return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes, null); + public <V1, R> KTable<K, R> join(final KTable<K, V1> other, final ValueJoiner<V, V1, R> joiner) { + return doJoin(other, joiner, false, false); } - @SuppressWarnings("unchecked") @Override - 
public <V1, R> KTable<K, R> outerJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) { - Objects.requireNonNull(other, "other can't be null"); - Objects.requireNonNull(joiner, "joiner can't be null"); - - Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other); - - String joinThisName = topology.newName(OUTERTHIS_NAME); - String joinOtherName = topology.newName(OUTEROTHER_NAME); - String joinMergeName = topology.newName(MERGE_NAME); - - KTableKTableOuterJoin<K, R, V, V1> joinThis = new KTableKTableOuterJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner); - KTableKTableOuterJoin<K, R, V1, V> joinOther = new KTableKTableOuterJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner)); - KTableKTableJoinMerger<K, R> joinMerge = new KTableKTableJoinMerger<>( - new KTableImpl<K, V, R>(topology, joinThisName, joinThis, this.sourceNodes, this.storeName), - new KTableImpl<K, V1, R>(topology, joinOtherName, joinOther, ((KTableImpl<K, ?, ?>) other).sourceNodes, other.getStoreName()) - ); - - topology.addProcessor(joinThisName, joinThis, this.name); - topology.addProcessor(joinOtherName, joinOther, ((KTableImpl) other).name); - topology.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName); - topology.connectProcessorAndStateStores(joinThisName, ((KTableImpl) other).valueGetterSupplier().storeNames()); - topology.connectProcessorAndStateStores(joinOtherName, valueGetterSupplier().storeNames()); + public <V1, R> KTable<K, R> outerJoin(final KTable<K, V1> other, final ValueJoiner<V, V1, R> joiner) { + return doJoin(other, joiner, true, true); + } - return new KTableImpl<>(topology, joinMergeName, joinMerge, allSourceNodes, null); + @Override + public <V1, R> KTable<K, R> leftJoin(final KTable<K, V1> other, final ValueJoiner<V, V1, R> joiner) { + return doJoin(other, joiner, true, false); } @SuppressWarnings("unchecked") - @Override - public <V1, R> KTable<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<V, V1, R> joiner) { + private <V1, 
R> KTable<K, R> doJoin(final KTable<K, V1> other, ValueJoiner<V, V1, R> joiner, final boolean leftOuter, final boolean rightOuter) { Objects.requireNonNull(other, "other can't be null"); Objects.requireNonNull(joiner, "joiner can't be null"); - Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other); - String joinThisName = topology.newName(LEFTTHIS_NAME); - String joinOtherName = topology.newName(LEFTOTHER_NAME); - String joinMergeName = topology.newName(MERGE_NAME); + final Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other); + + if (leftOuter) { + enableSendingOldValues(); + } + if (rightOuter) { + ((KTableImpl) other).enableSendingOldValues(); + } + + final String joinThisName = topology.newName(JOINTHIS_NAME); + final String joinOtherName = topology.newName(JOINOTHER_NAME); + final String joinMergeName = topology.newName(MERGE_NAME); + + final KTableKTableAbstractJoin<K, R, V, V1> joinThis; + final KTableKTableAbstractJoin<K, R, V1, V> joinOther; + + if (!leftOuter) { // inner + joinThis = new KTableKTableJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner); + joinOther = new KTableKTableJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner)); + } else if (!rightOuter) { // left + joinThis = new KTableKTableLeftJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner); + joinOther = new KTableKTableRightJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner)); + } else { // outer + joinThis = new KTableKTableOuterJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner); + joinOther = new KTableKTableOuterJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner)); + } - KTableKTableLeftJoin<K, R, V, V1> joinThis = new KTableKTableLeftJoin<>(this, (KTableImpl<K, ?, V1>) other, joiner); - KTableKTableRightJoin<K, R, V1, V> joinOther = new KTableKTableRightJoin<>((KTableImpl<K, ?, V1>) other, this, reverseJoiner(joiner)); - KTableKTableJoinMerger<K, R> joinMerge = new KTableKTableJoinMerger<>( - new 
KTableImpl<K, V, R>(topology, joinThisName, joinThis, this.sourceNodes, this.storeName), + final KTableKTableJoinMerger<K, R> joinMerge = new KTableKTableJoinMerger<>( + new KTableImpl<K, V, R>(topology, joinThisName, joinThis, sourceNodes, storeName), new KTableImpl<K, V1, R>(topology, joinOtherName, joinOther, ((KTableImpl<K, ?, ?>) other).sourceNodes, other.getStoreName()) ); http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java index cbd626d..49f6715 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoin.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -69,23 +69,26 @@ class KTableKTableJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, V1, * @throws StreamsException if key is null */ @Override - public void process(K key, Change<V1> change) { + public void process(final K key, final Change<V1> change) { // the keys should never be null if (key == null) throw new StreamsException("Record key for KTable join operator should not be null."); R newValue = null; R oldValue = null; - V2 value2 = null; - if (change.newValue != null || change.oldValue != null) - value2 = valueGetter.get(key); + final V2 value2 = valueGetter.get(key); + if (value2 == null) { + return; + } - if (change.newValue != null && value2 != null) + if (change.newValue != null) { newValue = joiner.apply(change.newValue, value2); + } - if (sendOldValues && change.oldValue != null && value2 != null) + if (sendOldValues && change.oldValue != null) { oldValue = joiner.apply(change.oldValue, value2); + } context().forward(key, new Change<>(newValue, oldValue)); } http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java index 4bee38c..5f5cad6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoin.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -70,27 +70,28 @@ class KTableKTableLeftJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, * @throws StreamsException if key is null */ @Override - public void process(K key, Change<V1> change) { + public void process(final K key, final Change<V1> change) { // the keys should never be null if (key == null) throw new StreamsException("Record key for KTable left-join operator should not be null."); R newValue = null; R oldValue = null; - V2 value2 = null; - if (change.newValue != null || change.oldValue != null) - value2 = valueGetter.get(key); + final V2 value2 = valueGetter.get(key); + if (value2 == null && change.newValue == null && change.oldValue == null) { + return; + } - if (change.newValue != null) + if (change.newValue != null) { newValue = joiner.apply(change.newValue, value2); + } if (sendOldValues && change.oldValue != null) oldValue = joiner.apply(change.oldValue, value2); context().forward(key, new Change<>(newValue, oldValue)); } - } private class KTableKTableLeftJoinValueGetter implements KTableValueGetter<K, R> { http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java index ad7dbde..2bfd8a5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoin.java @@ -69,21 +69,25 @@ class KTableKTableOuterJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, * @throws StreamsException if key is null */ @Override - public void process(K key, Change<V1> change) { + public void process(final K key, final Change<V1> change) { // the keys should never be null if (key == null) throw new StreamsException("Record key for KTable outer-join operator should not be null."); R newValue = null; R oldValue = null; - V2 value2 = valueGetter.get(key); - if (change.newValue != null || value2 != null) + final V2 value2 = valueGetter.get(key); + if (value2 == null && change.newValue == null && change.oldValue == null) { + return; + } + + if (value2 != null || change.newValue != null) { newValue = joiner.apply(change.newValue, value2); + } - if (sendOldValues) { - if (change.oldValue != null || value2 != null) - oldValue = joiner.apply(change.oldValue, value2); + if (sendOldValues && (value2 != null || change.oldValue != null)) { + oldValue = joiner.apply(change.oldValue, value2); } context().forward(key, new Change<>(newValue, oldValue)); http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java index 80aadaa..8aeadcc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableKTableRightJoin.java @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -70,19 +70,23 @@ class KTableKTableRightJoin<K, R, V1, V2> extends KTableKTableAbstractJoin<K, R, * @throws StreamsException if key is null */ @Override - public void process(K key, Change<V1> change) { + public void process(final K key, final Change<V1> change) { // the keys should never be null if (key == null) throw new StreamsException("Record key for KTable right-join operator should not be null."); - R newValue = null; + final R newValue; R oldValue = null; - V2 value2 = valueGetter.get(key); - if (value2 != null) { - newValue = joiner.apply(change.newValue, value2); - if (sendOldValues) - oldValue = joiner.apply(change.oldValue, value2); + final V2 value2 = valueGetter.get(key); + if (value2 == null) { + return; + } + + newValue = joiner.apply(change.newValue, value2); + + if (sendOldValues) { + oldValue = joiner.apply(change.oldValue, value2); } context().forward(key, new Change<>(newValue, oldValue)); http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java index 195e5a4..11ca30e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java @@ -48,7 +48,6 @@ public class 
ProcessorContextImpl implements InternalProcessorContext, RecordCol private RecordContext recordContext; private ProcessorNode currentNode; - @SuppressWarnings("unchecked") public ProcessorContextImpl(TaskId id, StreamTask task, StreamsConfig config, @@ -194,7 +193,6 @@ public class ProcessorContextImpl implements InternalProcessorContext, RecordCol return recordContext.timestamp(); } - @SuppressWarnings("unchecked") @Override public <K, V> void forward(K key, V value) { ProcessorNode previousNode = currentNode; @@ -208,7 +206,6 @@ public class ProcessorContextImpl implements InternalProcessorContext, RecordCol } } - @SuppressWarnings("unchecked") @Override public <K, V> void forward(K key, V value, int childIndex) { ProcessorNode previousNode = currentNode; @@ -221,7 +218,6 @@ public class ProcessorContextImpl implements InternalProcessorContext, RecordCol } } - @SuppressWarnings("unchecked") @Override public <K, V> void forward(K key, V value, String childName) { for (ProcessorNode child : (List<ProcessorNode<K, V>>) currentNode.children()) { http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java new file mode 100644 index 0000000..0f70588 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java @@ -0,0 +1,433 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.integration; + +import kafka.utils.ZkUtils; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.security.JaasUtils; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; +import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; +import org.apache.kafka.streams.kstream.JoinWindows; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.ValueJoiner; +import org.apache.kafka.test.TestCondition; +import org.apache.kafka.test.TestUtils; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; + +import java.util.Arrays; +import 
java.util.Collections; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import java.util.Set; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; + +/** + * Tests all available joins of Kafka Streams DSL. + */ +public class JoinIntegrationTest { + @ClassRule + public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1); + + private static ZkUtils zkUtils = null; + + private static final String APP_ID = "join-integration-test"; + private static final String INPUT_TOPIC_1 = "inputTopicLeft"; + private static final String INPUT_TOPIC_2 = "inputTopicRight"; + private static final String OUTPUT_TOPIC = "outputTopic"; + + private final static Properties PRODUCER_CONFIG = new Properties(); + private final static Properties RESULT_CONSUMER_CONFIG = new Properties(); + private final static Properties STREAMS_CONFIG = new Properties(); + + private KStreamBuilder builder; + private KStream<Long, String> leftStream; + private KStream<Long, String> rightStream; + private KTable<Long, String> leftTable; + private KTable<Long, String> rightTable; + + private final List<Input<String>> input = Arrays.asList( + new Input<>(INPUT_TOPIC_1, (String) null), + new Input<>(INPUT_TOPIC_2, (String) null), + new Input<>(INPUT_TOPIC_1, "A"), + new Input<>(INPUT_TOPIC_2, "a"), + new Input<>(INPUT_TOPIC_1, "B"), + new Input<>(INPUT_TOPIC_2, "b"), + new Input<>(INPUT_TOPIC_1, (String) null), + new Input<>(INPUT_TOPIC_2, (String) null), + new Input<>(INPUT_TOPIC_1, "C"), + new Input<>(INPUT_TOPIC_2, "c"), + new Input<>(INPUT_TOPIC_2, (String) null), + new Input<>(INPUT_TOPIC_1, (String) null), + new Input<>(INPUT_TOPIC_2, (String) null), + new Input<>(INPUT_TOPIC_2, "d"), + new Input<>(INPUT_TOPIC_1, "D") + ); + + private final ValueJoiner<String, String, String> valueJoiner = new ValueJoiner<String, String, String>() { + @Override + public String apply(final String value1, 
final String value2) { + return value1 + "-" + value2; + } + }; + + private final TestCondition topicsGotDeleted = new TopicsGotDeletedCondition(); + + @BeforeClass + public static void setupConfigsAndUtils() throws Exception { + PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all"); + PRODUCER_CONFIG.put(ProducerConfig.RETRIES_CONFIG, 0); + PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class); + PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + RESULT_CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + RESULT_CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, APP_ID + "-result-consumer"); + RESULT_CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + RESULT_CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class); + RESULT_CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + + STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); + STREAMS_CONFIG.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString()); + STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()); + STREAMS_CONFIG.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass()); + STREAMS_CONFIG.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); + STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + + zkUtils = ZkUtils.apply(CLUSTER.zKConnectString(), + 30000, + 30000, + JaasUtils.isZkSecurityEnabled()); + } + + @AfterClass + public static void release() { + if (zkUtils != null) { + zkUtils.close(); + } + } + + @Before + public void prepareTopology() throws Exception { + 
CLUSTER.createTopic(INPUT_TOPIC_1); + CLUSTER.createTopic(INPUT_TOPIC_2); + CLUSTER.createTopic(OUTPUT_TOPIC); + + builder = new KStreamBuilder(); + leftTable = builder.table(INPUT_TOPIC_1, "leftTable"); + rightTable = builder.table(INPUT_TOPIC_2, "rightTable"); + leftStream = leftTable.toStream(); + rightStream = rightTable.toStream(); + } + + @After + public void cleanup() throws Exception { + CLUSTER.deleteTopic(INPUT_TOPIC_1); + CLUSTER.deleteTopic(INPUT_TOPIC_2); + CLUSTER.deleteTopic(OUTPUT_TOPIC); + + TestUtils.waitForCondition(topicsGotDeleted, 120000, "Topics not deleted after 120 seconds."); + } + + private void checkResult(final String outputTopic, final List<String> expectedResult) throws Exception { + if (expectedResult != null) { + final List<String> result = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(RESULT_CONSUMER_CONFIG, outputTopic, expectedResult.size(), Long.MAX_VALUE); + assertThat(result, is(expectedResult)); + } + } + + /* + * Runs the actual test. Checks the result after each input record to ensure fixed processing order. 
+ * If an input tuple does not trigger any result, "expectedResult" should contain a "null" entry + */ + private void runTest(final List<List<String>> expectedResult) throws Exception { + assert expectedResult.size() == input.size(); + + IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG); + final KafkaStreams streams = new KafkaStreams(builder, STREAMS_CONFIG); + try { + streams.start(); + + long ts = System.currentTimeMillis(); + + final Iterator<List<String>> resultIterator = expectedResult.iterator(); + for (final Input<String> singleInput : input) { + IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(singleInput.topic, Collections.singleton(singleInput.record), PRODUCER_CONFIG, ++ts); + checkResult(OUTPUT_TOPIC, resultIterator.next()); + } + } finally { + streams.close(); + } + } + + @Test + public void testInnerKStreamKStream() throws Exception { + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-inner-KStream-KStream"); + + final List<List<String>> expectedResult = Arrays.asList( + null, + null, + null, + Collections.singletonList("A-a"), + Collections.singletonList("B-a"), + Arrays.asList("A-b", "B-b"), + null, + null, + Arrays.asList("C-a", "C-b"), + Arrays.asList("A-c", "B-c", "C-c"), + null, + null, + null, + Arrays.asList("A-d", "B-d", "C-d"), + Arrays.asList("D-a", "D-b", "D-c", "D-d") + ); + + leftStream.join(rightStream, valueJoiner, JoinWindows.of(10000)).to(OUTPUT_TOPIC); + + runTest(expectedResult); + } + + @Test + public void testLeftKStreamKStream() throws Exception { + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-left-KStream-KStream"); + + final List<List<String>> expectedResult = Arrays.asList( + null, + null, + Collections.singletonList("A-null"), + Collections.singletonList("A-a"), + Collections.singletonList("B-a"), + Arrays.asList("A-b", "B-b"), + null, + null, + Arrays.asList("C-a", "C-b"), + Arrays.asList("A-c", "B-c", "C-c"), + null, + null, + null, + Arrays.asList("A-d", 
"B-d", "C-d"), + Arrays.asList("D-a", "D-b", "D-c", "D-d") + ); + + leftStream.leftJoin(rightStream, valueJoiner, JoinWindows.of(10000)).to(OUTPUT_TOPIC); + + runTest(expectedResult); + } + + @Test + public void testOuterKStreamKStream() throws Exception { + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-outer-KStream-KStream"); + + final List<List<String>> expectedResult = Arrays.asList( + null, + null, + Collections.singletonList("A-null"), + Collections.singletonList("A-a"), + Collections.singletonList("B-a"), + Arrays.asList("A-b", "B-b"), + null, + null, + Arrays.asList("C-a", "C-b"), + Arrays.asList("A-c", "B-c", "C-c"), + null, + null, + null, + Arrays.asList("A-d", "B-d", "C-d"), + Arrays.asList("D-a", "D-b", "D-c", "D-d") + ); + + leftStream.outerJoin(rightStream, valueJoiner, JoinWindows.of(10000)).to(OUTPUT_TOPIC); + + runTest(expectedResult); + } + + @Test + public void testInnerKStreamKTable() throws Exception { + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-inner-KStream-KTable"); + + final List<List<String>> expectedResult = Arrays.asList( + null, + null, + null, + null, + Collections.singletonList("B-a"), + null, + null, + null, + null, + null, + null, + null, + null, + null, + Collections.singletonList("D-d") + ); + + leftStream.join(rightTable, valueJoiner).to(OUTPUT_TOPIC); + + runTest(expectedResult); + } + + @Test + public void testLeftKStreamKTable() throws Exception { + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-left-KStream-KTable"); + + final List<List<String>> expectedResult = Arrays.asList( + null, + null, + Collections.singletonList("A-null"), + null, + Collections.singletonList("B-a"), + null, + null, + null, + Collections.singletonList("C-null"), + null, + null, + null, + null, + null, + Collections.singletonList("D-d") + ); + + leftStream.leftJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC); + + runTest(expectedResult); + } + + @Test + public void testInnerKTableKTable() 
throws Exception { + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-inner-KTable-KTable"); + + final List<List<String>> expectedResult = Arrays.asList( + null, + null, + null, + Collections.singletonList("A-a"), + Collections.singletonList("B-a"), + Collections.singletonList("B-b"), + Collections.singletonList((String) null), + null, + null, + Collections.singletonList("C-c"), + Collections.singletonList((String) null), + null, + null, + null, + Collections.singletonList("D-d") + ); + + leftTable.join(rightTable, valueJoiner).to(OUTPUT_TOPIC); + + runTest(expectedResult); + } + + @Test + public void testLeftKTableKTable() throws Exception { + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-left-KTable-KTable"); + + final List<List<String>> expectedResult = Arrays.asList( + null, + null, + Collections.singletonList("A-null"), + Collections.singletonList("A-a"), + Collections.singletonList("B-a"), + Collections.singletonList("B-b"), + Collections.singletonList((String) null), + null, + Collections.singletonList("C-null"), + Collections.singletonList("C-c"), + Collections.singletonList("C-null"), + Collections.singletonList((String) null), + null, + null, + Collections.singletonList("D-d") + ); + + leftTable.leftJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC); + + runTest(expectedResult); + } + + @Test + public void testOuterKTableKTable() throws Exception { + STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-outer-KTable-KTable"); + + final List<List<String>> expectedResult = Arrays.asList( + null, + null, + Collections.singletonList("A-null"), + Collections.singletonList("A-a"), + Collections.singletonList("B-a"), + Collections.singletonList("B-b"), + Collections.singletonList("null-b"), + Collections.singletonList((String) null), + Collections.singletonList("C-null"), + Collections.singletonList("C-c"), + Collections.singletonList("C-null"), + Collections.singletonList((String) null), + null, + 
Collections.singletonList("null-d"), + Collections.singletonList("D-d") + ); + + leftTable.outerJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC); + + runTest(expectedResult); + } + + private final class TopicsGotDeletedCondition implements TestCondition { + @Override + public boolean conditionMet() { + final Set<String> allTopics = new HashSet<>(); + allTopics.addAll(scala.collection.JavaConversions.seqAsJavaList(zkUtils.getAllTopics())); + return !allTopics.contains(INPUT_TOPIC_1) && !allTopics.contains(INPUT_TOPIC_2) && !allTopics.contains(OUTPUT_TOPIC); + } + } + + private final class Input<V> { + String topic; + KeyValue<Long, V> record; + + private final long anyUniqueKey = 0L; + + Input(final String topic, final V value) { + this.topic = topic; + record = KeyValue.pair(anyUniqueKey, value); + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java index 85e2cf7..2cd3859 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java @@ -71,49 +71,49 @@ public class KTableKTableJoinIntegrationTest { public static Object[] parameters() { return new Object[][]{ {JoinType.INNER, JoinType.INNER, Arrays.asList( - new KeyValue<>("a", null), - new KeyValue<>("b", null), - new KeyValue<>("c", null), - new KeyValue<>("a", null), - new KeyValue<>("b", null), - new KeyValue<>("b", "B1-B2-B3"), - new KeyValue<>("c", null)) - }, +// new KeyValue<>("a", null), +// new KeyValue<>("b", null), +// new KeyValue<>("c", null), +// new KeyValue<>("a", null), 
+// new KeyValue<>("b", null), + new KeyValue<>("b", "B1-B2-B3")//, +// new KeyValue<>("c", null) + )}, {JoinType.INNER, JoinType.LEFT, Arrays.asList( - new KeyValue<>("a", null), - new KeyValue<>("b", null), - new KeyValue<>("c", null), - new KeyValue<>("a", null), - new KeyValue<>("b", null), - new KeyValue<>("b", "B1-B2-B3"), - new KeyValue<>("c", null) +// new KeyValue<>("a", null), +// new KeyValue<>("b", null), +// new KeyValue<>("c", null), +// new KeyValue<>("a", null), +// new KeyValue<>("b", null), + new KeyValue<>("b", "B1-B2-B3")//, +// new KeyValue<>("c", null) )}, {JoinType.INNER, JoinType.OUTER, Arrays.asList( new KeyValue<>("a", "null-A3"), new KeyValue<>("b", "null-B3"), new KeyValue<>("c", "null-C3"), - new KeyValue<>("a", "null-A3"), - new KeyValue<>("b", "null-B3"), - new KeyValue<>("b", "B1-B2-B3"), - new KeyValue<>("c", "null-C3") +// new KeyValue<>("a", "null-A3"), +// new KeyValue<>("b", "null-B3"), + new KeyValue<>("b", "B1-B2-B3")//, +// new KeyValue<>("c", "null-C3") )}, {JoinType.LEFT, JoinType.INNER, Arrays.asList( - new KeyValue<>("a", null), - new KeyValue<>("b", null), - new KeyValue<>("c", null), +// new KeyValue<>("a", null), +// new KeyValue<>("b", null), +// new KeyValue<>("c", null), new KeyValue<>("a", "A1-null-A3"), new KeyValue<>("b", "B1-null-B3"), - new KeyValue<>("b", "B1-B2-B3"), - new KeyValue<>("c", null) + new KeyValue<>("b", "B1-B2-B3")//, +// new KeyValue<>("c", null) )}, {JoinType.LEFT, JoinType.LEFT, Arrays.asList( - new KeyValue<>("a", null), - new KeyValue<>("b", null), - new KeyValue<>("c", null), +// new KeyValue<>("a", null), +// new KeyValue<>("b", null), +// new KeyValue<>("c", null), new KeyValue<>("a", "A1-null-A3"), new KeyValue<>("b", "B1-null-B3"), - new KeyValue<>("b", "B1-B2-B3"), - new KeyValue<>("c", null) + new KeyValue<>("b", "B1-B2-B3")//, +// new KeyValue<>("c", null) )}, {JoinType.LEFT, JoinType.OUTER, Arrays.asList( new KeyValue<>("a", "null-A3"), @@ -121,22 +121,22 @@ public class 
KTableKTableJoinIntegrationTest { new KeyValue<>("c", "null-C3"), new KeyValue<>("a", "A1-null-A3"), new KeyValue<>("b", "B1-null-B3"), - new KeyValue<>("b", "B1-B2-B3"), - new KeyValue<>("c", "null-C3") + new KeyValue<>("b", "B1-B2-B3")//, +// new KeyValue<>("c", "null-C3") )}, {JoinType.OUTER, JoinType.INNER, Arrays.asList( - new KeyValue<>("a", null), - new KeyValue<>("b", null), - new KeyValue<>("c", null), +// new KeyValue<>("a", null), +// new KeyValue<>("b", null), +// new KeyValue<>("c", null), new KeyValue<>("a", "A1-null-A3"), new KeyValue<>("b", "B1-null-B3"), new KeyValue<>("b", "B1-B2-B3"), new KeyValue<>("c", "null-C2-C3") )}, {JoinType.OUTER, JoinType.LEFT, Arrays.asList( - new KeyValue<>("a", null), - new KeyValue<>("b", null), - new KeyValue<>("c", null), +// new KeyValue<>("a", null), +// new KeyValue<>("b", null), +// new KeyValue<>("c", null), new KeyValue<>("a", "A1-null-A3"), new KeyValue<>("b", "B1-null-B3"), new KeyValue<>("b", "B1-B2-B3"),