This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new fa57eb0 KAFKA-6958: Add new NamedOperation interface to enforce consistency in naming operations (#6409) fa57eb0 is described below commit fa57eb065d032c225f63a0b2ca3f050e728c2235 Author: Florian Hussonnois <fhussonn...@gmail.com> AuthorDate: Wed Mar 20 03:27:03 2019 +0100 KAFKA-6958: Add new NamedOperation interface to enforce consistency in naming operations (#6409) Sub-task required to allow defining custom processor names with the KStreams DSL (KIP-307): - add new public interface NamedOperation - deprecate methods Joined.named() and Joined.name() - update Suppressed interface to extend NamedOperation Reviewers: Matthias J. Sax <mj...@apache.org>, John Roesler <j...@confluent.io>, Bill Bejeck <bbej...@gmail.com> --- .../org/apache/kafka/streams/kstream/Joined.java | 40 ++++++++++++++++--- .../kafka/streams/kstream/NamedOperation.java | 32 +++++++++++++++ .../apache/kafka/streams/kstream/Suppressed.java | 3 +- .../streams/kstream/internals/JoinedInternal.java | 45 ++++++++++++++++++++++ .../streams/kstream/internals/KStreamImpl.java | 15 ++++++-- 5 files changed, 124 insertions(+), 11 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java index aa29c68..1343487 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Joined.java @@ -22,13 +22,12 @@ import org.apache.kafka.common.serialization.Serde; * The {@code Joined} class represents optional params that can be passed to * {@link KStream#join}, {@link KStream#leftJoin}, and {@link KStream#outerJoin} operations. 
*/ -public class Joined<K, V, VO> { - - private final Serde<K> keySerde; - private final Serde<V> valueSerde; - private final Serde<VO> otherValueSerde; - private final String name; +public class Joined<K, V, VO> implements NamedOperation<Joined<K, V, VO>> { + protected final Serde<K> keySerde; + protected final Serde<V> valueSerde; + protected final Serde<VO> otherValueSerde; + protected final String name; private Joined(final Serde<K> keySerde, final Serde<V> valueSerde, @@ -40,6 +39,10 @@ public class Joined<K, V, VO> { this.name = name; } + protected Joined(final Joined<K, V, VO> joined) { + this(joined.keySerde, joined.valueSerde, joined.otherValueSerde, joined.name); + } + /** * Create an instance of {@code Joined} with key, value, and otherValue {@link Serde} instances. * {@code null} values are accepted and will be replaced by the default serdes as defined in config. @@ -135,11 +138,30 @@ public class Joined<K, V, VO> { * @param <V> value type * @param <VO> other value type * @return new {@code Joined} instance configured with the name + * + * @deprecated use {@link #as(String)} instead */ + @Deprecated public static <K, V, VO> Joined<K, V, VO> named(final String name) { return new Joined<>(null, null, null, name); } + /** + * Create an instance of {@code Joined} with base name for all components of the join, this may + * include any repartition topics created to complete the join. + * + * @param name the name used as the base for naming components of the join including any + * repartition topics + * @param <K> key type + * @param <V> value type + * @param <VO> other value type + * @return new {@code Joined} instance configured with the name + * + */ + public static <K, V, VO> Joined<K, V, VO> as(final String name) { + return new Joined<>(null, null, null, name); + } + /** * Set the key {@link Serde} to be used. 
Null values are accepted and will be replaced by the default @@ -182,6 +204,7 @@ public class Joined<K, V, VO> { * repartition topics * @return new {@code Joined} instance configured with the {@code name} */ + @Override public Joined<K, V, VO> withName(final String name) { return new Joined<>(keySerde, valueSerde, otherValueSerde, name); } @@ -198,7 +221,12 @@ public class Joined<K, V, VO> { return otherValueSerde; } + /** + * @deprecated this method will be removed in a future release + */ + @Deprecated public String name() { return name; } + } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/NamedOperation.java b/streams/src/main/java/org/apache/kafka/streams/kstream/NamedOperation.java new file mode 100644 index 0000000..9a2c40b --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/NamedOperation.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream; + +/** + * Default interface which can be used to personalize the names of operations, internal topics or stores. + */ +interface NamedOperation<T extends NamedOperation<T>> { + + /** + * Sets the name to be used for an operation. 
+ * + * @param name the name to use. + * @return an instance of {@link NamedOperation} + */ + T withName(final String name); + +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java index 6854101..b5d7937 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Suppressed.java @@ -23,7 +23,7 @@ import org.apache.kafka.streams.kstream.internals.suppress.SuppressedInternal; import java.time.Duration; -public interface Suppressed<K> { +public interface Suppressed<K> extends NamedOperation<Suppressed<K>> { /** * Marker interface for a buffer configuration that is "strict" in the sense that it will strictly @@ -163,5 +163,6 @@ public interface Suppressed<K> { * @param name The name to be used for the suppression node and changelog topic * @return The same configuration with the addition of the given {@code name}. */ + @Override Suppressed<K> withName(final String name); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/JoinedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/JoinedInternal.java new file mode 100644 index 0000000..99f7a0f --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/JoinedInternal.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.streams.kstream.Joined; + +public class JoinedInternal<K, V, VO> extends Joined<K, V, VO> { + + JoinedInternal(final Joined<K, V, VO> joined) { + super(joined); + } + + public Serde<K> keySerde() { + return keySerde; + } + + public Serde<V> valueSerde() { + return valueSerde; + } + + public Serde<VO> otherValueSerde() { + return otherValueSerde; + } + + @Override // TODO remove annotation when super.name() is removed + @SuppressWarnings("deprecation") // this method should not be removed if super.name() is removed + public String name() { + return name; + } +} diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 856536c..41260c5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -567,13 +567,15 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K KStreamImpl<K, V> joinThis = this; KStreamImpl<K, VO> joinOther = (KStreamImpl<K, VO>) other; + final JoinedInternal<K, V, VO> joinedInternal = new JoinedInternal<>(joined); + final String name = joinedInternal.name(); if (joinThis.repartitionRequired) { - final String leftJoinRepartitionTopicName = joined.name() != null ? 
joined.name() + "-left" : joinThis.name; + final String leftJoinRepartitionTopicName = name != null ? name + "-left" : joinThis.name; joinThis = joinThis.repartitionForJoin(leftJoinRepartitionTopicName, joined.keySerde(), joined.valueSerde()); } if (joinOther.repartitionRequired) { - final String rightJoinRepartitionTopicName = joined.name() != null ? joined.name() + "-right" : joinOther.name; + final String rightJoinRepartitionTopicName = name != null ? name + "-right" : joinOther.name; joinOther = joinOther.repartitionForJoin(rightJoinRepartitionTopicName, joined.keySerde(), joined.otherValueSerde()); } @@ -679,9 +681,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K Objects.requireNonNull(other, "other can't be null"); Objects.requireNonNull(joiner, "joiner can't be null"); Objects.requireNonNull(joined, "joined can't be null"); + + final JoinedInternal<K, V, VO> joinedInternal = new JoinedInternal<>(joined); + final String name = joinedInternal.name(); if (repartitionRequired) { final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin( - joined.name() != null ? joined.name() : name, + name != null ? name : this.name, joined.keySerde(), joined.valueSerde() ); @@ -703,9 +708,11 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K Objects.requireNonNull(other, "other can't be null"); Objects.requireNonNull(joiner, "joiner can't be null"); Objects.requireNonNull(joined, "joined can't be null"); + final JoinedInternal<K, V, VO> joinedInternal = new JoinedInternal<>(joined); + final String internalName = joinedInternal.name(); if (repartitionRequired) { final KStreamImpl<K, V> thisStreamRepartitioned = repartitionForJoin( - joined.name() != null ? joined.name() : name, + internalName != null ? internalName : name, joined.keySerde(), joined.valueSerde() );