ableegoldman commented on a change in pull request #9582: URL: https://github.com/apache/kafka/pull/9582#discussion_r523183821
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java ########## @@ -314,6 +317,50 @@ public void buildAndOptimizeTopology(final Properties props) { internalTopologyBuilder.validateCopartition(); } + private void mergeDuplicateSourceNodes() { + final Map<String, StreamSourceNode<?, ?>> topicsToSourceNodes = new HashMap<>(); + + // We don't really care about the order here, but since Pattern does not implement equals() we can't rely on + // a regular HashMap and containsKey(Pattern). But for our purposes it's sufficient to compare the compiled + // string and flags to determine if two pattern subscriptions can be merged into a single source node + final Map<Pattern, StreamSourceNode<?, ?>> patternsToSourceNodes = + new TreeMap<>(Comparator.comparing(Pattern::pattern).thenComparing(Pattern::flags)); + + for (final StreamsGraphNode graphNode : root.children()) { + if (graphNode instanceof StreamSourceNode) { + final StreamSourceNode<?, ?> currentSourceNode = (StreamSourceNode<?, ?>) graphNode; + + if (currentSourceNode.topicPattern() != null) { + if (!patternsToSourceNodes.containsKey(currentSourceNode.topicPattern())) { + patternsToSourceNodes.put(currentSourceNode.topicPattern(), currentSourceNode); + } else { + final StreamSourceNode<?, ?> mainSourceNode = patternsToSourceNodes.get(currentSourceNode.topicPattern()); + mainSourceNode.merge(currentSourceNode); + root.removeChild(graphNode); + } + } else { + for (final String topic : currentSourceNode.topicNames()) { + if (!topicsToSourceNodes.containsKey(topic)) { + topicsToSourceNodes.put(topic, currentSourceNode); + } else { + final StreamSourceNode<?, ?> mainSourceNode = topicsToSourceNodes.get(topic); + // TODO we only merge source nodes if the subscribed topic(s) are an exact match, so it's still not + // possible to subscribe to topicA in one KStream and topicA + topicB in another. 
We could achieve + // this by splitting these source nodes into one topic per node and routing to the subscribed children Review comment: Filed https://issues.apache.org/jira/browse/KAFKA-10721 ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: users@infra.apache.org