Repository: kafka Updated Branches: refs/heads/trunk e1491d4a0 -> 08063f50a
KAFKA:5653: add join overloads to KTable Add `join`, `leftJoin`, `outerJoin` overloads that use `Materialized` to `KTable` Author: Damian Guy <[email protected]> Reviewers: Bill Bejeck <[email protected]>, Matthias J. Sax <[email protected]>, Guozhang Wang <[email protected]> Closes #3826 from dguy/kafka-5653 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/08063f50 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/08063f50 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/08063f50 Branch: refs/heads/trunk Commit: 08063f50a04fda3e40c6060a432a97f49bb68c8c Parents: e1491d4 Author: Damian Guy <[email protected]> Authored: Tue Sep 12 16:01:19 2017 +0100 Committer: Damian Guy <[email protected]> Committed: Tue Sep 12 16:01:19 2017 +0100 ---------------------------------------------------------------------- .../apache/kafka/streams/kstream/KTable.java | 290 +++++++++++++++++-- .../streams/kstream/internals/KTableImpl.java | 88 +++++- .../KTableKTableJoinIntegrationTest.java | 33 ++- .../kstream/internals/KTableImplTest.java | 24 +- 4 files changed, 400 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/08063f50/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java index 2571ac1..6d1d85d 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java @@ -84,7 +84,7 @@ public interface KTable<K, V> { * have delete semantics. 
* Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded * directly if required (i.e., if there is anything to be deleted). - * Furthermore, for each record that gets dropped (i.e., dot not satisfied the given predicate) a tombstone record + * Furthermore, for each record that gets dropped (i.e., do not satisfy the given predicate) a tombstone record * is forwarded. * * @param predicate a filter {@link Predicate} that is applied to each record @@ -106,7 +106,7 @@ public interface KTable<K, V> { * have delete semantics. * Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded * directly if required (i.e., if there is anything to be deleted). - * Furthermore, for each record that gets dropped (i.e., dot not satisfied the given predicate) a tombstone record + * Furthermore, for each record that gets dropped (i.e., do not satisfy the given predicate) a tombstone record * is forwarded. * <p> * To query the local {@link KeyValueStore} it must be obtained via @@ -124,7 +124,7 @@ public interface KTable<K, V> { * * @param predicate a filter {@link Predicate} that is applied to each record * @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable} - * should be materialized + * should be materialized. Cannot be {@code null} * @return a {@code KTable} that contains only those records that satisfy the given predicate * @see #filterNot(Predicate, Materialized) */ @@ -144,7 +144,7 @@ public interface KTable<K, V> { * have delete semantics. * Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded * directly if required (i.e., if there is anything to be deleted).
- * Furthermore, for each record that gets dropped (i.e., dot not satisfied the given predicate) a tombstone record + * Furthermore, for each record that gets dropped (i.e., do not satisfy the given predicate) a tombstone record * is forwarded. * <p> * To query the local {@link KeyValueStore} it must be obtained via @@ -184,7 +184,7 @@ public interface KTable<K, V> { * have delete semantics. * Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded * directly if required (i.e., if there is anything to be deleted). - * Furthermore, for each record that gets dropped (i.e., dot not satisfied the given predicate) a tombstone record + * Furthermore, for each record that gets dropped (i.e., do not satisfy the given predicate) a tombstone record * is forwarded. * <p> * To query the local {@link KeyValueStore} it must be obtained via @@ -260,7 +260,7 @@ public interface KTable<K, V> { * <p> * @param predicate a filter {@link Predicate} that is applied to each record * @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable} - * should be materialized + * should be materialized. Cannot be {@code null} * @return a {@code KTable} that contains only those records that do <em>not</em> satisfy the given predicate * @see #filter(Predicate, Materialized) */ @@ -412,7 +412,7 @@ public interface KTable<K, V> { * * @param mapper a {@link ValueMapper} that computes a new output value * @param materialized a {@link Materialized} that describes how the {@link StateStore} for the resulting {@code KTable} - * should be materialized + * should be materialized.
Cannot be {@code null} * @param <VR> the value type of the result {@code KTable} * * @return a {@code KTable} that contains records with unmodified keys and new values (possibly of different type) @@ -1342,6 +1342,82 @@ public interface KTable<K, V> { * Both input streams (or to be more precise, their underlying source topics) need to have the same number of * partitions. * + * @param other the other {@code KTable} to be joined with this {@code KTable} + * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records + * @param materialized an instance of {@link Materialized} used to describe how the state store should be materialized. + * Cannot be {@code null} + * @param <VO> the value type of the other {@code KTable} + * @param <VR> the value type of the result {@code KTable} + * @return a {@code KTable} that contains join-records for each key and values computed by the given + * {@link ValueJoiner}, one for each matched record-pair with the same key + * @see #leftJoin(KTable, ValueJoiner, Materialized) + * @see #outerJoin(KTable, ValueJoiner, Materialized) + */ + <VO, VR> KTable<K, VR> join(final KTable<K, VO> other, + final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, + final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized); + /** + * Join records of this {@code KTable} with another {@code KTable}'s records using non-windowed inner equi join. + * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}. + * The result is an ever updating {@code KTable} that represents the <em>current</em> (i.e., processing time) result + * of the join. + * <p> + * The join is computed by (1) updating the internal state of one {@code KTable} and (2) performing a lookup for a + * matching record in the <em>current</em> (i.e., processing time) internal state of the other {@code KTable}. 
+ * This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input + * {@code KTable} the result gets updated. + * <p> + * For each {@code KTable} record that finds a corresponding record in the other {@code KTable} the provided + * {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record. + * The key of the result record is the same as for both joining input records. + * <p> + * Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics. + * Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded + * directly to delete a record in the result {@code KTable} if required (i.e., if there is anything to be deleted). + * <p> + * Input records with {@code null} key will be dropped and no join computation is performed. + * <p> + * Example: + * <table border='1'> + * <tr> + * <th>thisKTable</th> + * <th>thisState</th> + * <th>otherKTable</th> + * <th>otherState</th> + * <th>result update record</th> + * </tr> + * <tr> + * <td><K1:A></td> + * <td><K1:A></td> + * <td></td> + * <td></td> + * <td></td> + * </tr> + * <tr> + * <td></td> + * <td><K1:A></td> + * <td><K1:b></td> + * <td><K1:b></td> + * <td><K1:ValueJoiner(A,b)></td> + * </tr> + * <tr> + * <td><K1:C></td> + * <td><K1:C></td> + * <td></td> + * <td><K1:b></td> + * <td><K1:ValueJoiner(C,b)></td> + * </tr> + * <tr> + * <td></td> + * <td><K1:C></td> + * <td><K1:null></td> + * <td></td> + * <td><K1:null></td> + * </tr> + * </table> + * Both input streams (or to be more precise, their underlying source topics) need to have the same number of + * partitions. 
+ * * @param other the other {@code KTable} to be joined with this {@code KTable} * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records * @param <VO> the value type of the other {@code KTable} @@ -1353,9 +1429,11 @@ public interface KTable<K, V> { * (i.e., that would be equivalent to calling {@link KTable#join(KTable, ValueJoiner)}. * @return a {@code KTable} that contains join-records for each key and values computed by the given * {@link ValueJoiner}, one for each matched record-pair with the same key - * @see #leftJoin(KTable, ValueJoiner) - * @see #outerJoin(KTable, ValueJoiner) + * @see #leftJoin(KTable, ValueJoiner, Materialized) + * @see #outerJoin(KTable, ValueJoiner, Materialized) + * @deprecated use {@link #join(KTable, ValueJoiner, Materialized)} */ + @Deprecated <VO, VR> KTable<K, VR> join(final KTable<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final Serde<VR> joinSerde, @@ -1430,9 +1508,11 @@ public interface KTable<K, V> { * @param storeSupplier user defined state store supplier. Cannot be {@code null}. * @return a {@code KTable} that contains join-records for each key and values computed by the given * {@link ValueJoiner}, one for each matched record-pair with the same key - * @see #leftJoin(KTable, ValueJoiner) - * @see #outerJoin(KTable, ValueJoiner) + * @see #leftJoin(KTable, ValueJoiner, Materialized) + * @see #outerJoin(KTable, ValueJoiner, Materialized) + * @deprecated use {@link #join(KTable, ValueJoiner, Materialized)} */ + @Deprecated <VO, VR> KTable<K, VR> join(final KTable<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final StateStoreSupplier<KeyValueStore> storeSupplier); @@ -1589,6 +1669,90 @@ public interface KTable<K, V> { * Both input streams (or to be more precise, their underlying source topics) need to have the same number of * partitions. 
* + * @param other the other {@code KTable} to be joined with this {@code KTable} + * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records + * @param materialized an instance of {@link Materialized} used to describe how the state store should be materialized. + * Cannot be {@code null} + * @param <VO> the value type of the other {@code KTable} + * @param <VR> the value type of the result {@code KTable} + * @return a {@code KTable} that contains join-records for each key and values computed by the given + * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of + * left {@code KTable} + * @see #join(KTable, ValueJoiner, Materialized) + * @see #outerJoin(KTable, ValueJoiner, Materialized) + */ + <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other, + final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, + final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized); + /** + * Join records of this {@code KTable} (left input) with another {@code KTable}'s (right input) records using + * non-windowed left equi join. + * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}. + * In contrast to {@link #join(KTable, ValueJoiner) inner-join}, all records from left {@code KTable} will produce + * an output record (cf. below). + * The result is an ever updating {@code KTable} that represents the <em>current</em> (i.e., processing time) result + * of the join. + * <p> + * The join is computed by (1) updating the internal state of one {@code KTable} and (2) performing a lookup for a + * matching record in the <em>current</em> (i.e., processing time) internal state of the other {@code KTable}. + * This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input + * {@code KTable} the result gets updated. 
+ * <p> + * For each {@code KTable} record that finds a corresponding record in the other {@code KTable}'s state the + * provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record. + * Additionally, for each record of left {@code KTable} that does not find a corresponding record in the + * right {@code KTable}'s state the provided {@link ValueJoiner} will be called with {@code rightValue = + * null} to compute a value (with arbitrary type) for the result record. + * The key of the result record is the same as for both joining input records. + * <p> + * Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics. + * For example, for left input tombstones the provided value-joiner is not called but a tombstone record is + * forwarded directly to delete a record in the result {@code KTable} if required (i.e., if there is anything to be + * deleted). + * <p> + * Input records with {@code null} key will be dropped and no join computation is performed. + * <p> + * Example: + * <table border='1'> + * <tr> + * <th>thisKTable</th> + * <th>thisState</th> + * <th>otherKTable</th> + * <th>otherState</th> + * <th>result update record</th> + * </tr> + * <tr> + * <td><K1:A></td> + * <td><K1:A></td> + * <td></td> + * <td></td> + * <td><K1:ValueJoiner(A,null)></td> + * </tr> + * <tr> + * <td></td> + * <td><K1:A></td> + * <td><K1:b></td> + * <td><K1:b></td> + * <td><K1:ValueJoiner(A,b)></td> + * </tr> + * <tr> + * <td><K1:null></td> + * <td></td> + * <td></td> + * <td><K1:b></td> + * <td><K1:null></td> + * </tr> + * <tr> + * <td></td> + * <td></td> + * <td><K1:null></td> + * <td></td> + * <td></td> + * </tr> + * </table> + * Both input streams (or to be more precise, their underlying source topics) need to have the same number of + * partitions. 
+ * * @param other the other {@code KTable} to be joined with this {@code KTable} * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records * @param <VO> the value type of the other {@code KTable} @@ -1601,9 +1765,11 @@ public interface KTable<K, V> { * @return a {@code KTable} that contains join-records for each key and values computed by the given * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of * left {@code KTable} - * @see #join(KTable, ValueJoiner) - * @see #outerJoin(KTable, ValueJoiner) + * @see #join(KTable, ValueJoiner, Materialized) + * @see #outerJoin(KTable, ValueJoiner, Materialized) + * @deprecated use {@link #leftJoin(KTable, ValueJoiner, Materialized)} */ + @Deprecated <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final Serde<VR> joinSerde, @@ -1686,9 +1852,11 @@ public interface KTable<K, V> { * @return a {@code KTable} that contains join-records for each key and values computed by the given * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of * left {@code KTable} - * @see #join(KTable, ValueJoiner) - * @see #outerJoin(KTable, ValueJoiner) + * @see #join(KTable, ValueJoiner, Materialized) + * @see #outerJoin(KTable, ValueJoiner, Materialized) + * @deprecated use {@link #leftJoin(KTable, ValueJoiner, Materialized)} */ + @Deprecated <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final StateStoreSupplier<KeyValueStore> storeSupplier); @@ -1843,6 +2011,90 @@ public interface KTable<K, V> { * Both input streams (or to be more precise, their underlying source topics) need to have the same number of * partitions. 
* + * @param other the other {@code KTable} to be joined with this {@code KTable} + * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records + * @param materialized an instance of {@link Materialized} used to describe how the state store should be materialized. + * Cannot be {@code null} + * @param <VO> the value type of the other {@code KTable} + * @param <VR> the value type of the result {@code KTable} + * @return a {@code KTable} that contains join-records for each key and values computed by the given + * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of + * both {@code KTable}s + * @see #join(KTable, ValueJoiner, Materialized) + * @see #leftJoin(KTable, ValueJoiner, Materialized) + */ + <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other, + final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, + final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized); + + /** + * Join records of this {@code KTable} (left input) with another {@code KTable}'s (right input) records using + * non-windowed outer equi join. + * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}. + * In contrast to {@link #join(KTable, ValueJoiner) inner-join} or {@link #leftJoin(KTable, ValueJoiner) left-join}, + * all records from both input {@code KTable}s will produce an output record (cf. below). + * The result is an ever updating {@code KTable} that represents the <em>current</em> (i.e., processing time) result + * of the join. + * <p> + * The join is computed by (1) updating the internal state of one {@code KTable} and (2) performing a lookup for a + * matching record in the <em>current</em> (i.e., processing time) internal state of the other {@code KTable}. + * This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input + * {@code KTable} the result gets updated.
+ * <p> + * For each {@code KTable} record that finds a corresponding record in the other {@code KTable}'s state the + * provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record. + * Additionally, for each record that does not find a corresponding record in the corresponding other + * {@code KTable}'s state the provided {@link ValueJoiner} will be called with {@code null} value for the + * corresponding other value to compute a value (with arbitrary type) for the result record. + * The key of the result record is the same as for both joining input records. + * <p> + * Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics. + * Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly + * to delete a record in the result {@code KTable} if required (i.e., if there is anything to be deleted). + * <p> + * Input records with {@code null} key will be dropped and no join computation is performed. + * <p> + * Example: + * <table border='1'> + * <tr> + * <th>thisKTable</th> + * <th>thisState</th> + * <th>otherKTable</th> + * <th>otherState</th> + * <th>result update record</th> + * </tr> + * <tr> + * <td><K1:A></td> + * <td><K1:A></td> + * <td></td> + * <td></td> + * <td><K1:ValueJoiner(A,null)></td> + * </tr> + * <tr> + * <td></td> + * <td><K1:A></td> + * <td><K1:b></td> + * <td><K1:b></td> + * <td><K1:ValueJoiner(A,b)></td> + * </tr> + * <tr> + * <td><K1:null></td> + * <td></td> + * <td></td> + * <td><K1:b></td> + * <td><K1:ValueJoiner(null,b)></td> + * </tr> + * <tr> + * <td></td> + * <td></td> + * <td><K1:null></td> + * <td></td> + * <td><K1:null></td> + * </tr> + * </table> + * Both input streams (or to be more precise, their underlying source topics) need to have the same number of + * partitions. 
+ * * @param other the other {@code KTable} to be joined with this {@code KTable} * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records * @param <VO> the value type of the other {@code KTable} @@ -1855,9 +2107,11 @@ public interface KTable<K, V> { * @return a {@code KTable} that contains join-records for each key and values computed by the given * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of * both {@code KTable}s - * @see #join(KTable, ValueJoiner) - * @see #leftJoin(KTable, ValueJoiner) + * @see #join(KTable, ValueJoiner, Materialized) + * @see #leftJoin(KTable, ValueJoiner, Materialized) + * @deprecated use {@link #outerJoin(KTable, ValueJoiner, Materialized)} */ + @Deprecated <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other, final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, final Serde<VR> joinSerde, @@ -1941,7 +2195,9 @@ public interface KTable<K, V> { * both {@code KTable}s * @see #join(KTable, ValueJoiner) * @see #leftJoin(KTable, ValueJoiner) + * @deprecated use {@link #outerJoin(KTable, ValueJoiner, Materialized)} */ + @Deprecated <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other, final ValueJoiner<? super V, ? super VO, ? 
extends VR> joiner, final StateStoreSupplier<KeyValueStore> storeSupplier); http://git-wip-us.apache.org/repos/asf/kafka/blob/08063f50/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index ed7abdc..067bcfc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -534,7 +534,17 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, @Override public <V1, R> KTable<K, R> join(final KTable<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner) { - return doJoin(other, joiner, false, false, null, null); + return doJoin(other, joiner, null, false, false); + } + + @Override + public <VO, VR> KTable<K, VR> join(final KTable<K, VO> other, + final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, + final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) { + Objects.requireNonNull(other, "other can't be null"); + Objects.requireNonNull(joiner, "joiner can't be null"); + Objects.requireNonNull(materialized, "materialized can't be null"); + return doJoin(other, joiner, new MaterializedInternal<>(materialized), false, false); } @Override @@ -556,7 +566,14 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, @Override public <V1, R> KTable<K, R> outerJoin(final KTable<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner) { - return doJoin(other, joiner, true, true, null, null); + return doJoin(other, joiner, null, true, true); + } + + @Override + public <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other, + final ValueJoiner<? super V, ? super VO, ? 
extends VR> joiner, + final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) { + return doJoin(other, joiner, new MaterializedInternal<>(materialized), true, true); } @Override @@ -578,7 +595,18 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, @Override public <V1, R> KTable<K, R> leftJoin(final KTable<K, V1> other, final ValueJoiner<? super V, ? super V1, ? extends R> joiner) { - return doJoin(other, joiner, true, false, null, null); + return doJoin(other, joiner, null, true, false); + } + + @Override + public <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other, + final ValueJoiner<? super V, ? super VO, ? extends VR> joiner, + final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) { + return doJoin(other, + joiner, + new MaterializedInternal<>(materialized), + true, + false); } @Override @@ -619,8 +647,53 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, final StateStoreSupplier<KeyValueStore> storeSupplier) { Objects.requireNonNull(other, "other can't be null"); Objects.requireNonNull(joiner, "joiner can't be null"); + final String joinMergeName = builder.newName(MERGE_NAME); final String internalQueryableName = storeSupplier == null ? null : storeSupplier.name(); - final Set<String> allSourceNodes = ensureJoinableWith((AbstractStream<K>) other); + final KTable<K, R> result = buildJoin((AbstractStream<K>) other, + joiner, + leftOuter, + rightOuter, + joinMergeName, + internalQueryableName); + + if (internalQueryableName != null) { + builder.internalTopologyBuilder.addStateStore(storeSupplier, joinMergeName); + } + + return result; + } + + private <VO, VR> KTable<K, VR> doJoin(final KTable<K, VO> other, + final ValueJoiner<? super V, ? super VO, ? 
extends VR> joiner, + final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materialized, + final boolean leftOuter, + final boolean rightOuter) { + Objects.requireNonNull(other, "other can't be null"); + Objects.requireNonNull(joiner, "joiner can't be null"); + final String internalQueryableName = materialized == null ? null : materialized.storeName(); + final String joinMergeName = builder.newName(MERGE_NAME); + final KTable<K, VR> result = buildJoin((AbstractStream<K>) other, + joiner, + leftOuter, + rightOuter, + joinMergeName, + internalQueryableName); + + if (materialized != null) { + final StoreBuilder<KeyValueStore<K, VR>> storeBuilder + = new KeyValueStoreMaterializer<>(materialized).materialize(); + builder.internalTopologyBuilder.addStateStore(storeBuilder, joinMergeName); + } + return result; + } + + private <V1, R> KTable<K, R> buildJoin(final AbstractStream<K> other, + final ValueJoiner<? super V, ? super V1, ? extends R> joiner, + final boolean leftOuter, + final boolean rightOuter, + final String joinMergeName, + final String internalQueryableName) { + final Set<String> allSourceNodes = ensureJoinableWith(other); if (leftOuter) { enableSendingOldValues(); @@ -631,7 +704,7 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, final String joinThisName = builder.newName(JOINTHIS_NAME); final String joinOtherName = builder.newName(JOINOTHER_NAME); - final String joinMergeName = builder.newName(MERGE_NAME); + final KTableKTableAbstractJoin<K, R, V, V1> joinThis; final KTableKTableAbstractJoin<K, R, V1, V> joinOther; @@ -659,11 +732,6 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K, builder.internalTopologyBuilder.addProcessor(joinMergeName, joinMerge, joinThisName, joinOtherName); builder.internalTopologyBuilder.connectProcessorAndStateStores(joinThisName, ((KTableImpl) other).valueGetterSupplier().storeNames()); 
builder.internalTopologyBuilder.connectProcessorAndStateStores(joinOtherName, valueGetterSupplier().storeNames()); - - if (internalQueryableName != null) { - builder.internalTopologyBuilder.addStateStore(storeSupplier, joinMergeName); - } - return new KTableImpl<>(builder, joinMergeName, joinMerge, allSourceNodes, internalQueryableName, internalQueryableName != null); } http://git-wip-us.apache.org/repos/asf/kafka/blob/08063f50/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java index 949f8be..1b45711 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java @@ -22,6 +22,7 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; @@ -29,8 +30,10 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; import 
org.apache.kafka.streams.state.QueryableStoreTypes; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.apache.kafka.test.IntegrationTest; @@ -344,8 +347,15 @@ public class KTableKTableJoinIntegrationTest { final KTable<String, String> table2 = builder.table(TABLE_2, TABLE_2); final KTable<String, String> table3 = builder.table(TABLE_3, TABLE_3); + Materialized<String, String, KeyValueStore<Bytes, byte[]>> materialized = null; + if (queryableName != null) { + materialized = Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(queryableName) + .withKeySerde(Serdes.String()) + .withValueSerde(Serdes.String()) + .withCachingDisabled(); + } join(join(table1, table2, joinType1, null /* no need to query intermediate result */), table3, - joinType2, queryableName).to(OUTPUT); + joinType2, materialized).to(OUTPUT); return new KafkaStreams(builder.build(), new StreamsConfig(streamsConfig)); } @@ -353,7 +363,7 @@ public class KTableKTableJoinIntegrationTest { private KTable<String, String> join(final KTable<String, String> first, final KTable<String, String> second, final JoinType joinType, - final String queryableName) { + final Materialized<String, String, KeyValueStore<Bytes, byte[]>> materialized) { final ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() { @Override public String apply(final String value1, final String value2) { @@ -361,13 +371,26 @@ public class KTableKTableJoinIntegrationTest { } }; + switch (joinType) { case INNER: - return first.join(second, joiner, Serdes.String(), queryableName); + if (materialized != null) { + return first.join(second, joiner, materialized); + } else { + return first.join(second, joiner); + } case LEFT: - return first.leftJoin(second, joiner, Serdes.String(), queryableName); + if (materialized != null) { + return first.leftJoin(second, joiner, materialized); + } else { + return first.leftJoin(second, joiner); + } case OUTER: - return first.outerJoin(second, joiner, 
Serdes.String(), queryableName); + if (materialized != null) { + return first.outerJoin(second, joiner, materialized); + } else { + return first.outerJoin(second, joiner); + } } throw new RuntimeException("Unknown join type."); http://git-wip-us.apache.org/repos/asf/kafka/blob/08063f50/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java index 64ae6de..6ca38b8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java @@ -27,6 +27,7 @@ import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.streams.kstream.ValueMapper; +import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.processor.internals.SinkNode; import org.apache.kafka.streams.processor.internals.SourceNode; import org.apache.kafka.streams.state.KeyValueStore; @@ -437,19 +438,21 @@ public class KTableImplTest { table.join(table, MockValueJoiner.TOSTRING_JOINER, null, null); } + @SuppressWarnings("unchecked") @Test(expected = NullPointerException.class) public void shouldNotAllowNullStoreSupplierInJoin() { - table.join(table, MockValueJoiner.TOSTRING_JOINER, null); + table.join(table, MockValueJoiner.TOSTRING_JOINER, (StateStoreSupplier) null); } + @SuppressWarnings("unchecked") @Test(expected = NullPointerException.class) public void shouldNotAllowNullStoreSupplierInLeftJoin() { - table.leftJoin(table, MockValueJoiner.TOSTRING_JOINER, null); + table.leftJoin(table, MockValueJoiner.TOSTRING_JOINER, (StateStoreSupplier) null); } 
@Test(expected = NullPointerException.class) public void shouldNotAllowNullStoreSupplierInOuterJoin() { - table.outerJoin(table, MockValueJoiner.TOSTRING_JOINER, null); + table.outerJoin(table, MockValueJoiner.TOSTRING_JOINER, (StateStoreSupplier) null); } @Test(expected = NullPointerException.class) @@ -496,4 +499,19 @@ public class KTableImplTest { } }, (Materialized<String, String, KeyValueStore<Bytes, byte[]>>) null); } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerOnJoinWhenMaterializedIsNull() { + table.join(table, MockValueJoiner.TOSTRING_JOINER, (Materialized) null); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerOnLeftJoinWhenMaterializedIsNull() { + table.leftJoin(table, MockValueJoiner.TOSTRING_JOINER, (Materialized) null); + } + + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerOnOuterJoinWhenMaterializedIsNull() { + table.outerJoin(table, MockValueJoiner.TOSTRING_JOINER, (Materialized) null); + } }
