This is an automated email from the ASF dual-hosted git repository.

jin pushed a commit to branch olap-algo
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git

commit 05ce6ac3e1dbe843759249e3d979e0a12dfc3a88
Author: Jermy Li <[email protected]>
AuthorDate: Wed Jun 3 22:15:33 2020 +0800

    Fix algorithm being unable to stop when a consumer thread throws an exception (#18)
    
    Change-Id: I546682b19fb5a84a65dc2a3bd77d62b386722bfa
---
 .../hugegraph/job/algorithm/AbstractAlgorithm.java | 24 +++++-----
 .../baidu/hugegraph/job/algorithm/Consumers.java   | 36 +++++++++++----
 .../job/algorithm/comm/LouvainTraverser.java       | 51 +++++++++++++---------
 3 files changed, 71 insertions(+), 40 deletions(-)

diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java
index d3311772a..327905ad3 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java
@@ -354,18 +354,20 @@ public abstract class AbstractAlgorithm implements Algorithm {
             Consumers<Vertex> consumers = new Consumers<>(this.executor,
                                                           consumer, done);
             consumers.start();
-
-            long total = 0L;
-            while (vertices.hasNext()) {
-                this.updateProgress(++this.progress);
-                total++;
-                Vertex v = vertices.next();
-                consumers.provide(v);
+            try {
+                long total = 0L;
+                while (vertices.hasNext()) {
+                    this.updateProgress(++this.progress);
+                    total++;
+                    Vertex v = vertices.next();
+                    consumers.provide(v);
+                }
+                return total;
+            } catch (Throwable e) {
+                throw Consumers.wrapException(e);
+            } finally {
+                consumers.await();
             }
-
-            consumers.await();
-
-            return total;
         }
 
         protected Iterator<Vertex> vertices() {
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java
index 526419c46..f5d01d980 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java
@@ -29,6 +29,7 @@ import java.util.function.Consumer;
 
 import org.slf4j.Logger;
 
+import com.baidu.hugegraph.HugeException;
 import com.baidu.hugegraph.util.ExecutorUtil;
 import com.baidu.hugegraph.util.Log;
 
@@ -50,6 +51,7 @@ public class Consumers<V> {
     private final BlockingQueue<V> queue;
 
     private volatile boolean ending = false;
+    private volatile Throwable exception = null;
 
     public Consumers(ExecutorService executor, Consumer<V> consumer) {
         this(executor, consumer, null);
@@ -72,6 +74,8 @@ public class Consumers<V> {
     }
 
     public void start() {
+        this.ending = false;
+        this.exception = null;
         if (this.executor == null) {
             return;
         }
@@ -81,11 +85,12 @@ public class Consumers<V> {
             this.executor.submit(() -> {
                 try {
                     this.run();
-                    if (this.done != null) {
-                        this.done.run();
-                    }
+                    this.done();
                 } catch (Throwable e) {
+                    // Only the first exception of one thread can be stored
+                    this.exception  = e;
                     LOG.error("Error when running task", e);
+                    this.done();
                 } finally {
                     this.latch.countDown();
                 }
@@ -120,10 +125,19 @@ public class Consumers<V> {
         return true;
     }
 
-    public void provide(V v) {
+    private void done() {
+        if (this.done != null) {
+            this.done.run();
+        }
+    }
+
+    public void provide(V v) throws Throwable {
         if (this.executor == null) {
+            assert this.exception == null;
             // do job directly if without thread pool
             this.consumer.accept(v);
+        } else if (this.exception != null) {
+            throw this.exception;
         } else {
             try {
                 this.queue.put(v);
@@ -137,14 +151,12 @@ public class Consumers<V> {
         this.ending = true;
         if (this.executor == null) {
             // call done() directly if without thread pool
-            if (this.done != null) {
-                this.done.run();
-            }
+            this.done();
         } else {
             try {
                 this.latch.await();
             } catch (InterruptedException e) {
-                LOG.warn("Interrupted", e);;
+                LOG.warn("Interrupted", e);
             }
         }
     }
@@ -163,4 +175,12 @@ public class Consumers<V> {
             return ExecutorUtil.newFixedThreadPool(workers, name);
         }
     }
+
+    public static RuntimeException wrapException(Throwable e) {
+        if (e instanceof RuntimeException) {
+            throw (RuntimeException) e;
+        }
+        throw new HugeException("Error when running task: %s",
+                                HugeException.rootCause(e).getMessage(), e);
+    }
 }
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java
index e55152b10..5d7548aa3 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java
@@ -421,21 +421,25 @@ public class LouvainTraverser extends AlgoTraverser {
                 moved.incrementAndGet();
             }
         });
-        consumers.start();
 
-        while (vertices.hasNext()) {
-            this.updateProgress(++this.progress);
-            Vertex v = vertices.next();
-            if (needSkipVertex(pass, v)) {
-                // skip the old intermediate data, or filter clabel
-                continue;
+        consumers.start();
+        try {
+            while (vertices.hasNext()) {
+                this.updateProgress(++this.progress);
+                Vertex v = vertices.next();
+                if (needSkipVertex(pass, v)) {
+                    // skip the old intermediate data, or filter clabel
+                    continue;
+                }
+                total++;
+                consumers.provide(v);
             }
-            total++;
-            consumers.provide(v);
+        } catch (Throwable e) {
+            throw Consumers.wrapException(e);
+        } finally {
+            consumers.await();
         }
 
-        consumers.await();
-
         // maybe always shocking when set degree limited
         return total == 0L ? 0d : moved.doubleValue() / total;
     }
@@ -455,19 +459,24 @@ public class LouvainTraverser extends AlgoTraverser {
             // commit when finished
             this.graph().tx().commit();
         });
-        consumers.start();
 
-        for (Pair<Community, Set<Id>> pair : comms) {
-            Community c = pair.getLeft();
-            if (c.empty()) {
-                continue;
+        consumers.start();
+        try {
+            for (Pair<Community, Set<Id>> pair : comms) {
+                Community c = pair.getLeft();
+                if (c.empty()) {
+                    continue;
+                }
+                this.progress += pair.getRight().size();
+                this.updateProgress(this.progress);
+                //this.mergeCommunity(pass, pair.getLeft(), pair.getRight());
+                consumers.provide(pair);
             }
-            this.progress += pair.getRight().size();
-            this.updateProgress(this.progress);
-            //this.mergeCommunity(pass, pair.getLeft(), pair.getRight());
-            consumers.provide(pair);
+        } catch (Throwable e) {
+            throw Consumers.wrapException(e);
+        } finally {
+            consumers.await();
         }
-        consumers.await();
 
         this.graph().tx().commit();
         assert this.allMembersExist(pass);

Reply via email to