[ 
https://issues.apache.org/jira/browse/KAFKA-6328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16328079#comment-16328079
 ] 

ASF GitHub Bot commented on KAFKA-6328:
---------------------------------------

guozhangwang closed pull request #4339: [KAFKA-6328] Exclude node groups 
belonging to global stores in InternalTopologyBuilder#makeNodeGroups
URL: https://github.com/apache/kafka/pull/4339
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java 
b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
index 01af8bf5295..131e8d31cbe 100644
--- a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
+++ b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
@@ -76,6 +76,8 @@
          * @return the "global" processor node
          */
         Processor processor();
+
+        int id();
     }
 
     /**
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 90d46aa949f..09799f7ee7e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -1286,13 +1286,6 @@ private boolean isGlobalSource(final String nodeName) {
     public TopologyDescription describe() {
         final TopologyDescription description = new TopologyDescription();
 
-        describeSubtopologies(description);
-        describeGlobalStores(description);
-
-        return description;
-    }
-
-    private void describeSubtopologies(final TopologyDescription description) {
         for (final Map.Entry<Integer, Set<String>> nodeGroup : 
makeNodeGroups().entrySet()) {
 
             final Set<String> allNodesOfGroups = nodeGroup.getValue();
@@ -1300,6 +1293,32 @@ private void describeSubtopologies(final 
TopologyDescription description) {
 
             if (!isNodeGroupOfGlobalStores) {
                 describeSubtopology(description, nodeGroup.getKey(), 
allNodesOfGroups);
+            } else {
+                describeGlobalStore(description, allNodesOfGroups, 
nodeGroup.getKey());
+            }
+        }
+
+        return description;
+    }
+
+    private void describeGlobalStore(final TopologyDescription description, 
final Set<String> nodes, int id) {
+        final Iterator<String> it = nodes.iterator();
+        while (it.hasNext()) {
+            final String node = it.next();
+
+            if (isGlobalSource(node)) {
+                // we found a GlobalStore node group; those contain exactly 
two node: {sourceNode,processorNode}
+                it.remove(); // remove sourceNode from group
+                final String processorNode = nodes.iterator().next(); // get 
remaining processorNode
+
+                description.addGlobalStore(new GlobalStore(
+                    node,
+                    processorNode,
+                    ((ProcessorNodeFactory) 
nodeFactories.get(processorNode)).stateStoreNames.iterator().next(),
+                    nodeToSourceTopics.get(node).get(0),
+                    id
+                ));
+                break;
             }
         }
     }
@@ -1367,43 +1386,26 @@ private void describeSubtopology(final 
TopologyDescription description,
                 new HashSet<TopologyDescription.Node>(nodesByName.values())));
     }
 
-    private void describeGlobalStores(final TopologyDescription description) {
-        for (final Map.Entry<Integer, Set<String>> nodeGroup : 
makeNodeGroups().entrySet()) {
-            final Set<String> nodes = nodeGroup.getValue();
-
-            final Iterator<String> it = nodes.iterator();
-            while (it.hasNext()) {
-                final String node = it.next();
-
-                if (isGlobalSource(node)) {
-                    // we found a GlobalStore node group; those contain 
exactly two node: {sourceNode,processorNode}
-                    it.remove(); // remove sourceNode from group
-                    final String processorNode = nodes.iterator().next(); // 
get remaining processorNode
-
-                    description.addGlobalStore(new GlobalStore(
-                        node,
-                        processorNode,
-                        ((ProcessorNodeFactory) 
nodeFactories.get(processorNode)).stateStoreNames.iterator().next(),
-                        nodeToSourceTopics.get(node).get(0)
-                    ));
-                    break;
-                }
-            }
-        }
-    }
-
     public final static class GlobalStore implements 
TopologyDescription.GlobalStore {
         private final Source source;
         private final Processor processor;
+        private final int id;
 
         public GlobalStore(final String sourceName,
                            final String processorName,
                            final String storeName,
-                           final String topicName) {
+                           final String topicName,
+                           final int id) {
             source = new Source(sourceName, topicName);
             processor = new Processor(processorName, 
Collections.singleton(storeName));
             source.successors.add(processor);
             processor.predecessors.add(source);
+            this.id = id;
+        }
+
+        @Override
+        public int id() {
+            return id;
         }
 
         @Override
@@ -1418,7 +1420,7 @@ public GlobalStore(final String sourceName,
 
         @Override
         public String toString() {
-            return "GlobalStore: " + source.name + " (topic: " + source.topics 
+ ")\n      --> "
+            return "GlobalStore: " + source.name + " Id: " + id + " (topic: " 
+ source.topics + ")\n      --> "
                 + processor.name + " (store: " + 
processor.stores.iterator().next() + ")\n";
         }
 
@@ -1720,7 +1722,7 @@ public String toString() {
         @Override
         public int compare(final TopologyDescription.GlobalStore globalStore1,
                            final TopologyDescription.GlobalStore globalStore2) 
{
-            return 
globalStore1.source().name().compareTo(globalStore2.source().name());
+            return globalStore1.id() - globalStore2.id();
         }
     }
 
@@ -1737,8 +1739,8 @@ public int compare(final TopologyDescription.Subtopology 
subtopology1,
     private final static SubtopologyComparator SUBTOPOLOGY_COMPARATOR = new 
SubtopologyComparator();
 
     public final static class TopologyDescription implements 
org.apache.kafka.streams.TopologyDescription {
-        private final Set<TopologyDescription.Subtopology> subtopologies = new 
TreeSet<>(SUBTOPOLOGY_COMPARATOR);
-        private final Set<TopologyDescription.GlobalStore> globalStores = new 
TreeSet<>(GLOBALSTORE_COMPARATOR);
+        private final TreeSet<TopologyDescription.Subtopology> subtopologies = 
new TreeSet<>(SUBTOPOLOGY_COMPARATOR);
+        private final TreeSet<TopologyDescription.GlobalStore> globalStores = 
new TreeSet<>(GLOBALSTORE_COMPARATOR);
 
         public void addSubtopology(final TopologyDescription.Subtopology 
subtopology) {
             subtopologies.add(subtopology);
@@ -1760,33 +1762,43 @@ public void addGlobalStore(final 
TopologyDescription.GlobalStore globalStore) {
 
         @Override
         public String toString() {
-            return subtopologiesAsString() + "\n" + globalStoresAsString();
-        }
-
-        private String subtopologiesAsString() {
             final StringBuilder sb = new StringBuilder();
-            sb.append("Sub-topologies:\n");
-            if (subtopologies.isEmpty()) {
-                sb.append("  none\n");
-            } else {
-                for (final TopologyDescription.Subtopology st : subtopologies) 
{
-                    sb.append("  ");
-                    sb.append(st);
+            sb.append("Topologies:\n ");
+            final TopologyDescription.Subtopology[] sortedSubtopologies = 
+                subtopologies.descendingSet().toArray(new 
TopologyDescription.Subtopology[subtopologies.size()]);
+            final TopologyDescription.GlobalStore[] sortedGlobalStores = 
+                globalStores.descendingSet().toArray(new 
TopologyDescription.GlobalStore[globalStores.size()]);
+            int expectedId = 0;
+            int subtopologiesIndex = sortedSubtopologies.length - 1;
+            int globalStoresIndex = sortedGlobalStores.length - 1;
+            while (subtopologiesIndex != -1 && globalStoresIndex != -1) {
+                sb.append("  ");
+                final TopologyDescription.Subtopology subtopology = 
+                    sortedSubtopologies[subtopologiesIndex];
+                final TopologyDescription.GlobalStore globalStore = 
+                    sortedGlobalStores[globalStoresIndex];
+                if (subtopology.id() == expectedId) {
+                    sb.append(subtopology);
+                    subtopologiesIndex--;
+                } else {
+                    sb.append(globalStore);
+                    globalStoresIndex--;
                 }
+                expectedId++;
             }
-            return sb.toString();
-        }
-
-        private String globalStoresAsString() {
-            final StringBuilder sb = new StringBuilder();
-            sb.append("Global Stores:\n");
-            if (globalStores.isEmpty()) {
-                sb.append("  none\n");
-            } else {
-                for (final TopologyDescription.GlobalStore gs : globalStores) {
-                    sb.append("  ");
-                    sb.append(gs);
-                }
+            while (subtopologiesIndex != -1) {
+                final TopologyDescription.Subtopology subtopology = 
+                    sortedSubtopologies[subtopologiesIndex];
+                sb.append("  ");
+                sb.append(subtopology);
+                subtopologiesIndex--;
+            }
+            while (globalStoresIndex != -1) {
+                final TopologyDescription.GlobalStore globalStore = 
+                    sortedGlobalStores[globalStoresIndex];
+                sb.append("  ");
+                sb.append(globalStore);
+                globalStoresIndex--;
             }
             return sb.toString();
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java 
b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 3ba780323b7..0a45803aed5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -579,14 +579,14 @@ public void 
processorsWithSharedStateShouldHaveSameSubtopology() {
 
     @Test
     public void shouldDescribeGlobalStoreTopology() {
-        addGlobalStoreToTopologyAndExpectedDescription("globalStore", 
"source", "globalTopic", "processor");
+        addGlobalStoreToTopologyAndExpectedDescription("globalStore", 
"source", "globalTopic", "processor", 0);
         assertThat(topology.describe(), equalTo((TopologyDescription) 
expectedDescription));
     }
 
     @Test
     public void shouldDescribeMultipleGlobalStoreTopology() {
-        addGlobalStoreToTopologyAndExpectedDescription("globalStore1", 
"source1", "globalTopic1", "processor1");
-        addGlobalStoreToTopologyAndExpectedDescription("globalStore2", 
"source2", "globalTopic2", "processor2");
+        addGlobalStoreToTopologyAndExpectedDescription("globalStore1", 
"source1", "globalTopic1", "processor1", 0);
+        addGlobalStoreToTopologyAndExpectedDescription("globalStore2", 
"source2", "globalTopic2", "processor2", 1);
         assertThat(topology.describe(), equalTo((TopologyDescription) 
expectedDescription));
     }
 
@@ -677,7 +677,8 @@ public void shouldDescribeMultipleGlobalStoreTopology() {
     private void addGlobalStoreToTopologyAndExpectedDescription(final String 
globalStoreName,
                                                                 final String 
sourceName,
                                                                 final String 
globalTopicName,
-                                                                final String 
processorName) {
+                                                                final String 
processorName,
+                                                                final int id) {
         final KeyValueStoreBuilder globalStoreBuilder = 
EasyMock.createNiceMock(KeyValueStoreBuilder.class);
         
EasyMock.expect(globalStoreBuilder.name()).andReturn(globalStoreName).anyTimes();
         EasyMock.replay(globalStoreBuilder);
@@ -695,7 +696,8 @@ private void 
addGlobalStoreToTopologyAndExpectedDescription(final String globalS
             sourceName,
             processorName,
             globalStoreName,
-            globalTopicName);
+            globalTopicName,
+            id);
 
         expectedDescription.addGlobalStore(expectedGlobalStore);
     }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Exclude node groups belonging to global stores in 
> InternalTopologyBuilder#makeNodeGroups
> ----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6328
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6328
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.0.0
>            Reporter: Guozhang Wang
>            Assignee: Richard Yu
>            Priority: Major
>              Labels: newbie
>         Attachments: kafka-6328.diff
>
>
> Today when we group processor nodes into groups (i.e. sub-topologies), we 
> assign the sub-topology id for global tables' dummy groups as well. As a 
> result, the subtopology ids (and hence task ids) are not consecutive anymore. 
> This is quite confusing for users trouble shooting and debugging; in 
> addition, the node group for global stores are not useful as well: we simply 
> exclude it in all the caller functions of makeNodeGroups.
> It would be better to simply exclude the global store's node groups in this 
> function so that the subtopology ids and task ids are consecutive.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to