[ 
https://issues.apache.org/jira/browse/KAFKA-6966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16576949#comment-16576949
 ] 

ASF GitHub Bot commented on KAFKA-6966:
---------------------------------------

mjsax closed pull request #5284: KAFKA-6966: Extend TopologyDescription.Sink to return TopicNameExtractor
URL: https://github.com/apache/kafka/pull/5284

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html
index 34f66ce53fe..35e1f77fd4c 100644
--- a/docs/streams/upgrade-guide.html
+++ b/docs/streams/upgrade-guide.html
@@ -90,6 +90,14 @@ <h1>Upgrade Guide and API Changes</h1>
         We have also removed some public APIs that are deprecated prior to 1.0.x in 2.0.0.
         See below for a detailed list of removed APIs.
     </p>
+    <h3><a id="streams_api_changes_210" href="#streams_api_changes_210">Streams API changes in 2.1.0</a></h3>
+    <p>
+        We updated <code>TopologyDescription</code> API to allow for better runtime checking.
+        Users are encouraged to use <code>#topicSet()</code> and <code>#topicPattern()</code> accordingly on <code>TopologyDescription.Source</code> nodes,
+        instead of using <code>#topics()</code>, which has since been deprecated. Similarly, use <code>#topic()</code> and <code>#topicNameExtractor()</code>
+        to get descriptions of <code>TopologyDescription.Sink</code> nodes. For more details, see
+        <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-321%3A+Update+TopologyDescription+to+better+represent+Source+and+Sink+Nodes">KIP-321</a>.
+    </p>
 
     <h3><a id="streams_api_changes_200" href="#streams_api_changes_200">Streams API changes in 2.0.0</a></h3>
     <p>
diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
index 04a292f9a97..870052d7399 100644
--- a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
+++ b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
@@ -16,9 +16,11 @@
  */
 package org.apache.kafka.streams;
 
+import org.apache.kafka.streams.processor.TopicNameExtractor;
 import org.apache.kafka.streams.processor.internals.StreamTask;
 
 import java.util.Set;
+import java.util.regex.Pattern;
 
 /**
  * A meta representation of a {@link Topology topology}.
@@ -113,8 +115,22 @@
         /**
          * The topic names this source node is reading from.
          * @return comma separated list of topic names or pattern (as String)
+         * @deprecated use {@link #topicSet()} or {@link #topicPattern()} instead
          */
+        @Deprecated
         String topics();
+
+        /**
+         * The topic names this source node is reading from.
+         * @return a set of topic names
+         */
+        Set<String> topicSet();
+
+        /**
+         * The pattern used to match topic names that is reading from.
+         * @return the pattern used to match topic names
+         */
+        Pattern topicPattern();
     }
 
     /**
@@ -134,10 +150,17 @@
     interface Sink extends Node {
         /**
          * The topic name this sink node is writing to.
-         * Could be null if the topic name can only be dynamically determined based on {@code TopicNameExtractor}
+         * Could be {@code null} if the topic name can only be dynamically determined based on {@link TopicNameExtractor}
          * @return a topic name
          */
         String topic();
+
+        /**
+         * The {@link TopicNameExtractor} class that this sink node uses to dynamically extract the topic name to write to.
+         * Could be {@code null} if the topic name is not dynamically determined.
+         * @return the {@link TopicNameExtractor} class used get the topic name
+         */
+        TopicNameExtractor topicNameExtractor();
     }
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 250105ad2a3..2944f6ba29b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -282,15 +282,7 @@ private boolean isMatch(final String topic) {
 
         @Override
         Source describe() {
-            final String sourceTopics;
-
-            if (pattern == null) {
-                sourceTopics = topics.toString();
-            } else {
-                sourceTopics = pattern.toString();
-            }
-
-            return new Source(name, sourceTopics);
+            return new Source(name, new HashSet<>(topics), pattern);
         }
     }
 
@@ -1337,7 +1329,7 @@ public GlobalStore(final String sourceName,
                            final String storeName,
                            final String topicName,
                            final int id) {
-            source = new Source(sourceName, topicName);
+            source = new Source(sourceName, Collections.singleton(topicName), null);
             processor = new Processor(processorName, Collections.singleton(storeName));
             source.successors.add(processor);
             processor.predecessors.add(source);
@@ -1424,19 +1416,33 @@ public void addSuccessor(final TopologyDescription.Node successor) {
     }
 
     public final static class Source extends AbstractNode implements TopologyDescription.Source {
-        private final String topics;
+        private final Set<String> topics;
+        private final Pattern topicPattern;
 
         public Source(final String name,
-                      final String topics) {
+                      final Set<String> topics,
+                      final Pattern pattern) {
             super(name);
             this.topics = topics;
+            this.topicPattern = pattern;
         }
 
+        @Deprecated
         @Override
         public String topics() {
+            return topics.toString();
+        }
+
+        @Override
+        public Set<String> topicSet() {
             return topics;
         }
 
+        @Override
+        public Pattern topicPattern() {
+            return topicPattern;
+        }
+
         @Override
         public void addPredecessor(final TopologyDescription.Node predecessor) {
             throw new UnsupportedOperationException("Sources don't have predecessors.");
@@ -1444,7 +1450,9 @@ public void addPredecessor(final TopologyDescription.Node predecessor) {
 
         @Override
         public String toString() {
-            return "Source: " + name + " (topics: " + topics + ")\n      --> " + nodeNames(successors);
+            final String topicsString = topics == null ? topicPattern.toString() : topics.toString();
+            
+            return "Source: " + name + " (topics: " + topicsString + ")\n      --> " + nodeNames(successors);
         }
 
         @Override
@@ -1459,13 +1467,14 @@ public boolean equals(final Object o) {
             final Source source = (Source) o;
             // omit successor to avoid infinite loops
             return name.equals(source.name)
-                && topics.equals(source.topics);
+                && topics.equals(source.topics)
+                && topicPattern.equals(source.topicPattern);
         }
 
         @Override
         public int hashCode() {
             // omit successor as it might change and alter the hash code
-            return Objects.hash(name, topics);
+            return Objects.hash(name, topics, topicPattern);
         }
     }
 
@@ -1528,10 +1537,20 @@ public Sink(final String name,
 
         @Override
         public String topic() {
-            if (topicNameExtractor instanceof StaticTopicNameExtractor)
+            if (topicNameExtractor instanceof StaticTopicNameExtractor) {
                 return ((StaticTopicNameExtractor) topicNameExtractor).topicName;
-            else
+            } else {
                 return null;
+            }
+        }
+
+        @Override
+        public TopicNameExtractor topicNameExtractor() {
+            if (topicNameExtractor instanceof StaticTopicNameExtractor) {
+                return null;
+            } else {
+                return topicNameExtractor;
+            }
         }
 
         @Override
@@ -1541,7 +1560,10 @@ public void addSuccessor(final TopologyDescription.Node successor) {
 
         @Override
         public String toString() {
-            return "Sink: " + name + " (topic: " + topic() + ")\n      <-- " + nodeNames(predecessors);
+            if (topicNameExtractor instanceof StaticTopicNameExtractor) {
+                return "Sink: " + name + " (topic: " + topic() + ")\n      <-- " + nodeNames(predecessors);
+            }
+            return "Sink: " + name + " (extractor class: " + topicNameExtractor + ")\n      <-- " + nodeNames(predecessors);
         }
 
         @Override
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index ece157cd02e..eeb08ac1f95 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -27,6 +27,7 @@
 import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.RecordContext;
 import org.apache.kafka.streams.processor.TopicNameExtractor;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -370,6 +371,23 @@ public void shouldDescribeEmptyTopology() {
         assertThat(topology.describe(), equalTo(expectedDescription));
     }
 
+    @Test
+    public void sinkShouldReturnNullTopicWithDynamicRouting() {
+        final TopologyDescription.Sink expectedSinkNode
+                = new InternalTopologyBuilder.Sink("sink", (key, value, record) -> record.topic() + "-" + key);
+
+        assertThat(expectedSinkNode.topic(), equalTo(null));
+    }
+
+    @Test
+    public void sinkShouldReturnTopicNameExtractorWithDynamicRouting() {
+        final TopicNameExtractor topicNameExtractor = (key, value, record) -> record.topic() + "-" + key;
+        final TopologyDescription.Sink expectedSinkNode
+                = new InternalTopologyBuilder.Sink("sink", topicNameExtractor);
+
+        assertThat(expectedSinkNode.topicNameExtractor(), equalTo(topicNameExtractor));
+    }
+
     @Test
     public void singleSourceShouldHaveSingleSubtopology() {
         final TopologyDescription.Source expectedSourceNode = addSource("source", "topic");
@@ -629,6 +647,34 @@ public void shouldDescribeMultipleGlobalStoreTopology() {
         assertThat(topology.describe(), equalTo(expectedDescription));
     }
 
+    @Test
+    public void topologyWithDynamicRoutingShouldDescribeExtractorClass() {
+        final StreamsBuilder builder  = new StreamsBuilder();
+
+        final TopicNameExtractor topicNameExtractor = new TopicNameExtractor() {
+            @Override
+            public String extract(final Object key, final Object value, final RecordContext recordContext) {
+                return recordContext.topic() + "-" + key;
+            }
+
+            @Override
+            public String toString() {
+                return "anonymous topic name extractor. topic is [recordContext.topic()]-[key]";
+            }
+        };
+        builder.stream("input-topic").to(topicNameExtractor);
+        final TopologyDescription describe = builder.build().describe();
+
+        assertEquals(
+                "Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "    Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n" +
+                "      --> KSTREAM-SINK-0000000001\n" +
+                "    Sink: KSTREAM-SINK-0000000001 (extractor class: anonymous topic name extractor. topic is [recordContext.topic()]-[key])\n" +
+                "      <-- KSTREAM-SOURCE-0000000000\n\n",
+                describe.toString());
+    }
+
     @Test
     public void kGroupedStreamZeroArgCountShouldPreserveTopologyStructure() {
         final StreamsBuilder builder = new StreamsBuilder();
@@ -1048,13 +1094,13 @@ public void kTableNamedMaterializedFilterShouldPreserveTopologyStructure() {
         for (int i = 1; i < sourceTopic.length; ++i) {
             allSourceTopics.append(", ").append(sourceTopic[i]);
         }
-        return new InternalTopologyBuilder.Source(sourceName, allSourceTopics.toString());
+        return new InternalTopologyBuilder.Source(sourceName, new HashSet<>(Arrays.asList(sourceTopic)), null);
     }
 
     private TopologyDescription.Source addSource(final String sourceName,
                                                  final Pattern sourcePattern) {
         topology.addSource(null, sourceName, null, null, null, sourcePattern);
-        return new InternalTopologyBuilder.Source(sourceName, sourcePattern.toString());
+        return new InternalTopologyBuilder.Source(sourceName, null, sourcePattern);
     }
 
     private TopologyDescription.Processor addProcessor(final String processorName,
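
A note on the Source.equals() change in the diff above: it calls topics.equals(...) and topicPattern.equals(...) directly, while GlobalStore and the test helpers in the same diff construct Source with null for one of the two fields, so the comparison would appear to throw a NullPointerException in those cases. In addition, java.util.regex.Pattern does not override equals(), so two separately compiled patterns with the same regex do not compare equal. The following is a minimal sketch of a null-safe comparison, not the code that was merged. SourceKey is a hypothetical stand-in class (not part of Kafka), and comparing patterns by their regex string while ignoring flags is an assumption about the intended semantics.

```java
import java.util.Objects;
import java.util.Set;
import java.util.regex.Pattern;

// Hypothetical stand-in for InternalTopologyBuilder.Source; not Kafka code.
final class SourceKey {
    private final String name;
    private final Set<String> topics;   // may be null when a pattern is used
    private final Pattern topicPattern; // may be null when topic names are used

    SourceKey(final String name, final Set<String> topics, final Pattern topicPattern) {
        this.name = name;
        this.topics = topics;
        this.topicPattern = topicPattern;
    }

    // Pattern does not override equals(), so compare the regex source text instead.
    // Assumption: flags are ignored for the purpose of this sketch.
    private static String regexOf(final Pattern pattern) {
        return pattern == null ? null : pattern.pattern();
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof SourceKey)) {
            return false;
        }
        final SourceKey other = (SourceKey) o;
        // Objects.equals tolerates null on either side.
        return name.equals(other.name)
            && Objects.equals(topics, other.topics)
            && Objects.equals(regexOf(topicPattern), regexOf(other.topicPattern));
    }

    @Override
    public int hashCode() {
        // Hash the regex text so that equal objects share a hash code.
        return Objects.hash(name, topics, regexOf(topicPattern));
    }
}
```

With this shape, two descriptions built from Pattern.compile("topic-.*") compare equal even though the Pattern instances differ, and a source with a null pattern can be compared without an exception.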


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Extend `TopologyDescription.Sink` to return `TopicNameExtractor`
> ----------------------------------------------------------------
>
>                 Key: KAFKA-6966
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6966
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.0.0
>            Reporter: Matthias J. Sax
>            Assignee: Nishanth Pradeep
>            Priority: Major
>              Labels: beginner, kip, newbie
>
> With KIP-303, a dynamic routing feature was added and `TopologyDescription.Sink#topic()` returns `null` if this feature is used.
> It would be useful to get the `TopicNameExtractor` class actually in use from the `TopologyDescription`.
> We suggest adding `Class<? extends TopicNameExtractor> TopologyDescription.Sink#topicNameExtractor()` and letting it return `null` if the dynamic routing feature is not used.
> This is a public API change and requires a KIP: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
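
The contract that results from the diff in this message (topic() is null under dynamic routing, topicNameExtractor() is null for a static topic) can be sketched without a Kafka dependency as follows. Extractor and SinkView are hypothetical stand-ins and not the Kafka types; in particular, the real TopicNameExtractor receives a RecordContext rather than a plain topic string, and the merged method returns the extractor instance rather than the Class proposed in the issue text.

```java
// Hypothetical stand-in for TopicNameExtractor; the third argument replaces RecordContext.
interface Extractor<K, V> {
    String extract(K key, V value, String recordTopic);
}

// Hypothetical stand-in for the Sink description node; not Kafka code.
final class SinkView {
    private final String name;
    private final String staticTopic;        // null when routing is dynamic
    private final Extractor<?, ?> extractor; // null when the topic is static

    private SinkView(final String name, final String staticTopic, final Extractor<?, ?> extractor) {
        this.name = name;
        this.staticTopic = staticTopic;
        this.extractor = extractor;
    }

    static SinkView fixed(final String name, final String topic) {
        return new SinkView(name, topic, null);
    }

    static SinkView dynamic(final String name, final Extractor<?, ?> extractor) {
        return new SinkView(name, null, extractor);
    }

    // Mirrors Sink#topic(): null if the topic is only known per record.
    String topic() {
        return staticTopic;
    }

    // Mirrors Sink#topicNameExtractor(): null if the topic is static.
    Extractor<?, ?> topicNameExtractor() {
        return extractor;
    }

    // Callers branch on which of the two accessors is non-null.
    String describe() {
        if (topic() != null) {
            return "Sink: " + name + " (topic: " + topic() + ")";
        }
        return "Sink: " + name + " (extractor class: " + topicNameExtractor() + ")";
    }
}
```

Exactly one of the two accessors returns a non-null value for any sink in this sketch, so a caller can check topic() first and fall back to the extractor.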



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
