Repository: kafka
Updated Branches:
  refs/heads/trunk c840f2a95 -> 416817920
HOTFIX: Fixes to javadoc and to state store name for link joins

Author: Eno Thereska <[email protected]>

Reviewers: Damian Guy, Guozhang Wang

Closes #1674 from enothereska/hotfix-misc-joins

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/41681792
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/41681792
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/41681792

Branch: refs/heads/trunk
Commit: 416817920bcc0a5f74ddf5231505160a68c7d2db
Parents: c840f2a
Author: Eno Thereska <[email protected]>
Authored: Tue Aug 2 14:41:18 2016 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Tue Aug 2 14:41:18 2016 -0700

----------------------------------------------------------------------
 .../apache/kafka/streams/kstream/KStream.java | 28 +++++++++++---------
 .../kstream/internals/AbstractStream.java | 4 ---
 .../streams/kstream/internals/KStreamImpl.java | 8 +++---
 3 files changed, 21 insertions(+), 19 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/41681792/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 060a1ee..4b0c185 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -373,8 +373,8 @@ public interface KStream<K, V> {
 /**
 * Combine element values of this stream with another {@link KStream}'s elements of the same key using windowed Inner Join.
 * If a record key is null it will not included in the resulting {@link KStream}
- * Both of the joining {@link KStream}s will be materialized in local state stores with the given store names.
- * Also a changelog topic named "${applicationId}-${storeName}-changelog" will be automatically created
+ * Both of the joining {@link KStream}s will be materialized in local state stores with auto-generated store names.
+ * Also a changelog topic named "${applicationId}-store name-changelog" will be automatically created
 * in Kafka for each store for failure recovery, where "applicationID" is user-specified in the
 * {@link org.apache.kafka.streams.StreamsConfig}.
 *
@@ -404,8 +404,8 @@ public interface KStream<K, V> {
 /**
 * Combine element values of this stream with another {@link KStream}'s elements of the same key using windowed Inner Join
 * with default serializers and deserializers. If a record key is null it will not included in the resulting {@link KStream}
- * Both of the joining {@link KStream}s will be materialized in local state stores with the given store names.
- * Also a changelog topic named "${applicationId}-${storeName}-changelog" will be automatically created
+ * Both of the joining {@link KStream}s will be materialized in local state stores with auto-generated store names.
+ * Also a changelog topic named "${applicationId}-store name-changelog" will be automatically created
 * in Kafka for each store for failure recovery, where "applicationID" is user-specified in the
 * {@link org.apache.kafka.streams.StreamsConfig}.
 *
@@ -425,8 +425,9 @@ public interface KStream<K, V> {
 /**
 * Combine values of this stream with another {@link KStream}'s elements of the same key using windowed Outer Join.
 * If a record key is null it will not included in the resulting {@link KStream}
- * Both of the joining {@link KStream}s will be materialized in local state stores with the given store names.
- * Also a changelog topic named "${applicationId}-${storeName}-changelog" will be automatically created
+ * Both of the joining {@link KStream}s will be materialized in local state stores with an auto-generated
+ * store name.
+ * Also a changelog topic named "${applicationId}-store name-changelog" will be automatically created
 * in Kafka for each store for failure recovery, where "applicationID" is user-specified in the
 * {@link org.apache.kafka.streams.StreamsConfig}.
 *
@@ -456,8 +457,9 @@ public interface KStream<K, V> {
 /**
 * Combine values of this stream with another {@link KStream}'s elements of the same key using windowed Outer Join
 * with default serializers and deserializers. If a record key is null it will not included in the resulting {@link KStream}
- * Both of the joining {@link KStream}s will be materialized in local state stores with the given store names.
- * Also a changelog topic named "${applicationId}-${storeName}-changelog" will be automatically created
+ * Both of the joining {@link KStream}s will be materialized in local state stores with auto-generated
+ * store names.
+ * Also a changelog topic named "${applicationId}-store name-changelog" will be automatically created
 * in Kafka for each store for failure recovery, where "applicationID" is user-specified in the
 * {@link org.apache.kafka.streams.StreamsConfig}.
 *
@@ -478,8 +480,9 @@ public interface KStream<K, V> {
 /**
 * Combine values of this stream with another {@link KStream}'s elements of the same key using windowed Left Join.
 * If a record key is null it will not included in the resulting {@link KStream}
- * Both of the joining {@link KStream}s will be materialized in local state stores with the given store names.
- * Also a changelog topic named "${applicationId}-${storeName}-changelog" will be automatically created
+ * Both of the joining {@link KStream}s will be materialized in local state stores with auto-generated
+ * store names.
+ * Also a changelog topic named "${applicationId}-store name-changelog" will be automatically created
 * in Kafka for each store for failure recovery, where "applicationID" is user-specified in the
 * {@link org.apache.kafka.streams.StreamsConfig}.
 *
@@ -509,8 +512,9 @@ public interface KStream<K, V> {
 /**
 * Combine values of this stream with another {@link KStream}'s elements of the same key using windowed Left Join
 * with default serializers and deserializers. If a record key is null it will not included in the resulting {@link KStream}
- * Both of the joining {@link KStream}s will be materialized in local state stores with the given store names.
- * Also a changelog topic named "${applicationId}-${storeName}-changelog" will be automatically created
+ * Both of the joining {@link KStream}s will be materialized in local state stores with auto-generated
+ * store names.
+ * Also a changelog topic named "${applicationId}-store name-changelog" will be automatically created
 * in Kafka for each store for failure recovery, where "applicationID" is user-specified in the
 * {@link org.apache.kafka.streams.StreamsConfig}.
 *

http://git-wip-us.apache.org/repos/asf/kafka/blob/41681792/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
index b764a6e..2f5b160 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.streams.errors.TopologyBuilderException;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.ValueJoiner;
@@ -46,9 +45,6 @@ public abstract class AbstractStream<K> {
 this.sourceNodes = sourceNodes;
 }
- /**
- * @throws TopologyBuilderException if the streams are not joinable
- */
 protected Set<String> ensureJoinableWith(AbstractStream<K> other) {
 Set<String> allSourceNodes = new HashSet<>();
 allSourceNodes.addAll(sourceNodes);

http://git-wip-us.apache.org/repos/asf/kafka/blob/41681792/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 7ecbf66..1859503 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -697,16 +697,18 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 Serde<K1> keySerde,
 Serde<V1> lhsValueSerde,
 Serde<V2> otherValueSerde) {
+ String otherWindowStreamName = topology.newName(WINDOWED_NAME);
+ String joinThisName = topology.newName(LEFTJOIN_NAME);
+
 StateStoreSupplier otherWindow =
- createWindowedStateStore(windows, keySerde, otherValueSerde, name + "other");
+ createWindowedStateStore(windows, keySerde, otherValueSerde, joinThisName + "-store");
 KStreamJoinWindow<K1, V1> otherWindowedStream = new KStreamJoinWindow<>(otherWindow.name(), windows.before + windows.after + 1, windows.maintainMs());
 KStreamKStreamJoin<K1, R, V1, V2> joinThis = new KStreamKStreamJoin<>(otherWindow.name(), windows.before, windows.after, joiner, true);
- String otherWindowStreamName = topology.newName(WINDOWED_NAME);
- String joinThisName = topology.newName(LEFTJOIN_NAME);
+
 topology.addProcessor(otherWindowStreamName, otherWindowedStream, ((AbstractStream) other).name);
 topology.addProcessor(joinThisName, joinThis, ((AbstractStream) lhs).name);
