[
https://issues.apache.org/jira/browse/KAFKA-6328?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16328079#comment-16328079
]
ASF GitHub Bot commented on KAFKA-6328:
---------------------------------------
guozhangwang closed pull request #4339: [KAFKA-6328] Exclude node groups
belonging to global stores in InternalTopologyBuilder#makeNodeGroups
URL: https://github.com/apache/kafka/pull/4339
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
index 01af8bf5295..131e8d31cbe 100644
--- a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
+++ b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
@@ -76,6 +76,8 @@
* @return the "global" processor node
*/
Processor processor();
+
+ int id();
}
/**
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 90d46aa949f..09799f7ee7e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -1286,13 +1286,6 @@ private boolean isGlobalSource(final String nodeName) {
public TopologyDescription describe() {
final TopologyDescription description = new TopologyDescription();
- describeSubtopologies(description);
- describeGlobalStores(description);
-
- return description;
- }
-
- private void describeSubtopologies(final TopologyDescription description) {
for (final Map.Entry<Integer, Set<String>> nodeGroup :
makeNodeGroups().entrySet()) {
final Set<String> allNodesOfGroups = nodeGroup.getValue();
@@ -1300,6 +1293,32 @@ private void describeSubtopologies(final
TopologyDescription description) {
if (!isNodeGroupOfGlobalStores) {
describeSubtopology(description, nodeGroup.getKey(),
allNodesOfGroups);
+ } else {
+ describeGlobalStore(description, allNodesOfGroups,
nodeGroup.getKey());
+ }
+ }
+
+ return description;
+ }
+
+ private void describeGlobalStore(final TopologyDescription description,
final Set<String> nodes, int id) {
+ final Iterator<String> it = nodes.iterator();
+ while (it.hasNext()) {
+ final String node = it.next();
+
+ if (isGlobalSource(node)) {
+ // we found a GlobalStore node group; those contain exactly
two node: {sourceNode,processorNode}
+ it.remove(); // remove sourceNode from group
+ final String processorNode = nodes.iterator().next(); // get
remaining processorNode
+
+ description.addGlobalStore(new GlobalStore(
+ node,
+ processorNode,
+ ((ProcessorNodeFactory)
nodeFactories.get(processorNode)).stateStoreNames.iterator().next(),
+ nodeToSourceTopics.get(node).get(0),
+ id
+ ));
+ break;
}
}
}
@@ -1367,43 +1386,26 @@ private void describeSubtopology(final
TopologyDescription description,
new HashSet<TopologyDescription.Node>(nodesByName.values())));
}
- private void describeGlobalStores(final TopologyDescription description) {
- for (final Map.Entry<Integer, Set<String>> nodeGroup :
makeNodeGroups().entrySet()) {
- final Set<String> nodes = nodeGroup.getValue();
-
- final Iterator<String> it = nodes.iterator();
- while (it.hasNext()) {
- final String node = it.next();
-
- if (isGlobalSource(node)) {
- // we found a GlobalStore node group; those contain
exactly two node: {sourceNode,processorNode}
- it.remove(); // remove sourceNode from group
- final String processorNode = nodes.iterator().next(); //
get remaining processorNode
-
- description.addGlobalStore(new GlobalStore(
- node,
- processorNode,
- ((ProcessorNodeFactory)
nodeFactories.get(processorNode)).stateStoreNames.iterator().next(),
- nodeToSourceTopics.get(node).get(0)
- ));
- break;
- }
- }
- }
- }
-
public final static class GlobalStore implements
TopologyDescription.GlobalStore {
private final Source source;
private final Processor processor;
+ private final int id;
public GlobalStore(final String sourceName,
final String processorName,
final String storeName,
- final String topicName) {
+ final String topicName,
+ final int id) {
source = new Source(sourceName, topicName);
processor = new Processor(processorName,
Collections.singleton(storeName));
source.successors.add(processor);
processor.predecessors.add(source);
+ this.id = id;
+ }
+
+ @Override
+ public int id() {
+ return id;
}
@Override
@@ -1418,7 +1420,7 @@ public GlobalStore(final String sourceName,
@Override
public String toString() {
- return "GlobalStore: " + source.name + " (topic: " + source.topics
+ ")\n --> "
+ return "GlobalStore: " + source.name + " Id: " + id + " (topic: "
+ source.topics + ")\n --> "
+ processor.name + " (store: " +
processor.stores.iterator().next() + ")\n";
}
@@ -1720,7 +1722,7 @@ public String toString() {
@Override
public int compare(final TopologyDescription.GlobalStore globalStore1,
final TopologyDescription.GlobalStore globalStore2)
{
- return
globalStore1.source().name().compareTo(globalStore2.source().name());
+ return globalStore1.id() - globalStore2.id();
}
}
@@ -1737,8 +1739,8 @@ public int compare(final TopologyDescription.Subtopology
subtopology1,
private final static SubtopologyComparator SUBTOPOLOGY_COMPARATOR = new
SubtopologyComparator();
public final static class TopologyDescription implements
org.apache.kafka.streams.TopologyDescription {
- private final Set<TopologyDescription.Subtopology> subtopologies = new
TreeSet<>(SUBTOPOLOGY_COMPARATOR);
- private final Set<TopologyDescription.GlobalStore> globalStores = new
TreeSet<>(GLOBALSTORE_COMPARATOR);
+ private final TreeSet<TopologyDescription.Subtopology> subtopologies =
new TreeSet<>(SUBTOPOLOGY_COMPARATOR);
+ private final TreeSet<TopologyDescription.GlobalStore> globalStores =
new TreeSet<>(GLOBALSTORE_COMPARATOR);
public void addSubtopology(final TopologyDescription.Subtopology
subtopology) {
subtopologies.add(subtopology);
@@ -1760,33 +1762,43 @@ public void addGlobalStore(final
TopologyDescription.GlobalStore globalStore) {
@Override
public String toString() {
- return subtopologiesAsString() + "\n" + globalStoresAsString();
- }
-
- private String subtopologiesAsString() {
final StringBuilder sb = new StringBuilder();
- sb.append("Sub-topologies:\n");
- if (subtopologies.isEmpty()) {
- sb.append(" none\n");
- } else {
- for (final TopologyDescription.Subtopology st : subtopologies)
{
- sb.append(" ");
- sb.append(st);
+ sb.append("Topologies:\n ");
+ final TopologyDescription.Subtopology[] sortedSubtopologies =
+ subtopologies.descendingSet().toArray(new
TopologyDescription.Subtopology[subtopologies.size()]);
+ final TopologyDescription.GlobalStore[] sortedGlobalStores =
+ globalStores.descendingSet().toArray(new
TopologyDescription.GlobalStore[globalStores.size()]);
+ int expectedId = 0;
+ int subtopologiesIndex = sortedSubtopologies.length - 1;
+ int globalStoresIndex = sortedGlobalStores.length - 1;
+ while (subtopologiesIndex != -1 && globalStoresIndex != -1) {
+ sb.append(" ");
+ final TopologyDescription.Subtopology subtopology =
+ sortedSubtopologies[subtopologiesIndex];
+ final TopologyDescription.GlobalStore globalStore =
+ sortedGlobalStores[globalStoresIndex];
+ if (subtopology.id() == expectedId) {
+ sb.append(subtopology);
+ subtopologiesIndex--;
+ } else {
+ sb.append(globalStore);
+ globalStoresIndex--;
}
+ expectedId++;
}
- return sb.toString();
- }
-
- private String globalStoresAsString() {
- final StringBuilder sb = new StringBuilder();
- sb.append("Global Stores:\n");
- if (globalStores.isEmpty()) {
- sb.append(" none\n");
- } else {
- for (final TopologyDescription.GlobalStore gs : globalStores) {
- sb.append(" ");
- sb.append(gs);
- }
+ while (subtopologiesIndex != -1) {
+ final TopologyDescription.Subtopology subtopology =
+ sortedSubtopologies[subtopologiesIndex];
+ sb.append(" ");
+ sb.append(subtopology);
+ subtopologiesIndex--;
+ }
+ while (globalStoresIndex != -1) {
+ final TopologyDescription.GlobalStore globalStore =
+ sortedGlobalStores[globalStoresIndex];
+ sb.append(" ");
+ sb.append(globalStore);
+ globalStoresIndex--;
}
return sb.toString();
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 3ba780323b7..0a45803aed5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -579,14 +579,14 @@ public void
processorsWithSharedStateShouldHaveSameSubtopology() {
@Test
public void shouldDescribeGlobalStoreTopology() {
- addGlobalStoreToTopologyAndExpectedDescription("globalStore",
"source", "globalTopic", "processor");
+ addGlobalStoreToTopologyAndExpectedDescription("globalStore",
"source", "globalTopic", "processor", 0);
assertThat(topology.describe(), equalTo((TopologyDescription)
expectedDescription));
}
@Test
public void shouldDescribeMultipleGlobalStoreTopology() {
- addGlobalStoreToTopologyAndExpectedDescription("globalStore1",
"source1", "globalTopic1", "processor1");
- addGlobalStoreToTopologyAndExpectedDescription("globalStore2",
"source2", "globalTopic2", "processor2");
+ addGlobalStoreToTopologyAndExpectedDescription("globalStore1",
"source1", "globalTopic1", "processor1", 0);
+ addGlobalStoreToTopologyAndExpectedDescription("globalStore2",
"source2", "globalTopic2", "processor2", 1);
assertThat(topology.describe(), equalTo((TopologyDescription)
expectedDescription));
}
@@ -677,7 +677,8 @@ public void shouldDescribeMultipleGlobalStoreTopology() {
private void addGlobalStoreToTopologyAndExpectedDescription(final String
globalStoreName,
final String
sourceName,
final String
globalTopicName,
- final String
processorName) {
+ final String
processorName,
+ final int id) {
final KeyValueStoreBuilder globalStoreBuilder =
EasyMock.createNiceMock(KeyValueStoreBuilder.class);
EasyMock.expect(globalStoreBuilder.name()).andReturn(globalStoreName).anyTimes();
EasyMock.replay(globalStoreBuilder);
@@ -695,7 +696,8 @@ private void
addGlobalStoreToTopologyAndExpectedDescription(final String globalS
sourceName,
processorName,
globalStoreName,
- globalTopicName);
+ globalTopicName,
+ id);
expectedDescription.addGlobalStore(expectedGlobalStore);
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Exclude node groups belonging to global stores in
> InternalTopologyBuilder#makeNodeGroups
> ----------------------------------------------------------------------------------------
>
> Key: KAFKA-6328
> URL: https://issues.apache.org/jira/browse/KAFKA-6328
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.0.0
> Reporter: Guozhang Wang
> Assignee: Richard Yu
> Priority: Major
> Labels: newbie
> Attachments: kafka-6328.diff
>
>
> Today when we group processor nodes into groups (i.e. sub-topologies), we
> assign a sub-topology id to the global tables' dummy groups as well. As a
> result, the subtopology ids (and hence task ids) are no longer consecutive.
> This is quite confusing for users when troubleshooting and debugging; in
> addition, the node groups for global stores are not useful either: we simply
> exclude them in all the caller functions of makeNodeGroups.
> It would be better to simply exclude the global store's node groups in this
> function so that the subtopology ids and task ids are consecutive.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)