Repository: kafka Updated Branches: refs/heads/trunk 4e92fd5f7 -> 15e0234a5
KAFKA-4791: unable to add state store with regex matched topics Fix for adding state stores with regex defined sources Author: bbejeck <[email protected]> Reviewers: Matthias J. Sax, Damian Guy, Guozhang Wang Closes #2618 from bbejeck/KAFKA-4791_unable_to_add_statestore_regex_topics Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/15e0234a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/15e0234a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/15e0234a Branch: refs/heads/trunk Commit: 15e0234a5f4976facd4cfe61b91cfcdec6f6083c Parents: 4e92fd5 Author: Bill Bejeck <[email protected]> Authored: Thu Mar 30 15:44:56 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Thu Mar 30 15:44:56 2017 -0700 ---------------------------------------------------------------------- .../streams/processor/TopologyBuilder.java | 185 ++++++++++++------- .../integration/RegexSourceIntegrationTest.java | 30 +++ .../streams/processor/TopologyBuilderTest.java | 34 ++++ 3 files changed, 178 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/15e0234a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java index 99f5d65..7c2ec4f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java @@ -99,6 +99,10 @@ public class TopologyBuilder { // are connected to these state stores private final Map<String, Set<String>> stateStoreNameToSourceTopics = new HashMap<>(); + // map from state store names to all the regex subscribed topics from source processors that + // are connected to these state stores + private final Map<String, Set<Pattern>> stateStoreNameToSourceRegex = new HashMap<>(); + // map from state store names to this state store's corresponding changelog topic if possible, // this is used in the extended KStreamBuilder. private final Map<String, String> storeToChangelogTopic = new HashMap<>(); @@ -174,7 +178,7 @@ public class TopologyBuilder { private SourceNodeFactory(String name, String[] topics, Pattern pattern, Deserializer<?> keyDeserializer, Deserializer<?> valDeserializer) { super(name); - this.topics = topics != null ? Arrays.asList(topics) : null; + this.topics = topics != null ? Arrays.asList(topics) : new ArrayList<String>(); this.pattern = pattern; this.keyDeserializer = keyDeserializer; this.valDeserializer = valDeserializer; @@ -311,7 +315,7 @@ public class TopologyBuilder { * @param applicationId the streams applicationId. Should be the same as set by * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG} */ - public synchronized final TopologyBuilder setApplicationId(String applicationId) { + public synchronized final TopologyBuilder setApplicationId(final String applicationId) { Objects.requireNonNull(applicationId, "applicationId can't be null"); this.applicationId = applicationId; @@ -329,7 +333,7 @@ public class TopologyBuilder { * @param topics the name of one or more Kafka topics that this source is to consume * @return this builder instance so methods can be chained together; never null */ - public synchronized final TopologyBuilder addSource(String name, String... topics) { + public synchronized final TopologyBuilder addSource(final String name, final String... topics) { return addSource(null, name, null, null, topics); } @@ -345,7 +349,7 @@ public class TopologyBuilder { * @param topics the name of one or more Kafka topics that this source is to consume * @return this builder instance so methods can be chained together; never null */ - public synchronized final TopologyBuilder addSource(AutoOffsetReset offsetReset, String name, String... topics) { + public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, final String name, final String... topics) { return addSource(offsetReset, name, null, null, topics); } @@ -362,7 +366,7 @@ public class TopologyBuilder { * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume * @return this builder instance so methods can be chained together; never null */ - public synchronized final TopologyBuilder addSource(String name, Pattern topicPattern) { + public synchronized final TopologyBuilder addSource(final String name, final Pattern topicPattern) { return addSource(null, name, null, null, topicPattern); } @@ -379,7 +383,7 @@ public class TopologyBuilder { * @param topicPattern regular expression pattern to match Kafka topics that this source is to consume * @return this builder instance so methods can be chained together; never null */ - public synchronized final TopologyBuilder addSource(AutoOffsetReset offsetReset, String name, Pattern topicPattern) { + public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, final String name, final Pattern topicPattern) { return addSource(offsetReset, name, null, null, topicPattern); } @@ -400,7 +404,7 @@ public class TopologyBuilder { * @return this builder instance so methods can be chained together; never null * @throws TopologyBuilderException if processor is already added or if topics have already been registered by another source */ - public synchronized final TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics) { + public synchronized final TopologyBuilder addSource(final String name, final Deserializer keyDeserializer, final Deserializer valDeserializer, final String... topics) { return addSource(null, name, keyDeserializer, valDeserializer, topics); } @@ -422,7 +426,7 @@ public class TopologyBuilder { * @return this builder instance so methods can be chained together; never null * @throws TopologyBuilderException if processor is already added or if topics have already been registered by another source */ - public synchronized final TopologyBuilder addSource(AutoOffsetReset offsetReset, String name, Deserializer keyDeserializer, Deserializer valDeserializer, String... topics) { + public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, final String name, final Deserializer keyDeserializer, final Deserializer valDeserializer, final String... topics) { if (topics.length == 0) { throw new TopologyBuilderException("You must provide at least one topic"); } @@ -540,7 +544,7 @@ public class TopologyBuilder { * @throws TopologyBuilderException if processor is already added or if topics have already been registered by name */ - public synchronized final TopologyBuilder addSource(String name, Deserializer keyDeserializer, Deserializer valDeserializer, Pattern topicPattern) { + public synchronized final TopologyBuilder addSource(final String name, final Deserializer keyDeserializer, final Deserializer valDeserializer, final Pattern topicPattern) { return addSource(null, name, keyDeserializer, valDeserializer, topicPattern); } @@ -566,7 +570,7 @@ public class TopologyBuilder { * @throws TopologyBuilderException if processor is already added or if topics have already been registered by name */ - public synchronized final TopologyBuilder addSource(AutoOffsetReset offsetReset, String name, Deserializer keyDeserializer, Deserializer valDeserializer, Pattern topicPattern) { + public synchronized final TopologyBuilder addSource(final AutoOffsetReset offsetReset, final String name, final Deserializer keyDeserializer, final Deserializer valDeserializer, final Pattern topicPattern) { Objects.requireNonNull(topicPattern, "topicPattern can't be null"); Objects.requireNonNull(name, "name can't be null"); @@ -604,7 +608,7 @@ public class TopologyBuilder { * @see #addSink(String, String, Serializer, Serializer, String...) * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...) */ - public synchronized final TopologyBuilder addSink(String name, String topic, String... parentNames) { + public synchronized final TopologyBuilder addSink(final String name, final String topic, final String... parentNames) { return addSink(name, topic, null, null, parentNames); } @@ -631,7 +635,7 @@ public class TopologyBuilder { * @see #addSink(String, String, Serializer, Serializer, String...) * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...) */ - public synchronized final TopologyBuilder addSink(String name, String topic, StreamPartitioner partitioner, String... parentNames) { + public synchronized final TopologyBuilder addSink(final String name, final String topic, final StreamPartitioner partitioner, final String... parentNames) { return addSink(name, topic, null, null, partitioner, parentNames); } @@ -654,7 +658,7 @@ public class TopologyBuilder { * @see #addSink(String, String, StreamPartitioner, String...) * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...) */ - public synchronized final TopologyBuilder addSink(String name, String topic, Serializer keySerializer, Serializer valSerializer, String... parentNames) { + public synchronized final TopologyBuilder addSink(final String name, final String topic, final Serializer keySerializer, final Serializer valSerializer, final String... parentNames) { return addSink(name, topic, keySerializer, valSerializer, null, parentNames); } @@ -679,7 +683,7 @@ public class TopologyBuilder { * @see #addSink(String, String, Serializer, Serializer, String...) * @throws TopologyBuilderException if parent processor is not added yet, or if this processor's name is equal to the parent's name */ - public synchronized final <K, V> TopologyBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<? super K, ? super V> partitioner, String... parentNames) { + public synchronized final <K, V> TopologyBuilder addSink(final String name, final String topic, final Serializer<K> keySerializer, final Serializer<V> valSerializer, final StreamPartitioner<? super K, ? super V> partitioner, final String... parentNames) { Objects.requireNonNull(name, "name must not be null"); Objects.requireNonNull(topic, "topic must not be null"); if (nodeFactories.containsKey(name)) @@ -713,7 +717,7 @@ public class TopologyBuilder { * @return this builder instance so methods can be chained together; never null * @throws TopologyBuilderException if parent processor is not added yet, or if this processor's name is equal to the parent's name */ - public synchronized final TopologyBuilder addProcessor(String name, ProcessorSupplier supplier, String... parentNames) { + public synchronized final TopologyBuilder addProcessor(final String name, final ProcessorSupplier supplier, final String... parentNames) { Objects.requireNonNull(name, "name must not be null"); Objects.requireNonNull(supplier, "supplier must not be null"); if (nodeFactories.containsKey(name)) @@ -742,7 +746,7 @@ public class TopologyBuilder { * @return this builder instance so methods can be chained together; never null * @throws TopologyBuilderException if state store supplier is already added */ - public synchronized final TopologyBuilder addStateStore(StateStoreSupplier supplier, String... processorNames) { + public synchronized final TopologyBuilder addStateStore(final StateStoreSupplier supplier, final String... processorNames) { Objects.requireNonNull(supplier, "supplier can't be null"); if (stateFactories.containsKey(supplier.name())) { throw new TopologyBuilderException("StateStore " + supplier.name() + " is already added."); @@ -766,7 +770,7 @@ public class TopologyBuilder { * @param stateStoreNames the names of state stores that the processor uses * @return this builder instance so methods can be chained together; never null */ - public synchronized final TopologyBuilder connectProcessorAndStateStores(String processorName, String... stateStoreNames) { + public synchronized final TopologyBuilder connectProcessorAndStateStores(final String processorName, final String... stateStoreNames) { Objects.requireNonNull(processorName, "processorName can't be null"); if (stateStoreNames != null) { for (String stateStoreName : stateStoreNames) { @@ -781,7 +785,7 @@ public class TopologyBuilder { * This is used only for KStreamBuilder: when adding a KTable from a source topic, * we need to add the topic as the KTable's materialized state store's changelog. */ - protected synchronized final TopologyBuilder connectSourceStoreAndTopic(String sourceStoreName, String topic) { + protected synchronized final TopologyBuilder connectSourceStoreAndTopic(final String sourceStoreName, final String topic) { if (storeToChangelogTopic.containsKey(sourceStoreName)) { throw new TopologyBuilderException("Source store " + sourceStoreName + " is already added."); } @@ -799,7 +803,7 @@ public class TopologyBuilder { * @return this builder instance so methods can be chained together; never null * @throws TopologyBuilderException if less than two processors are specified, or if one of the processors is not added yet */ - public synchronized final TopologyBuilder connectProcessors(String... processorNames) { + public synchronized final TopologyBuilder connectProcessors(final String... processorNames) { if (processorNames.length < 2) throw new TopologyBuilderException("At least two processors need to participate in the connection."); @@ -822,7 +826,7 @@ public class TopologyBuilder { * @param topicName the name of the topic * @return this builder instance so methods can be chained together; never null */ - public synchronized final TopologyBuilder addInternalTopic(String topicName) { + public synchronized final TopologyBuilder addInternalTopic(final String topicName) { Objects.requireNonNull(topicName, "topicName can't be null"); this.internalTopicNames.add(topicName); @@ -835,69 +839,85 @@ public class TopologyBuilder { * @param sourceNodes a set of source node names * @return this builder instance so methods can be chained together; never null */ - public synchronized final TopologyBuilder copartitionSources(Collection<String> sourceNodes) { + public synchronized final TopologyBuilder copartitionSources(final Collection<String> sourceNodes) { copartitionSourceGroups.add(Collections.unmodifiableSet(new HashSet<>(sourceNodes))); return this; } - private void connectProcessorAndStateStore(String processorName, String stateStoreName) { + private void connectProcessorAndStateStore(final String processorName, final String stateStoreName) { if (!stateFactories.containsKey(stateStoreName)) throw new TopologyBuilderException("StateStore " + stateStoreName + " is not added yet."); if (!nodeFactories.containsKey(processorName)) throw new TopologyBuilderException("Processor " + processorName + " is not added yet."); - StateStoreFactory stateStoreFactory = stateFactories.get(stateStoreName); - Iterator<String> iter = stateStoreFactory.users.iterator(); + final StateStoreFactory stateStoreFactory = stateFactories.get(stateStoreName); + final Iterator<String> iter = stateStoreFactory.users.iterator(); if (iter.hasNext()) { - String user = iter.next(); + final String user = iter.next(); nodeGrouper.unite(user, processorName); } stateStoreFactory.users.add(processorName); NodeFactory nodeFactory = nodeFactories.get(processorName); if (nodeFactory instanceof ProcessorNodeFactory) { - ProcessorNodeFactory processorNodeFactory = (ProcessorNodeFactory) nodeFactory; + final ProcessorNodeFactory processorNodeFactory = (ProcessorNodeFactory) nodeFactory; processorNodeFactory.addStateStore(stateStoreName); - connectStateStoreNameToSourceTopics(stateStoreName, processorNodeFactory); + connectStateStoreNameToSourceTopicsOrPattern(stateStoreName, processorNodeFactory); } else { throw new TopologyBuilderException("cannot connect a state store " + stateStoreName + " to a source node or a sink node."); } } - private Set<String> findSourceTopicsForProcessorParents(String[] parents) { - final Set<String> sourceTopics = new HashSet<>(); + private Set<SourceNodeFactory> findSourcesForProcessorParents(final String[] parents) { + final Set<SourceNodeFactory> sourceNodes = new HashSet<>(); for (String parent : parents) { - NodeFactory nodeFactory = nodeFactories.get(parent); + final NodeFactory nodeFactory = nodeFactories.get(parent); if (nodeFactory instanceof SourceNodeFactory) { - sourceTopics.addAll(((SourceNodeFactory) nodeFactory).topics); + sourceNodes.add((SourceNodeFactory) nodeFactory); } else if (nodeFactory instanceof ProcessorNodeFactory) { - sourceTopics.addAll(findSourceTopicsForProcessorParents(((ProcessorNodeFactory) nodeFactory).parents)); + sourceNodes.addAll(findSourcesForProcessorParents(((ProcessorNodeFactory) nodeFactory).parents)); } } - return sourceTopics; + return sourceNodes; } - private void connectStateStoreNameToSourceTopics(final String stateStoreName, - final ProcessorNodeFactory processorNodeFactory) { + private void connectStateStoreNameToSourceTopicsOrPattern(final String stateStoreName, + final ProcessorNodeFactory processorNodeFactory) { // we should never update the mapping from state store names to source topics if the store name already exists // in the map; this scenario is possible, for example, that a state store underlying a source KTable is // connecting to a join operator whose source topic is not the original KTable's source topic but an internal repartition topic. - if (stateStoreNameToSourceTopics.containsKey(stateStoreName)) { + + if (stateStoreNameToSourceTopics.containsKey(stateStoreName) || stateStoreNameToSourceRegex.containsKey(stateStoreName)) { return; } - final Set<String> sourceTopics = findSourceTopicsForProcessorParents(processorNodeFactory.parents); - if (sourceTopics.isEmpty()) { - throw new TopologyBuilderException("can't find source topic for state store " + - stateStoreName); + final Set<String> sourceTopics = new HashSet<>(); + final Set<Pattern> sourcePatterns = new HashSet<>(); + final Set<SourceNodeFactory> sourceNodesForParent = findSourcesForProcessorParents(processorNodeFactory.parents); + + for (SourceNodeFactory sourceNodeFactory : sourceNodesForParent) { + if (sourceNodeFactory.pattern != null) { + sourcePatterns.add(sourceNodeFactory.pattern); + } else { + sourceTopics.addAll(sourceNodeFactory.topics); + } + } + + if (!sourceTopics.isEmpty()) { + stateStoreNameToSourceTopics.put(stateStoreName, + Collections.unmodifiableSet(sourceTopics)); + } + + if (!sourcePatterns.isEmpty()) { + stateStoreNameToSourceRegex.put(stateStoreName, + Collections.unmodifiableSet(sourcePatterns)); } - stateStoreNameToSourceTopics.put(stateStoreName, - Collections.unmodifiableSet(sourceTopics)); + } - private <T> void maybeAddToResetList(Collection<T> earliestResets, Collection<T> latestResets, AutoOffsetReset offsetReset, T item) { + private <T> void maybeAddToResetList(final Collection<T> earliestResets, final Collection<T> latestResets, final AutoOffsetReset offsetReset, final T item) { if (offsetReset != null) { switch (offsetReset) { case EARLIEST: @@ -925,8 +945,8 @@ public class TopologyBuilder { } private Map<Integer, Set<String>> makeNodeGroups() { - HashMap<Integer, Set<String>> nodeGroups = new LinkedHashMap<>(); - HashMap<String, Set<String>> rootToNodeGroup = new HashMap<>(); + final HashMap<Integer, Set<String>> nodeGroups = new LinkedHashMap<>(); + final HashMap<String, Set<String>> rootToNodeGroup = new HashMap<>(); int nodeGroupId = 0; @@ -935,7 +955,7 @@ public class TopologyBuilder { allSourceNodes.addAll(nodeToSourcePatterns.keySet()); for (String nodeName : Utils.sorted(allSourceNodes)) { - String root = nodeGrouper.root(nodeName); + final String root = nodeGrouper.root(nodeName); Set<String> nodeGroup = rootToNodeGroup.get(root); if (nodeGroup == null) { nodeGroup = new HashSet<>(); @@ -948,7 +968,7 @@ public class TopologyBuilder { // Go through non-source nodes for (String nodeName : Utils.sorted(nodeFactories.keySet())) { if (!nodeToSourceTopics.containsKey(nodeName)) { - String root = nodeGrouper.root(nodeName); + final String root = nodeGrouper.root(nodeName); Set<String> nodeGroup = rootToNodeGroup.get(root); if (nodeGroup == null) { nodeGroup = new HashSet<>(); @@ -968,7 +988,7 @@ public class TopologyBuilder { * * @see org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig) */ - public synchronized ProcessorTopology build(Integer topicGroupId) { + public synchronized ProcessorTopology build(final Integer topicGroupId) { Set<String> nodeGroup; if (topicGroupId != null) { nodeGroup = nodeGroups().get(topicGroupId); @@ -1016,12 +1036,12 @@ public class TopologyBuilder { return globalGroups; } - private ProcessorTopology build(Set<String> nodeGroup) { - List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size()); - Map<String, ProcessorNode> processorMap = new HashMap<>(); - Map<String, SourceNode> topicSourceMap = new HashMap<>(); - Map<String, SinkNode> topicSinkMap = new HashMap<>(); - Map<String, StateStore> stateStoreMap = new LinkedHashMap<>(); + private ProcessorTopology build(final Set<String> nodeGroup) { + final List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size()); + final Map<String, ProcessorNode> processorMap = new HashMap<>(); + final Map<String, SourceNode> topicSourceMap = new HashMap<>(); + final Map<String, SinkNode> topicSinkMap = new HashMap<>(); + final Map<String, StateStore> stateStoreMap = new LinkedHashMap<>(); // create processor nodes in a topological order ("nodeFactories" is already topologically sorted) for (NodeFactory factory : nodeFactories.values()) { @@ -1032,7 +1052,7 @@ public class TopologyBuilder { if (factory instanceof ProcessorNodeFactory) { for (String parent : ((ProcessorNodeFactory) factory).parents) { - ProcessorNode<?, ?> parentNode = processorMap.get(parent); + final ProcessorNode<?, ?> parentNode = processorMap.get(parent); parentNode.addChild(node); } for (String stateStoreName : ((ProcessorNodeFactory) factory).stateStoreNames) { @@ -1106,19 +1126,19 @@ public class TopologyBuilder { * @return groups of topic names */ public synchronized Map<Integer, TopicsInfo> topicGroups() { - Map<Integer, TopicsInfo> topicGroups = new LinkedHashMap<>(); + final Map<Integer, TopicsInfo> topicGroups = new LinkedHashMap<>(); if (nodeGroups == null) nodeGroups = makeNodeGroups(); for (Map.Entry<Integer, Set<String>> entry : nodeGroups.entrySet()) { - Set<String> sinkTopics = new HashSet<>(); - Set<String> sourceTopics = new HashSet<>(); - Map<String, InternalTopicConfig> internalSourceTopics = new HashMap<>(); - Map<String, InternalTopicConfig> stateChangelogTopics = new HashMap<>(); + final Set<String> sinkTopics = new HashSet<>(); + final Set<String> sourceTopics = new HashSet<>(); + final Map<String, InternalTopicConfig> internalSourceTopics = new HashMap<>(); + final Map<String, InternalTopicConfig> stateChangelogTopics = new HashMap<>(); for (String node : entry.getValue()) { // if the node is a source node, add to the source topics - List<String> topics = nodeToSourceTopics.get(node); + final List<String> topics = nodeToSourceTopics.get(node); if (topics != null) { // if some of the topics are internal, add them to the internal topics for (String topic : topics) { @@ -1128,7 +1148,7 @@ public class TopologyBuilder { } if (this.internalTopicNames.contains(topic)) { // prefix the internal topic name with the application id - String internalTopic = decorateTopic(topic); + final String internalTopic = decorateTopic(topic); internalSourceTopics.put(internalTopic, new InternalTopicConfig(internalTopic, Collections.singleton(InternalTopicConfig.CleanupPolicy.delete), Collections.<String, String>emptyMap())); @@ -1140,7 +1160,7 @@ public class TopologyBuilder { } // if the node is a sink node, add to the sink topics - String topic = nodeToSinkTopic.get(node); + final String topic = nodeToSinkTopic.get(node); if (topic != null) { if (internalTopicNames.contains(topic)) { // prefix the change log topic name with the application id @@ -1175,7 +1195,7 @@ public class TopologyBuilder { private void setRegexMatchedTopicsToSourceNodes() { if (subscriptionUpdates.hasUpdates()) { for (Map.Entry<String, Pattern> stringPatternEntry : nodeToSourcePatterns.entrySet()) { - SourceNodeFactory sourceNode = (SourceNodeFactory) nodeFactories.get(stringPatternEntry.getKey()); + final SourceNodeFactory sourceNode = (SourceNodeFactory) nodeFactories.get(stringPatternEntry.getKey()); //need to update nodeToSourceTopics with topics matched from given regex nodeToSourceTopics.put(stringPatternEntry.getKey(), sourceNode.getTopics(subscriptionUpdates.getUpdates())); log.debug("nodeToSourceTopics {}", nodeToSourceTopics); @@ -1183,6 +1203,28 @@ public class TopologyBuilder { } } + private void setRegexMatchedTopicToStateStore() { + if (subscriptionUpdates.hasUpdates()) { + for (Map.Entry<String, Set<Pattern>> storePattern : stateStoreNameToSourceRegex.entrySet()) { + final Set<String> updatedTopicsForStateStore = new HashSet<>(); + for (String subscriptionUpdateTopic : subscriptionUpdates.getUpdates()) { + for (Pattern pattern : storePattern.getValue()) { + if (pattern.matcher(subscriptionUpdateTopic).matches()) { + updatedTopicsForStateStore.add(subscriptionUpdateTopic); + } + } + } + if (!updatedTopicsForStateStore.isEmpty()) { + Collection<String> storeTopics = stateStoreNameToSourceTopics.get(storePattern.getKey()); + if (storeTopics != null) { + updatedTopicsForStateStore.addAll(storeTopics); + } + stateStoreNameToSourceTopics.put(storePattern.getKey(), Collections.unmodifiableSet(updatedTopicsForStateStore)); + } + } + } + } + private InternalTopicConfig createInternalTopicConfig(final StateStoreSupplier<?> supplier, final String name) { if (!(supplier instanceof WindowStoreSupplier)) { return new InternalTopicConfig(name, Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), supplier.logConfig()); @@ -1223,7 +1265,7 @@ public class TopologyBuilder { return latestPattern; } - private void ensureNoRegexOverlap(Pattern builtPattern, Set<Pattern> otherPatterns, Set<String> otherTopics) { + private void ensureNoRegexOverlap(final Pattern builtPattern, final Set<Pattern> otherPatterns, final Set<String> otherTopics) { for (Pattern otherPattern : otherPatterns) { if (builtPattern.pattern().contains(otherPattern.pattern())) { @@ -1246,8 +1288,8 @@ public class TopologyBuilder { * @param sourcePatterns Patterns for matching source topics to add to a composite pattern * @return a Pattern that is composed of the literal source topic names and any Patterns for matching source topics */ - private static synchronized Pattern buildPatternForOffsetResetTopics(Collection<String> sourceTopics, Collection<Pattern> sourcePatterns) { - StringBuilder builder = new StringBuilder(); + private static synchronized Pattern buildPatternForOffsetResetTopics(final Collection<String> sourceTopics, final Collection<Pattern> sourcePatterns) { + final StringBuilder builder = new StringBuilder(); for (String topic : sourceTopics) { builder.append(topic).append("|"); @@ -1283,7 +1325,7 @@ public class TopologyBuilder { * @return groups of topic names */ public synchronized Collection<Set<String>> copartitionGroups() { - List<Set<String>> list = new ArrayList<>(copartitionSourceGroups.size()); + final List<Set<String>> list = new ArrayList<>(copartitionSourceGroups.size()); for (Set<String> nodeNames : copartitionSourceGroups) { Set<String> copartitionGroup = new HashSet<>(); for (String node : nodeNames) { @@ -1308,7 +1350,7 @@ public class TopologyBuilder { return decoratedTopics; } - private String decorateTopic(String topic) { + private String decorateTopic(final String topic) { if (applicationId == null) { throw new TopologyBuilderException("there are internal topics and " + "applicationId hasn't been set. Call " @@ -1320,7 +1362,7 @@ public class TopologyBuilder { public synchronized Pattern sourceTopicPattern() { if (this.topicPattern == null) { - List<String> allSourceTopics = new ArrayList<>(); + final List<String> allSourceTopics = new ArrayList<>(); if (!nodeToSourceTopics.isEmpty()) { for (List<String> topics : nodeToSourceTopics.values()) { allSourceTopics.addAll(maybeDecorateInternalSourceTopics(topics)); @@ -1334,9 +1376,10 @@ public class TopologyBuilder { return this.topicPattern; } - public synchronized void updateSubscriptions(SubscriptionUpdates subscriptionUpdates, String threadId) { + public synchronized void updateSubscriptions(final SubscriptionUpdates subscriptionUpdates, final String threadId) { log.debug("stream-thread [{}] updating builder with {} topic(s) with possible matching regex subscription(s)", threadId, subscriptionUpdates); this.subscriptionUpdates = subscriptionUpdates; setRegexMatchedTopicsToSourceNodes(); + setRegexMatchedTopicToStateStore(); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/15e0234a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java index a84a208..b671c4e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java @@ -32,12 +32,15 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TopologyBuilder; import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; import org.apache.kafka.streams.processor.internals.StreamTask; import org.apache.kafka.streams.processor.internals.StreamThread; import org.apache.kafka.streams.processor.internals.StreamsMetadataState; +import org.apache.kafka.test.MockProcessorSupplier; +import org.apache.kafka.test.MockStateStoreSupplier; import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.StreamsTestUtils; import org.apache.kafka.test.TestCondition; @@ -55,11 +58,13 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.UUID; import java.util.regex.Pattern; import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; @@ -230,6 +235,31 @@ public class RegexSourceIntegrationTest { streams.close(); } + @Test + public void shouldAddStateStoreToRegexDefinedSource() throws Exception { + + ProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>(); + MockStateStoreSupplier stateStoreSupplier = new MockStateStoreSupplier("testStateStore", false); + + TopologyBuilder builder = new TopologyBuilder() + .addSource("ingest", Pattern.compile("topic-\\d+")) + .addProcessor("my-processor", processorSupplier, "ingest") + .addStateStore(stateStoreSupplier, "my-processor"); + + + final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration); + streams.start(); + + final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class); + + IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, Arrays.asList("message for test"), producerConfig, mockTime); + streams.close(); + + Map<String, List<String>> stateStoreToSourceTopic = builder.stateStoreNameToSourceTopics(); + + assertThat(stateStoreToSourceTopic.get("testStateStore").get(0), is("topic-1")); + } + @Test public void testShouldReadFromRegexAndNamedTopics() throws Exception { http://git-wip-us.apache.org/repos/asf/kafka/blob/15e0234a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java index 88a420a..7c8b15f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java @@ -51,6 +51,7 @@ import static org.apache.kafka.common.utils.Utils.mkList; import static org.apache.kafka.common.utils.Utils.mkSet; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; public class TopologyBuilderTest { @@ -697,4 +698,37 @@ public class TopologyBuilderTest { assertTrue(topicGroups.get(2).sourceTopics.contains("topic-3")); } + + @SuppressWarnings("unchecked") + @Test + public void shouldConnectRegexMatchedTopicsToStateStore() throws Exception { + + final TopologyBuilder topologyBuilder = new TopologyBuilder() + .addSource("ingest", Pattern.compile("topic-\\d+")) + .addProcessor("my-processor", new MockProcessorSupplier(), "ingest") + .addStateStore(new MockStateStoreSupplier("testStateStore", false), "my-processor"); + + final StreamPartitionAssignor.SubscriptionUpdates subscriptionUpdates = new StreamPartitionAssignor.SubscriptionUpdates(); + final Field updatedTopicsField = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions"); + updatedTopicsField.setAccessible(true); + + final Set<String> updatedTopics = (Set<String>) updatedTopicsField.get(subscriptionUpdates); + + updatedTopics.add("topic-2"); + updatedTopics.add("topic-3"); + updatedTopics.add("topic-A"); + + topologyBuilder.updateSubscriptions(subscriptionUpdates, "test-thread"); + topologyBuilder.setApplicationId("test-app"); + + Map<String, List<String>> stateStoreAndTopics = topologyBuilder.stateStoreNameToSourceTopics(); + List<String> topics = stateStoreAndTopics.get("testStateStore"); + + assertTrue("Expected to contain two topics", topics.size() == 2); + + assertTrue(topics.contains("topic-2")); + assertTrue(topics.contains("topic-3")); + assertFalse(topics.contains("topic-A")); + } + }
