This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new bbd75b80cef KAFKA-15022: Detect negative cycle from one source (#14696)
bbd75b80cef is described below
commit bbd75b80cef2e9ec5413486228bc531725e2fc79
Author: Hao Li <[email protected]>
AuthorDate: Tue Nov 28 00:29:00 2023 -0800
KAFKA-15022: Detect negative cycle from one source (#14696)
Introduce a dummy node connected to every other node and run Bellman-Ford
from the dummy node once instead of from every node in the graph.
Reviewers: Qichao Chu (@ex172000), Matthias J. Sax <[email protected]>
---
gradle/spotbugs-exclude.xml | 6 ++
.../processor/internals/assignment/Graph.java | 66 ++++++++++++++++------
.../processor/internals/assignment/GraphTest.java | 7 +++
3 files changed, 61 insertions(+), 18 deletions(-)
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index 9d2971a3ceb..e76d9dfc6ee 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -544,6 +544,12 @@ For a detailed description of spotbugs bug categories, see
https://spotbugs.read
<Bug pattern="NP_BOOLEAN_RETURN_NULL"/>
</Match>
+ <Match>
+ <!-- False positive - null is allowed in SortedMap -->
+ <Class
name="org.apache.kafka.streams.processor.internals.assignment.Graph"/>
+ <Bug pattern="NP_NONNULL_PARAM_VIOLATION"/>
+ </Match>
+
<Match>
<!-- Suppress a warning about ignoring return value because this is
intentional. -->
<Class
name="org.apache.kafka.common.config.AbstractConfig$ResolvingMap"/>
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/Graph.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/Graph.java
index 8bb31f44def..16635438e8c 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/Graph.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/Graph.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals.assignment;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -103,8 +104,10 @@ public class Graph<V extends Comparable<V>> {
}
}
- private final SortedMap<V, SortedMap<V, Edge>> adjList = new TreeMap<>();
- private final SortedSet<V> nodes = new TreeSet<>();
+ // Allow null as special internal node
+ private final SortedMap<V, SortedMap<V, Edge>> adjList = new
TreeMap<>(Comparator.nullsFirst(Comparator.naturalOrder()));
+ // Allow null as special internal node
+ private final SortedSet<V> nodes = new
TreeSet<>(Comparator.nullsFirst(Comparator.naturalOrder()));
private final boolean isResidualGraph;
private V sourceNode, sinkNode;
@@ -117,6 +120,8 @@ public class Graph<V extends Comparable<V>> {
}
public void addEdge(final V u, final V v, final int capacity, final int
cost, final int flow) {
+ Objects.requireNonNull(u);
+ Objects.requireNonNull(v);
addEdge(u, new Edge(v, capacity, cost, capacity - flow, flow));
}
@@ -200,13 +205,40 @@ public class Graph<V extends Comparable<V>> {
return residualGraph;
}
+ private void addDummySourceNode(final Graph<V> residualGraph) {
+ if (!residualGraph.isResidualGraph) {
+ throw new IllegalStateException("Graph should be residual graph to
add dummy source node");
+ }
+
+ // Add a dummy null node connected to every existing node with
residual flow 1 and cost 0
+ // Then try to find a negative cycle using the dummy node as the source
node. Since there's no
+ // path from original nodes to null node, negative cycles must be
within original nodes.
+ final TreeMap<V, Edge> destMap = new TreeMap<>();
+ for (final V node : residualGraph.nodes) {
+ final Edge edge = new Edge(node, 1, 0, 1, 0);
+ destMap.put(node, edge);
+ }
+ residualGraph.adjList.put(null, destMap);
+ residualGraph.nodes.add(null);
+ }
+
+ private void removeDummySourceNode(final Graph<V> residualGraph) {
+ if (!residualGraph.isResidualGraph) {
+ throw new IllegalStateException("Graph should be residual graph to
remove dummy source node");
+ }
+ residualGraph.adjList.remove(null);
+ residualGraph.nodes.remove(null);
+ }
+
/**
* Solve min cost flow with cycle canceling algorithm.
*/
public void solveMinCostFlow() {
validateMinCostGraph();
final Graph<V> residualGraph = residualGraph();
+ addDummySourceNode(residualGraph);
residualGraph.cancelNegativeCycles();
+ removeDummySourceNode(residualGraph);
for (final Entry<V, SortedMap<V, Edge>> nodeEdges :
adjList.entrySet()) {
final V node = nodeEdges.getKey();
@@ -290,25 +322,23 @@ public class Graph<V extends Comparable<V>> {
boolean cyclePossible = true;
while (cyclePossible) {
cyclePossible = false;
- for (final V node : nodes) {
- final Map<V, V> parentNodes = new HashMap<>();
- final Map<V, Edge> parentEdges = new HashMap<>();
- final V possibleNodeInCycle = detectNegativeCycles(node,
parentNodes, parentEdges);
-
- if (possibleNodeInCycle == null) {
- continue;
- }
+ final Map<V, V> parentNodes = new HashMap<>();
+ final Map<V, Edge> parentEdges = new HashMap<>();
+ final V possibleNodeInCycle = detectNegativeCycles(null,
parentNodes, parentEdges);
- final Set<V> visited = new HashSet<>();
- V nodeInCycle = possibleNodeInCycle;
- while (!visited.contains(nodeInCycle)) {
- visited.add(nodeInCycle);
- nodeInCycle = parentNodes.get(nodeInCycle);
- }
+ if (possibleNodeInCycle == null) {
+ continue;
+ }
- cyclePossible = true;
- cancelNegativeCycle(nodeInCycle, parentNodes, parentEdges);
+ final Set<V> visited = new HashSet<>();
+ V nodeInCycle = possibleNodeInCycle;
+ while (!visited.contains(nodeInCycle)) {
+ visited.add(nodeInCycle);
+ nodeInCycle = parentNodes.get(nodeInCycle);
}
+
+ cyclePossible = true;
+ cancelNegativeCycle(nodeInCycle, parentNodes, parentEdges);
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/GraphTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/GraphTest.java
index 7b5f2fb76ee..629c572dec0 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/GraphTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/GraphTest.java
@@ -219,6 +219,13 @@ public class GraphTest {
exception.getMessage());
}
+ @Test
+ public void testNullNode() {
+ final Graph<Integer> graph1 = new Graph<>();
+ assertThrows(NullPointerException.class, () -> graph1.addEdge(null, 1,
1, 1, 1));
+ assertThrows(NullPointerException.class, () -> graph1.addEdge(1, null,
1, 1, 1));
+ }
+
@Test
public void testJustSourceSink() {
final Graph<Integer> graph1 = new Graph<>();