This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bbd75b80cef KAFKA-15022: Detect negative cycle from one source (#14696)
bbd75b80cef is described below

commit bbd75b80cef2e9ec5413486228bc531725e2fc79
Author: Hao Li <[email protected]>
AuthorDate: Tue Nov 28 00:29:00 2023 -0800

    KAFKA-15022: Detect negative cycle from one source (#14696)
    
    Introduce a dummy node connected to every other node and run Bellman-Ford 
from the dummy node once instead of from every node in the graph.
    
    Reviewers: Qichao Chu (@ex172000), Matthias J. Sax <[email protected]>
---
 gradle/spotbugs-exclude.xml                        |  6 ++
 .../processor/internals/assignment/Graph.java      | 66 ++++++++++++++++------
 .../processor/internals/assignment/GraphTest.java  |  7 +++
 3 files changed, 61 insertions(+), 18 deletions(-)

diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index 9d2971a3ceb..e76d9dfc6ee 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -544,6 +544,12 @@ For a detailed description of spotbugs bug categories, see 
https://spotbugs.read
         <Bug pattern="NP_BOOLEAN_RETURN_NULL"/>
     </Match>
 
+    <Match>
+        <!-- False positive - null is allowed in SortedMap -->
+        <Class 
name="org.apache.kafka.streams.processor.internals.assignment.Graph"/>
+        <Bug pattern="NP_NONNULL_PARAM_VIOLATION"/>
+    </Match>
+
     <Match>
         <!-- Suppress a warning about ignoring return value because this is 
intentional. -->
         <Class 
name="org.apache.kafka.common.config.AbstractConfig$ResolvingMap"/>
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/Graph.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/Graph.java
index 8bb31f44def..16635438e8c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/Graph.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/Graph.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -103,8 +104,10 @@ public class Graph<V extends Comparable<V>> {
         }
     }
 
-    private final SortedMap<V, SortedMap<V, Edge>> adjList = new TreeMap<>();
-    private final SortedSet<V> nodes = new TreeSet<>();
+    // Allow null as special internal node
+    private final SortedMap<V, SortedMap<V, Edge>> adjList = new 
TreeMap<>(Comparator.nullsFirst(Comparator.naturalOrder()));
+    // Allow null as special internal node
+    private final SortedSet<V> nodes = new 
TreeSet<>(Comparator.nullsFirst(Comparator.naturalOrder()));
     private final boolean isResidualGraph;
     private V sourceNode, sinkNode;
 
@@ -117,6 +120,8 @@ public class Graph<V extends Comparable<V>> {
     }
 
     public void addEdge(final V u, final V v, final int capacity, final int 
cost, final int flow) {
+        Objects.requireNonNull(u);
+        Objects.requireNonNull(v);
         addEdge(u, new Edge(v, capacity, cost, capacity - flow, flow));
     }
 
@@ -200,13 +205,40 @@ public class Graph<V extends Comparable<V>> {
         return residualGraph;
     }
 
+    private void addDummySourceNode(final Graph<V> residualGraph) {
+        if (!residualGraph.isResidualGraph) {
+            throw new IllegalStateException("Graph should be residual graph to 
add dummy source node");
+        }
+
+        // Add a dummy null node connected to every existing node with 
residual flow 1 and cost 0
+        // Then try to find a negative cycle using the dummy node as the source 
node. Since there's no
+        // path from original nodes to null node, negative cycles must be 
within original nodes.
+        final TreeMap<V, Edge> destMap = new TreeMap<>();
+        for (final V node : residualGraph.nodes) {
+            final Edge edge = new Edge(node, 1, 0, 1, 0);
+            destMap.put(node, edge);
+        }
+        residualGraph.adjList.put(null, destMap);
+        residualGraph.nodes.add(null);
+    }
+
+    private void removeDummySourceNode(final Graph<V> residualGraph) {
+        if (!residualGraph.isResidualGraph) {
+            throw new IllegalStateException("Graph should be residual graph to 
remove dummy source node");
+        }
+        residualGraph.adjList.remove(null);
+        residualGraph.nodes.remove(null);
+    }
+
     /**
      * Solve min cost flow with cycle canceling algorithm.
      */
     public void solveMinCostFlow() {
         validateMinCostGraph();
         final Graph<V> residualGraph = residualGraph();
+        addDummySourceNode(residualGraph);
         residualGraph.cancelNegativeCycles();
+        removeDummySourceNode(residualGraph);
 
         for (final Entry<V, SortedMap<V, Edge>> nodeEdges : 
adjList.entrySet()) {
             final V node = nodeEdges.getKey();
@@ -290,25 +322,23 @@ public class Graph<V extends Comparable<V>> {
         boolean cyclePossible = true;
         while (cyclePossible) {
             cyclePossible = false;
-            for (final V node : nodes) {
-                final Map<V, V> parentNodes = new HashMap<>();
-                final Map<V, Edge> parentEdges = new HashMap<>();
-                final V possibleNodeInCycle = detectNegativeCycles(node, 
parentNodes, parentEdges);
-
-                if (possibleNodeInCycle == null) {
-                    continue;
-                }
+            final Map<V, V> parentNodes = new HashMap<>();
+            final Map<V, Edge> parentEdges = new HashMap<>();
+            final V possibleNodeInCycle = detectNegativeCycles(null, 
parentNodes, parentEdges);
 
-                final Set<V> visited = new HashSet<>();
-                V nodeInCycle = possibleNodeInCycle;
-                while (!visited.contains(nodeInCycle)) {
-                    visited.add(nodeInCycle);
-                    nodeInCycle = parentNodes.get(nodeInCycle);
-                }
+            if (possibleNodeInCycle == null) {
+                continue;
+            }
 
-                cyclePossible = true;
-                cancelNegativeCycle(nodeInCycle, parentNodes, parentEdges);
+            final Set<V> visited = new HashSet<>();
+            V nodeInCycle = possibleNodeInCycle;
+            while (!visited.contains(nodeInCycle)) {
+                visited.add(nodeInCycle);
+                nodeInCycle = parentNodes.get(nodeInCycle);
             }
+
+            cyclePossible = true;
+            cancelNegativeCycle(nodeInCycle, parentNodes, parentEdges);
         }
     }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/GraphTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/GraphTest.java
index 7b5f2fb76ee..629c572dec0 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/GraphTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/GraphTest.java
@@ -219,6 +219,13 @@ public class GraphTest {
             exception.getMessage());
     }
 
+    @Test
+    public void testNullNode() {
+        final Graph<Integer> graph1 = new Graph<>();
+        assertThrows(NullPointerException.class, () -> graph1.addEdge(null, 1, 
1, 1, 1));
+        assertThrows(NullPointerException.class, () -> graph1.addEdge(1, null, 
1, 1, 1));
+    }
+
     @Test
     public void testJustSourceSink() {
         final Graph<Integer> graph1 = new Graph<>();

Reply via email to