[
https://issues.apache.org/jira/browse/KAFKA-12336?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Bill Bejeck resolved KAFKA-12336.
---------------------------------
Resolution: Fixed
> custom stream naming does not work while calling stream[K, V](topicPattern:
> Pattern) API with named Consumed parameter
> -----------------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-12336
> URL: https://issues.apache.org/jira/browse/KAFKA-12336
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.7.0
> Reporter: Ramil Israfilov
> Assignee: GeordieMai
> Priority: Minor
> Labels: easy-fix, newbie
>
> In our Scala application I am trying to implement custom naming for the
> nodes of a Kafka Streams application.
> We use a topic pattern for our stream source.
> Here is the API I am calling:
>
> {code:java}
> val topicsPattern = "t-[A-Za-z0-9-].suffix"
> val operations: KStream[MyKey, MyValue] =
>   builder.stream[MyKey, MyValue](Pattern.compile(topicsPattern))(
>     Consumed.`with`[MyKey, MyValue].withName("my-fancy-name")
>   )
> {code}
> Although I pass a Consumed with a custom name, the topology description
> still shows "KSTREAM-SOURCE-0000000000" as the name of our stream source.
> This is not a problem when I pass a plain topic name. But our application
> needs to consume messages from a set of topics selected by topic-name
> pattern matching.
> After checking the Kafka code I see that
> org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder (on line
> 103) has a bug:
> {code:java}
> public <K, V> KStream<K, V> stream(final Pattern topicPattern,
>                                    final ConsumedInternal<K, V> consumed) {
>     final String name = newProcessorName(KStreamImpl.SOURCE_NAME);
>     final StreamSourceNode<K, V> streamPatternSourceNode =
>         new StreamSourceNode<>(name, topicPattern, consumed);
> {code}
> The node name construction does not take the name of the consumed
> parameter into account.
> For example, the code for the other stream API call, which takes topic
> names, does it correctly:
> {code:java}
> final String name = new NamedInternal(consumed.name())
>     .orElseGenerateWithPrefix(this, KStreamImpl.SOURCE_NAME);
> {code}
>
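For readers who want to see the difference between the two code paths quoted above, here is a minimal, self-contained sketch. It does not use Kafka's classes: SourceNameSketch, buggyName and fixedName are hypothetical names made up for illustration, and the counter format is only inferred from the "KSTREAM-SOURCE-0000000000" name in the report. It models the idea of "use the user-supplied name if present, otherwise generate one", not the actual committed fix.

```java
import java.util.Locale;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the builder's naming logic; NOT Kafka's real code.
final class SourceNameSketch {
    // Prefix taken from the generated name shown in the report.
    static final String SOURCE_NAME = "KSTREAM-SOURCE-";

    // Assumption: generated names carry a zero-padded, 10-digit running index.
    private final AtomicInteger index = new AtomicInteger(0);

    String newProcessorName(final String prefix) {
        return prefix + String.format(Locale.ROOT, "%010d", index.getAndIncrement());
    }

    // Behaviour described in the report: the supplied name is ignored.
    String buggyName(final String consumedName) {
        return newProcessorName(SOURCE_NAME);
    }

    // Behaviour the reporter expects: prefer the supplied name, else generate one.
    String fixedName(final String consumedName) {
        return consumedName != null ? consumedName : newProcessorName(SOURCE_NAME);
    }

    public static void main(final String[] args) {
        final SourceNameSketch sketch = new SourceNameSketch();
        System.out.println(sketch.buggyName("my-fancy-name"));
        System.out.println(sketch.fixedName("my-fancy-name"));
        System.out.println(sketch.fixedName(null));
    }
}
```

In this simplified model a named source does not consume an index; whether the real builder behaves the same way is not something the report states.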