This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch 2.8 in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 85000e1d33ba5bfba4088d4241be6c2da42625cf Author: Geordie <g1geor...@gmail.com> AuthorDate: Fri Jun 25 00:07:22 2021 +0800 KAFKA-12336 Custom stream naming does not work while calling stream[K… (#10190) Custom stream naming does not work while calling stream[K, V](topicPattern: Pattern) Reviewers: Bill Bejeck <bbej...@apache.org> --- .../kstream/internals/InternalStreamsBuilder.java | 2 +- .../kafka/streams/scala/kstream/KStreamTest.scala | 20 ++++++++++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java index cd59427..01ebfbd 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java @@ -102,7 +102,7 @@ public class InternalStreamsBuilder implements InternalNameProvider { public <K, V> KStream<K, V> stream(final Pattern topicPattern, final ConsumedInternal<K, V> consumed) { - final String name = newProcessorName(KStreamImpl.SOURCE_NAME); + final String name = new NamedInternal(consumed.name()).orElseGenerateWithPrefix(this, KStreamImpl.SOURCE_NAME); final StreamSourceNode<K, V> streamPatternSourceNode = new StreamSourceNode<>(name, topicPattern, consumed); addGraphNode(root, streamPatternSourceNode); diff --git a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala index a7f7f58..d7e7e9a 100644 --- a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala +++ b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala @@ -18,6 +18,7 @@ package org.apache.kafka.streams.scala.kstream import java.time.Duration.ofSeconds import java.time.Instant +import java.util.regex.Pattern import org.apache.kafka.streams.KeyValue import org.apache.kafka.streams.kstream.{ @@ -449,4 +450,23 @@ class KStreamTest extends FlatSpec with Matchers with TestDriver { val transformNode = builder.build().describe().subtopologies().asScala.head.nodes().asScala.toList(1) transformNode.name() shouldBe "my-name" } + + @Test + def testSettingNameOnStream(): Unit = { + val builder = new StreamsBuilder() + val topicsPattern = "t-[A-Za-z0-9-].suffix" + val sinkTopic = "sink" + + builder + .stream[String, String](Pattern.compile(topicsPattern))( + Consumed.`with`[String, String].withName("my-fancy-name") + ) + .to(sinkTopic) + + import scala.jdk.CollectionConverters._ + + val streamNode = builder.build().describe().subtopologies().asScala.head.nodes().asScala.head + assertEquals("my-fancy-name", streamNode.name()) + } + }