[ https://issues.apache.org/jira/browse/KAFKA-6966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16576949#comment-16576949 ]
ASF GitHub Bot commented on KAFKA-6966: --------------------------------------- mjsax closed pull request #5284: KAFKA-6966: Extend TopologyDescription.Sink to return TopicNameExtractor URL: https://github.com/apache/kafka/pull/5284 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html index 34f66ce53fe..35e1f77fd4c 100644 --- a/docs/streams/upgrade-guide.html +++ b/docs/streams/upgrade-guide.html @@ -90,6 +90,14 @@ <h1>Upgrade Guide and API Changes</h1> We have also removed some public APIs that are deprecated prior to 1.0.x in 2.0.0. See below for a detailed list of removed APIs. </p> + <h3><a id="streams_api_changes_210" href="#streams_api_changes_210">Streams API changes in 2.1.0</a></h3> + <p> + We updated <code>TopologyDescription</code> API to allow for better runtime checking. + Users are encouraged to use <code>#topicSet()</code> and <code>#topicPattern()</code> accordingly on <code>TopologyDescription.Source</code> nodes, + instead of using <code>#topics()</code>, which has since been deprecated. Similarly, use <code>#topic()</code> and <code>#topicNameExtractor()</code> + to get descriptions of <code>TopologyDescription.Sink</code> nodes. For more details, see + <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-321%3A+Update+TopologyDescription+to+better+represent+Source+and+Sink+Nodes">KIP-321</a>. 
+ </p> <h3><a id="streams_api_changes_200" href="#streams_api_changes_200">Streams API changes in 2.0.0</a></h3> <p> diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java index 04a292f9a97..870052d7399 100644 --- a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java +++ b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java @@ -16,9 +16,11 @@ */ package org.apache.kafka.streams; +import org.apache.kafka.streams.processor.TopicNameExtractor; import org.apache.kafka.streams.processor.internals.StreamTask; import java.util.Set; +import java.util.regex.Pattern; /** * A meta representation of a {@link Topology topology}. @@ -113,8 +115,22 @@ /** * The topic names this source node is reading from. * @return comma separated list of topic names or pattern (as String) + * @deprecated use {@link #topicSet()} or {@link #topicPattern()} instead */ + @Deprecated String topics(); + + /** + * The topic names this source node is reading from. + * @return a set of topic names + */ + Set<String> topicSet(); + + /** + * The pattern used to match topic names that is reading from. + * @return the pattern used to match topic names + */ + Pattern topicPattern(); } /** @@ -134,10 +150,17 @@ interface Sink extends Node { /** * The topic name this sink node is writing to. - * Could be null if the topic name can only be dynamically determined based on {@code TopicNameExtractor} + * Could be {@code null} if the topic name can only be dynamically determined based on {@link TopicNameExtractor} * @return a topic name */ String topic(); + + /** + * The {@link TopicNameExtractor} class that this sink node uses to dynamically extract the topic name to write to. + * Could be {@code null} if the topic name is not dynamically determined. 
+ * @return the {@link TopicNameExtractor} class used get the topic name + */ + TopicNameExtractor topicNameExtractor(); } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index 250105ad2a3..2944f6ba29b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -282,15 +282,7 @@ private boolean isMatch(final String topic) { @Override Source describe() { - final String sourceTopics; - - if (pattern == null) { - sourceTopics = topics.toString(); - } else { - sourceTopics = pattern.toString(); - } - - return new Source(name, sourceTopics); + return new Source(name, new HashSet<>(topics), pattern); } } @@ -1337,7 +1329,7 @@ public GlobalStore(final String sourceName, final String storeName, final String topicName, final int id) { - source = new Source(sourceName, topicName); + source = new Source(sourceName, Collections.singleton(topicName), null); processor = new Processor(processorName, Collections.singleton(storeName)); source.successors.add(processor); processor.predecessors.add(source); @@ -1424,19 +1416,33 @@ public void addSuccessor(final TopologyDescription.Node successor) { } public final static class Source extends AbstractNode implements TopologyDescription.Source { - private final String topics; + private final Set<String> topics; + private final Pattern topicPattern; public Source(final String name, - final String topics) { + final Set<String> topics, + final Pattern pattern) { super(name); this.topics = topics; + this.topicPattern = pattern; } + @Deprecated @Override public String topics() { + return topics.toString(); + } + + @Override + public Set<String> topicSet() { return topics; } + @Override + public Pattern topicPattern() { + return 
topicPattern; + } + @Override public void addPredecessor(final TopologyDescription.Node predecessor) { throw new UnsupportedOperationException("Sources don't have predecessors."); @@ -1444,7 +1450,9 @@ public void addPredecessor(final TopologyDescription.Node predecessor) { @Override public String toString() { - return "Source: " + name + " (topics: " + topics + ")\n --> " + nodeNames(successors); + final String topicsString = topics == null ? topicPattern.toString() : topics.toString(); + + return "Source: " + name + " (topics: " + topicsString + ")\n --> " + nodeNames(successors); } @Override @@ -1459,13 +1467,14 @@ public boolean equals(final Object o) { final Source source = (Source) o; // omit successor to avoid infinite loops return name.equals(source.name) - && topics.equals(source.topics); + && topics.equals(source.topics) + && topicPattern.equals(source.topicPattern); } @Override public int hashCode() { // omit successor as it might change and alter the hash code - return Objects.hash(name, topics); + return Objects.hash(name, topics, topicPattern); } } @@ -1528,10 +1537,20 @@ public Sink(final String name, @Override public String topic() { - if (topicNameExtractor instanceof StaticTopicNameExtractor) + if (topicNameExtractor instanceof StaticTopicNameExtractor) { return ((StaticTopicNameExtractor) topicNameExtractor).topicName; - else + } else { return null; + } + } + + @Override + public TopicNameExtractor topicNameExtractor() { + if (topicNameExtractor instanceof StaticTopicNameExtractor) { + return null; + } else { + return topicNameExtractor; + } } @Override @@ -1541,7 +1560,10 @@ public void addSuccessor(final TopologyDescription.Node successor) { @Override public String toString() { - return "Sink: " + name + " (topic: " + topic() + ")\n <-- " + nodeNames(predecessors); + if (topicNameExtractor instanceof StaticTopicNameExtractor) { + return "Sink: " + name + " (topic: " + topic() + ")\n <-- " + nodeNames(predecessors); + } + return "Sink: " + name 
+ " (extractor class: " + topicNameExtractor + ")\n <-- " + nodeNames(predecessors); } @Override diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java index ece157cd02e..eeb08ac1f95 100644 --- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java @@ -27,6 +27,7 @@ import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.RecordContext; import org.apache.kafka.streams.processor.TopicNameExtractor; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import org.apache.kafka.streams.state.KeyValueStore; @@ -370,6 +371,23 @@ public void shouldDescribeEmptyTopology() { assertThat(topology.describe(), equalTo(expectedDescription)); } + @Test + public void sinkShouldReturnNullTopicWithDynamicRouting() { + final TopologyDescription.Sink expectedSinkNode + = new InternalTopologyBuilder.Sink("sink", (key, value, record) -> record.topic() + "-" + key); + + assertThat(expectedSinkNode.topic(), equalTo(null)); + } + + @Test + public void sinkShouldReturnTopicNameExtractorWithDynamicRouting() { + final TopicNameExtractor topicNameExtractor = (key, value, record) -> record.topic() + "-" + key; + final TopologyDescription.Sink expectedSinkNode + = new InternalTopologyBuilder.Sink("sink", topicNameExtractor); + + assertThat(expectedSinkNode.topicNameExtractor(), equalTo(topicNameExtractor)); + } + @Test public void singleSourceShouldHaveSingleSubtopology() { final TopologyDescription.Source expectedSourceNode = addSource("source", "topic"); @@ -629,6 +647,34 @@ public void shouldDescribeMultipleGlobalStoreTopology() { assertThat(topology.describe(), equalTo(expectedDescription)); } + @Test + public void 
topologyWithDynamicRoutingShouldDescribeExtractorClass() { + final StreamsBuilder builder = new StreamsBuilder(); + + final TopicNameExtractor topicNameExtractor = new TopicNameExtractor() { + @Override + public String extract(final Object key, final Object value, final RecordContext recordContext) { + return recordContext.topic() + "-" + key; + } + + @Override + public String toString() { + return "anonymous topic name extractor. topic is [recordContext.topic()]-[key]"; + } + }; + builder.stream("input-topic").to(topicNameExtractor); + final TopologyDescription describe = builder.build().describe(); + + assertEquals( + "Topologies:\n" + + " Sub-topology: 0\n" + + " Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n" + + " --> KSTREAM-SINK-0000000001\n" + + " Sink: KSTREAM-SINK-0000000001 (extractor class: anonymous topic name extractor. topic is [recordContext.topic()]-[key])\n" + + " <-- KSTREAM-SOURCE-0000000000\n\n", + describe.toString()); + } + @Test public void kGroupedStreamZeroArgCountShouldPreserveTopologyStructure() { final StreamsBuilder builder = new StreamsBuilder(); @@ -1048,13 +1094,13 @@ public void kTableNamedMaterializedFilterShouldPreserveTopologyStructure() { for (int i = 1; i < sourceTopic.length; ++i) { allSourceTopics.append(", ").append(sourceTopic[i]); } - return new InternalTopologyBuilder.Source(sourceName, allSourceTopics.toString()); + return new InternalTopologyBuilder.Source(sourceName, new HashSet<>(Arrays.asList(sourceTopic)), null); } private TopologyDescription.Source addSource(final String sourceName, final Pattern sourcePattern) { topology.addSource(null, sourceName, null, null, null, sourcePattern); - return new InternalTopologyBuilder.Source(sourceName, sourcePattern.toString()); + return new InternalTopologyBuilder.Source(sourceName, null, sourcePattern); } private TopologyDescription.Processor addProcessor(final String processorName, ---------------------------------------------------------------- This is an 
automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Extend `TopologyDescription.Sink` to return `TopicNameExtractor` > ---------------------------------------------------------------- > > Key: KAFKA-6966 > URL: https://issues.apache.org/jira/browse/KAFKA-6966 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 2.0.0 > Reporter: Matthias J. Sax > Assignee: Nishanth Pradeep > Priority: Major > Labels: beginner, kip, newbie > > With KIP-303, a dynamic routing feature was added and > `TopologyDescription.Sink#topic()` returns `null` if this feature is used. > It would be useful to get the actually used `TopicNameExtractor` class from > the `TopologyDescription`. > We suggest adding `Class<? extends TopicNameExtractor> > TopologyDescription.Sink#topicNameExtractor()` and let it return `null` if > the dynamic routing feature is not used. > This is a public API change and requires a KIP: > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals -- This message was sent by Atlassian JIRA (v7.6.3#76005)