[ https://issues.apache.org/jira/browse/KAFKA-4791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15889382#comment-15889382 ]
ASF GitHub Bot commented on KAFKA-4791: --------------------------------------- GitHub user bbejeck opened a pull request: https://github.com/apache/kafka/pull/2618 KAFKA-4791: unable to add state store with regex matched topics Fix for adding state stores with regex defined sources You can merge this pull request into a Git repository by running: $ git pull https://github.com/bbejeck/kafka KAFKA-4791_unable_to_add_statestore_regex_topics Alternatively you can review and apply these changes as the patch at: https://github.com/apache/kafka/pull/2618.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2618 ---- commit 828928905e393113e2a4f0148b8f1edc48d6e077 Author: bbejeck <bbej...@gmail.com> Date: 2017-03-01T02:27:50Z KAFKA-4791: unable to add state store with regex matched topics ---- > Kafka Streams - unable to add state stores when using wildcard topics on the > source > ----------------------------------------------------------------------------------- > > Key: KAFKA-4791 > URL: https://issues.apache.org/jira/browse/KAFKA-4791 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 0.10.1.1 > Environment: Java 8 > Reporter: Bart Vercammen > Assignee: Bill Bejeck > > I'm trying to build up a topology (using TopologyBuilder) with following > components : > {code} > new TopologyBuilder() > .addSource("ingest", Pattern.compile( ... )) > .addProcessor("myprocessor", ..., "ingest") > .addStateStore(dataStore, "myprocessor") > {code} > Somehow this does not seem to work. > When creating the topology with exact topic names, all works fine, but it > seems not possible to attach state stores when using wildcard topics on the > sources. > Inside {{addStateStore}}, the processor gets connected to the state store > with {{connectProcessorAndStateStore}}, and there it will try to connect the > state store with the source topics from the processor: > {{connectStateStoreNameToSourceTopics}} > Here lies the problem: > {code} > private Set<String> findSourceTopicsForProcessorParents(String [] > parents) { > final Set<String> sourceTopics = new HashSet<>(); > for (String parent : parents) { > NodeFactory nodeFactory = nodeFactories.get(parent); > if (nodeFactory instanceof SourceNodeFactory) { > sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) > nodeFactory).getTopics())); > } else if (nodeFactory instanceof ProcessorNodeFactory) { > > sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory) > nodeFactory).parents)); > } > } > return sourceTopics; > } > {code} > The call to {{sourceTopics.addAll(Arrays.asList(((SourceNodeFactory) > nodeFactory).getTopics()))}} will fail as there are no topics inside the > {{SourceNodeFactory}} object, only a pattern ({{.getTopics}} returns null) > I also tried to search for some unit tests inside the Kafka Streams project > that cover this scenario, but alas, I was not able to find any. > Only some tests on state stores with exact topic names, and some tests on > wildcard topics, but no combination of both ... -- This message was sent by Atlassian JIRA (v6.3.15#6346)