[ https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15881018#comment-15881018 ]
Bill Bejeck commented on KAFKA-4791:
------------------------------------

Fair enough. I did not look into the issue at all; at first blush it seemed like a bug that needed to be fixed ASAP. But considering your comments and the forthcoming changes with KIP-120, I'll hold off.

> Kafka Streams - unable to add state stores when using wildcard topics on the source
> -----------------------------------------------------------------------------------
>
>                 Key: KAFKA-4791
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4791
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.1.1
>         Environment: Java 8
>            Reporter: Bart Vercammen
>            Assignee: Bill Bejeck
>
> I'm trying to build up a topology (using TopologyBuilder) with the following components:
> {code}
> new TopologyBuilder()
>   .addSource("ingest", Pattern.compile( ... ))
>   .addProcessor("myprocessor", ..., "ingest")
>   .addStateStore(dataStore, "myprocessor")
> {code}
> Somehow this does not seem to work.
> When creating the topology with exact topic names, all works fine, but it does not seem possible to attach state stores when using wildcard topics on the sources.
> Inside {{addStateStore}}, the processor gets connected to the state store with {{connectProcessorAndStateStore}}, and there it will try to connect the state store with the source topics from the processor: {{connectStateStoreNameToSourceTopics}}
> Here lies the problem:
> {code}
>     private Set<String> findSourceTopicsForProcessorParents(String[] parents) {
>         final Set<String> sourceTopics = new HashSet<>();
>         for (String parent : parents) {
>             NodeFactory nodeFactory = nodeFactories.get(parent);
>             if (nodeFactory instanceof SourceNodeFactory) {
>                 sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) nodeFactory).getTopics()));
>             } else if (nodeFactory instanceof ProcessorNodeFactory) {
>                 sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory) nodeFactory).parents));
>             }
>         }
>         return sourceTopics;
>     }
> {code}
> The call to {{sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) nodeFactory).getTopics()))}} will fail as there are no topics inside the {{SourceNodeFactory}} object, only a pattern ({{.getTopics}} returns null).
> I also tried to search for some unit tests inside the Kafka Streams project that cover this scenario, but alas, I was not able to find any.
> Only some tests on state stores with exact topic names, and some tests on wildcard topics, but no combination of both ...

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
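
The failure described in the issue can be reproduced outside Kafka: {{Arrays.asList}} throws a NullPointerException when handed a null array, which is what a pattern-only source returns from {{getTopics}}. Below is a minimal sketch of one way the traversal could tolerate such sources. It is not the fix adopted in Kafka. The classes are hypothetical stand-ins that only mirror the names in the issue, and {{findSourcePatterns}}, {{WildcardSourceSketch}} and the regex "ingest-.*" are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;

// Hypothetical stand-ins for the node factories named in the issue.
// These are NOT the real Kafka Streams classes.
public class WildcardSourceSketch {

    interface NodeFactory {}

    static final class SourceNodeFactory implements NodeFactory {
        final String[] topics;  // null when the source subscribes by pattern
        final Pattern pattern;  // null when the source uses exact topic names

        SourceNodeFactory(String[] topics, Pattern pattern) {
            this.topics = topics;
            this.pattern = pattern;
        }

        String[] getTopics() {
            return topics;
        }
    }

    static final class ProcessorNodeFactory implements NodeFactory {
        final String[] parents;

        ProcessorNodeFactory(String... parents) {
            this.parents = parents;
        }
    }

    final Map<String, NodeFactory> nodeFactories = new HashMap<>();

    // Same traversal as in the issue, but sources without explicit topics are skipped
    // instead of being passed to Arrays.asList (which throws on a null array).
    Set<String> findSourceTopics(String[] parents) {
        final Set<String> sourceTopics = new HashSet<>();
        for (String parent : parents) {
            NodeFactory nodeFactory = nodeFactories.get(parent);
            if (nodeFactory instanceof SourceNodeFactory) {
                String[] topics = ((SourceNodeFactory) nodeFactory).getTopics();
                if (topics != null) {
                    sourceTopics.addAll(Arrays.asList(topics));
                }
            } else if (nodeFactory instanceof ProcessorNodeFactory) {
                sourceTopics.addAll(findSourceTopics(((ProcessorNodeFactory) nodeFactory).parents));
            }
        }
        return sourceTopics;
    }

    // Hypothetical companion method: collects the regex strings of pattern-based sources,
    // so a caller could resolve them to concrete topics later.
    Set<String> findSourcePatterns(String[] parents) {
        final Set<String> patterns = new HashSet<>();
        for (String parent : parents) {
            NodeFactory nodeFactory = nodeFactories.get(parent);
            if (nodeFactory instanceof SourceNodeFactory) {
                Pattern pattern = ((SourceNodeFactory) nodeFactory).pattern;
                if (pattern != null) {
                    patterns.add(pattern.pattern());
                }
            } else if (nodeFactory instanceof ProcessorNodeFactory) {
                patterns.addAll(findSourcePatterns(((ProcessorNodeFactory) nodeFactory).parents));
            }
        }
        return patterns;
    }

    public static void main(String[] args) {
        WildcardSourceSketch sketch = new WildcardSourceSketch();
        // "ingest-.*" is a made-up pattern; the issue does not state the real one.
        sketch.nodeFactories.put("ingest", new SourceNodeFactory(null, Pattern.compile("ingest-.*")));
        sketch.nodeFactories.put("myprocessor", new ProcessorNodeFactory("ingest"));

        String[] parents = {"myprocessor"};
        System.out.println("topics:   " + sketch.findSourceTopics(parents));
        System.out.println("patterns: " + sketch.findSourcePatterns(parents));
    }
}
```

The null check only avoids the exception. Whether a state store should be associated with a pattern, or with the topics the pattern resolves to at runtime, is a design question this sketch leaves open.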