This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 9077d83 MINOR: Add select changes from 3rd KIP-307 PR for incrementing name index counter (#6754) 9077d83 is described below commit 9077d83672a4d08273ce4a6012f1787f5313f948 Author: Bill Bejeck <bbej...@gmail.com> AuthorDate: Fri May 17 18:10:11 2019 -0400 MINOR: Add select changes from 3rd KIP-307 PR for incrementing name index counter (#6754) When users provide a name for an operation via the Streams DSL, we need to increment the counter used for auto-generated names to make sure any operators downstream of a named operator still produce a compatible name. This PR is a subset of #6411 by @fhussonnois. We need to merge this PR now because it covers cases when users name repartition topics or state stores. Updated tests to reflect that the counter produces the expected number even when the user provides a name. Reviewers: Matthias J. Sax <mj...@apache.org>, John Roesler <j...@confluent.io> --- .../org/apache/kafka/streams/kstream/Named.java | 6 ++- .../kstream/internals/InternalStreamsBuilder.java | 2 +- .../streams/kstream/internals/NamedInternal.java | 48 +++++++++++----------- .../apache/kafka/streams/StreamsBuilderTest.java | 8 ++-- .../kstream/internals/NamedInternalTest.java | 48 +++++++++++++--------- 5 files changed, 62 insertions(+), 50 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Named.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Named.java index 1db031a..84bb819 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Named.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Named.java @@ -26,6 +26,10 @@ public class Named implements NamedOperation<Named> { protected String name; + protected Named(final Named named) { + this(Objects.requireNonNull(named, "named can't be null").name); + } + protected Named(final String name) { this.name = name; if (name != null) { @@ -51,7 +55,7 @@ public class Named implements 
NamedOperation<Named> { return new Named(name); } - static void validate(final String name) { + protected static void validate(final String name) { if (name.isEmpty()) throw new TopologyException("Name is illegal, it can't be empty"); if (name.equals(".") || name.equals("..")) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java index e7a7678..3a90fd2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java @@ -118,7 +118,7 @@ public class InternalStreamsBuilder implements InternalNameProvider { final String sourceName = new NamedInternal(consumed.name()) .orElseGenerateWithPrefix(this, KStreamImpl.SOURCE_NAME); final String tableSourceName = new NamedInternal(consumed.name()) - .suffixWithOrElseGet("-table-source", () -> newProcessorName(KTableImpl.SOURCE_NAME)); + .suffixWithOrElseGet("-table-source", this, KTableImpl.SOURCE_NAME); final KTableSource<K, V> tableSource = new KTableSource<>(materialized.storeName(), materialized.queryableStoreName()); final ProcessorParameters<K, V> processorParameters = new ProcessorParameters<>(tableSource, tableSourceName); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/NamedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/NamedInternal.java index e83728e..d478e9b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/NamedInternal.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/NamedInternal.java @@ -17,8 +17,6 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.streams.kstream.Named; -import java.util.Optional; -import java.util.function.Supplier; public class NamedInternal extends Named { @@ 
-33,14 +31,14 @@ public class NamedInternal extends Named { /** * Creates a new {@link NamedInternal} instance. * - * @param internal the internal name. + * @param internal the internal name. */ NamedInternal(final String internal) { super(internal); } /** - * @return a string name. + * @return a string name. */ public String name() { return name; @@ -51,31 +49,31 @@ public class NamedInternal extends Named { return new NamedInternal(name); } - /** - * Check whether an internal name is defined. - * @return {@code false} if no name is set. - */ - public boolean isDefined() { - return name != null; - } + String suffixWithOrElseGet(final String suffix, final InternalNameProvider provider, final String prefix) { + // We actually do not need to generate processor names for operation if a name is specified. + // But before returning, we still need to burn index for the operation to keep topology backward compatibility. + if (name != null) { + provider.newProcessorName(prefix); + + final String suffixed = name + suffix; + // Re-validate generated name as suffixed string could be too large. + Named.validate(suffixed); - String suffixWithOrElseGet(final String suffix, final Supplier<String> supplier) { - final Optional<String> suffixed = Optional.ofNullable(this.name).map(s -> s + suffix); - // Creating a new named will re-validate generated name as suffixed string could be too large. - return new NamedInternal(suffixed.orElseGet(supplier)).name(); + return suffixed; + } else { + return provider.newProcessorName(prefix); + } } String orElseGenerateWithPrefix(final InternalNameProvider provider, final String prefix) { - return orElseGet(() -> provider.newProcessorName(prefix)); + // We actually do not need to generate processor names for operation if a name is specified. + // But before returning, we still need to burn index for the operation to keep topology backward compatibility. 
+ if (name != null) { + provider.newProcessorName(prefix); + return name; + } else { + return provider.newProcessorName(prefix); + } } - /** - * Returns the internal name or the value returns from the supplier. - * - * @param supplier the supplier to be used if internal name is empty. - * @return an internal string name. - */ - private String orElseGet(final Supplier<String> supplier) { - return Optional.ofNullable(this.name).orElseGet(supplier); - } } diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java index 669cece..93d444b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java @@ -424,7 +424,7 @@ public class StreamsBuilderTest { builder.stream(STREAM_TOPIC_TWO); builder.build(); final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build(); - assertSpecifiedNameForOperation(topology, expected, "KSTREAM-SOURCE-0000000000"); + assertSpecifiedNameForOperation(topology, expected, "KSTREAM-SOURCE-0000000001"); } @Test @@ -440,8 +440,8 @@ public class StreamsBuilderTest { topology, expected, expected + "-table-source", - "KSTREAM-SOURCE-0000000002", - "KTABLE-SOURCE-0000000003"); + "KSTREAM-SOURCE-0000000004", + "KTABLE-SOURCE-0000000005"); } @Test @@ -467,7 +467,7 @@ public class StreamsBuilderTest { stream.to(STREAM_TOPIC_TWO); builder.build(); final ProcessorTopology topology = builder.internalTopologyBuilder.rewriteTopology(new StreamsConfig(props)).build(); - assertSpecifiedNameForOperation(topology, "KSTREAM-SOURCE-0000000000", expected, "KSTREAM-SINK-0000000001"); + assertSpecifiedNameForOperation(topology, "KSTREAM-SOURCE-0000000000", expected, "KSTREAM-SINK-0000000002"); } private void assertSpecifiedNameForOperation(final ProcessorTopology topology, final String... 
expected) { diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/NamedInternalTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/NamedInternalTest.java index 98b3a4d..1c4c700 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/NamedInternalTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/NamedInternalTest.java @@ -22,45 +22,55 @@ import static org.junit.Assert.assertEquals; public class NamedInternalTest { - private static final String TEST_VALUE = "default-value"; + private static final String TEST_PREFIX = "prefix-"; + private static final String TEST_VALUE = "default-value"; private static final String TEST_SUFFIX = "-suffix"; + private static class TestNameProvider implements InternalNameProvider { + int index = 0; + + @Override + public String newProcessorName(final String prefix) { + return prefix + "PROCESSOR-" + index++; + } + + @Override + public String newStoreName(final String prefix) { + return prefix + "STORE-" + index++; + } + + } + @Test public void shouldSuffixNameOrReturnProviderValue() { final String name = "foo"; + final TestNameProvider provider = new TestNameProvider(); + assertEquals( - name + TEST_SUFFIX, - NamedInternal.with(name).suffixWithOrElseGet(TEST_SUFFIX, () -> TEST_VALUE) + name + TEST_SUFFIX, + NamedInternal.with(name).suffixWithOrElseGet(TEST_SUFFIX, provider, TEST_PREFIX) ); + + // 1, not 0, indicates that the named call still burned an index number. 
assertEquals( - TEST_VALUE, - NamedInternal.with(null).suffixWithOrElseGet(TEST_SUFFIX, () -> TEST_VALUE) + "prefix-PROCESSOR-1", + NamedInternal.with(null).suffixWithOrElseGet(TEST_SUFFIX, provider, TEST_PREFIX) ); } @Test public void shouldGenerateWithPrefixGivenEmptyName() { final String prefix = "KSTREAM-MAP-"; - assertEquals(prefix + "PROCESSOR-NAME", NamedInternal.with(null).orElseGenerateWithPrefix( - new InternalNameProvider() { - @Override - public String newProcessorName(final String prefix) { - return prefix + "PROCESSOR-NAME"; - } - - @Override - public String newStoreName(final String prefix) { - return null; - } - }, - prefix) + assertEquals(prefix + "PROCESSOR-0", NamedInternal.with(null).orElseGenerateWithPrefix( + new TestNameProvider(), + prefix) ); } @Test public void shouldNotGenerateWithPrefixGivenValidName() { final String validName = "validName"; - assertEquals(validName, NamedInternal.with(validName).orElseGenerateWithPrefix(null, "KSTREAM-MAP-") + assertEquals(validName, NamedInternal.with(validName).orElseGenerateWithPrefix(new TestNameProvider(), "KSTREAM-MAP-") ); } } \ No newline at end of file