[ https://issues.apache.org/jira/browse/KAFKA-12336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
GeordieMai reassigned KAFKA-12336:
----------------------------------

    Assignee: GeordieMai

> custom stream naming does not work while calling stream[K, V](topicPattern: Pattern) API with named Consumed parameter
> -----------------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12336
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12336
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.7.0
>            Reporter: Ramil Israfilov
>            Assignee: GeordieMai
>            Priority: Minor
>              Labels: easy-fix, newbie
>
> In our Scala application I am trying to implement custom naming for Kafka
> Streams application nodes.
> We are using a topicPattern for our stream source.
> Here is the API I am calling:
>
> {code:java}
> val topicsPattern="t-[A-Za-z0-9-].suffix"
> val operations: KStream[MyKey, MyValue] =
>   builder.stream[MyKey, MyValue](Pattern.compile(topicsPattern))(
>     Consumed.`with`[MyKey, MyValue].withName("my-fancy-name")
>   )
> {code}
> Although I am providing Consumed with a custom name, the topology
> description still shows "KSTREAM-SOURCE-0000000000" as the name of our stream source.
> This is not a problem if I subscribe by topic name. But our application needs
> to get messages from a set of topics selected by topic-name pattern matching.
> After checking the Kafka code I see that
> org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder (on line
> 103) has a bug:
> {code:java}
> public <K, V> KStream<K, V> stream(final Pattern topicPattern,
>                                    final ConsumedInternal<K, V> consumed) {
>     final String name = newProcessorName(KStreamImpl.SOURCE_NAME);
>     final StreamSourceNode<K, V> streamPatternSourceNode =
>         new StreamSourceNode<>(name, topicPattern, consumed);
> {code}
> The node name construction does not take into account the name of the consumed
> parameter.
> For example, the code for the other stream API call, which takes topic names,
> does it correctly:
> {code:java}
> final String name = new NamedInternal(consumed.name())
>     .orElseGenerateWithPrefix(this, KStreamImpl.SOURCE_NAME);
> {code}
>

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
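
To make the difference between the two overloads concrete, here is a minimal, self-contained sketch of the naming logic described in the report. It is not the Kafka code: `SourceNaming` and its two `name...` methods are hypothetical stand-ins written for illustration, and the ten-digit zero-padded counter is an assumption taken from the `KSTREAM-SOURCE-0000000000` name quoted in the issue.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified model of the source-node naming discussed in
// KAFKA-12336. It only mirrors the two code paths quoted in the issue.
final class SourceNaming {
    // Assumed prefix, matching the default name shown in the report.
    static final String SOURCE_NAME = "KSTREAM-SOURCE-";

    private final AtomicInteger index = new AtomicInteger(0);

    // Generates the next default name, e.g. KSTREAM-SOURCE-0000000000.
    String newProcessorName(final String prefix) {
        return prefix + String.format("%010d", index.getAndIncrement());
    }

    // Models the reported behaviour of the Pattern overload:
    // the custom name is never consulted.
    String nameIgnoringCustom(final String customName) {
        return newProcessorName(SOURCE_NAME);
    }

    // Models the behaviour the report describes for the topic-name overload:
    // use the custom name if present, otherwise generate one.
    String nameHonoringCustom(final String customName) {
        return customName != null ? customName : newProcessorName(SOURCE_NAME);
    }
}
```

In this model, passing `"my-fancy-name"` to `nameIgnoringCustom` still yields a generated `KSTREAM-SOURCE-...` name, while `nameHonoringCustom` returns `"my-fancy-name"` unchanged. This suggests the fix is likely to route the Pattern overload through the same "custom name, else generate" path that the topic-name overload uses.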