This is an automated email from the ASF dual-hosted git repository.

jin pushed a commit to branch olap-algo
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git

commit a11acdffc79fba5a723932b2b6af7facbd8a4354
Author: Jermy Li <[email protected]>
AuthorDate: Tue May 12 20:59:35 2020 +0800

    support parallel: Louvain,LPA,Rings,K-Core,Fusiform (#15)
    
    * optimize louvain by multi threads
    * implement louvain threads
    * fix race condition
    * implement merge community by multi threads
    * remove debug info
    * fix genId race condition
    * compatible with serial and parallel computing
    * support parallel lpa
    * support parallel: Louvain,LPA,Rings,K-Core,Fusiform
    
    Change-Id: I2425d1da58581ea7a61dce72a88355ae3d2dd610
---
 .../hugegraph/job/algorithm/AbstractAlgorithm.java |  53 +++-
 .../baidu/hugegraph/job/algorithm/Consumers.java   | 161 +++++++++++
 .../job/algorithm/CountEdgeAlgorithm.java          |   5 +-
 .../job/algorithm/CountVertexAlgorithm.java        |   5 +-
 .../cent/BetweenessCentralityAlgorithm.java        |  21 +-
 .../cent/ClosenessCentralityAlgorithm.java         |  21 +-
 .../algorithm/cent/DegreeCentralityAlgorithm.java  |   9 +-
 .../cent/EigenvectorCentralityAlgorithm.java       |  21 +-
 .../algorithm/comm/ClusterCoeffcientAlgorithm.java |   7 +-
 .../job/algorithm/comm/KCoreAlgorithm.java         |  55 ++--
 .../job/algorithm/comm/LouvainAlgorithm.java       |   9 +-
 .../job/algorithm/comm/LouvainTraverser.java       | 311 +++++++++++++--------
 .../hugegraph/job/algorithm/comm/LpaAlgorithm.java |  50 ++--
 .../job/algorithm/comm/TriangleCountAlgorithm.java |   7 +-
 .../job/algorithm/path/RingsDetectAlgorithm.java   |  50 ++--
 .../similarity/FusiformSimilarityAlgorithm.java    |  59 ++--
 16 files changed, 585 insertions(+), 259 deletions(-)

diff --git 
a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java
 
b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java
index 969bda1d8..c36a70405 100644
--- 
a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java
+++ 
b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/AbstractAlgorithm.java
@@ -26,6 +26,8 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
 
 import org.apache.commons.lang3.StringEscapeUtils;
 import org.apache.commons.lang3.mutable.MutableLong;
@@ -87,6 +89,7 @@ public abstract class AbstractAlgorithm implements Algorithm {
     public static final String KEY_CAPACITY = "capacity";
     public static final String KEY_LIMIT = "limit";
     public static final String KEY_ALPHA = "alpha";
+    public static final String KEY_WORKERS = "workers";
 
     public static final long DEFAULT_CAPACITY = 10000000L;
     public static final long DEFAULT_LIMIT = 100L;
@@ -213,6 +216,15 @@ public abstract class AbstractAlgorithm implements 
Algorithm {
         return parameterString(parameters, KEY_SOURCE_CLABEL);
     }
 
+    protected static int workers(Map<String, Object> parameters) {
+        if (!parameters.containsKey(KEY_WORKERS)) {
+            return -1;
+        }
+        int workers = parameterInt(parameters, KEY_WORKERS);
+        HugeTraverser.checkNonNegativeOrNoLimit(workers, KEY_WORKERS);
+        return workers;
+    }
+
     public static Object parameter(Map<String, Object> parameters, String key) 
{
         Object value = parameters.get(key);
         E.checkArgument(value != null,
@@ -280,20 +292,59 @@ public abstract class AbstractAlgorithm implements 
Algorithm {
         }
     }
 
-    public static class AlgoTraverser extends HugeTraverser {
+    public static class AlgoTraverser extends HugeTraverser
+                                      implements AutoCloseable {
 
         private final Job<Object> job;
+        protected final ExecutorService executor;
         protected long progress;
 
         public AlgoTraverser(Job<Object> job) {
             super(job.graph());
             this.job = job;
+            this.executor = null;
+        }
+
+        protected AlgoTraverser(Job<Object> job, String name, int workers) {
+            super(job.graph());
+            this.job = job;
+            String prefix = name + "-" + job.task().id();
+            this.executor = Consumers.newThreadPool(prefix, workers);
         }
 
         public void updateProgress(long progress) {
             this.job.updateProgress((int) progress);
         }
 
+        @Override
+        public void close() {
+            if (this.executor != null) {
+                this.executor.shutdown();
+            }
+        }
+
+        protected long traverse(String sourceLabel, String sourceCLabel,
+                                Consumer<Vertex> consumer) {
+            Iterator<Vertex> vertices = this.vertices(sourceLabel, sourceCLabel,
+                                                      Query.NO_LIMIT);
+            // Hand each vertex to the consumer (pooled if executor is set)
+            Consumers<Vertex> consumers = new Consumers<>(this.executor,
+                                                          consumer);
+            consumers.start();
+            // Count every vertex provided, reporting progress as we go
+            long total = 0L;
+            while (vertices.hasNext()) {
+                this.updateProgress(++this.progress);
+                total++;
+                Vertex v = vertices.next();
+                consumers.provide(v);
+            }
+            // Signal end of input and wait for workers (if any) to finish
+            consumers.await();
+
+            return total;
+        }
+
         protected Iterator<Vertex> vertices() {
             return this.vertices(Query.NO_LIMIT);
         }
diff --git 
a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java 
b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java
new file mode 100644
index 000000000..795e0d712
--- /dev/null
+++ 
b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/Consumers.java
@@ -0,0 +1,161 @@
+/*
+ * Copyright 2017 HugeGraph Authors
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+package com.baidu.hugegraph.job.algorithm;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import org.slf4j.Logger;
+
+import com.baidu.hugegraph.util.ExecutorUtil;
+import com.baidu.hugegraph.util.Log;
+
+public class Consumers<V> {
+
+    public static final int CPUS = Runtime.getRuntime().availableProcessors();
+    public static final int THREADS = 4 + CPUS / 4;
+    public static final int QUEUE_WORKER_SIZE = 1000;
+
+    private static final Logger LOG = Log.logger(Consumers.class);
+
+    private final ExecutorService executor;
+    private final Consumer<V> consumer;
+    private final Runnable done;
+
+    private final int workers;
+    private final int queueSize;
+    private final CountDownLatch latch;
+    private final BlockingQueue<V> queue;
+
+    private volatile boolean ending = false;
+
+    public Consumers(ExecutorService executor, Consumer<V> consumer) {
+        this(executor, consumer, null);
+    }
+
+    public Consumers(ExecutorService executor,
+                     Consumer<V> consumer, Runnable done) {
+        this.executor = executor;
+        this.consumer = consumer;
+        this.done = done;
+
+        int workers = THREADS;
+        if (this.executor instanceof ThreadPoolExecutor) {
+            workers = ((ThreadPoolExecutor) this.executor).getCorePoolSize();
+        }
+        this.workers = workers;
+        this.queueSize = QUEUE_WORKER_SIZE * workers;
+        this.latch = new CountDownLatch(workers);
+        this.queue = new ArrayBlockingQueue<>(this.queueSize);
+    }
+
+    public void start() {
+        if (this.executor == null) {
+            return;
+        }
+        LOG.info("Starting {} workers with queue size {}...",
+                 this.workers, this.queueSize);
+        for (int i = 0; i < this.workers; i++) {
+            this.executor.submit(() -> {
+                try {
+                    this.run();
+                    if (this.done != null) {
+                        this.done.run();
+                    }
+                } catch (Throwable e) {
+                    LOG.error("Error when running task", e);
+                } finally {
+                    this.latch.countDown();
+                }
+            });
+        }
+    }
+
+    private void run() {
+        LOG.debug("Start to work...");
+        while (!this.ending) {
+            this.consume();
+        }
+        assert this.ending;
+        while (this.consume());
+
+        LOG.debug("Worker finished");
+    }
+
+    private boolean consume() {
+        V elem;
+        try {
+            elem = this.queue.poll(1, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            // ignore
+            return true;
+        }
+        if (elem == null) {
+            return false;
+        }
+        // do job
+        this.consumer.accept(elem);
+        return true;
+    }
+
+    public void provide(V v) {
+        if (this.executor == null) {
+            // no thread pool: run the consumer in the caller's thread
+            this.consumer.accept(v);
+        } else {
+            try {
+                this.queue.put(v);
+            } catch (InterruptedException e) {
+                LOG.warn("Interrupted", e);
+            }
+        }
+    }
+
+    public void await() {
+        this.ending = true;
+        if (this.executor != null) {
+            try {
+                this.latch.await();
+            } catch (InterruptedException e) {
+                LOG.warn("Interrupted", e);
+            }
+        }
+    }
+
+    public static ExecutorService newThreadPool(String prefix, int workers) {
+        if (workers == 0) {
+            return null;
+        } else {
+            if (workers < 0) {
+                assert workers == -1;
+                workers = Consumers.THREADS;
+            } else if (workers > Consumers.CPUS * 2) {
+                workers = Consumers.CPUS * 2;
+            }
+            String name = prefix + "-worker-%d";
+            return ExecutorUtil.newFixedThreadPool(workers, name);
+        }
+    }
+}
diff --git 
a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/CountEdgeAlgorithm.java
 
b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/CountEdgeAlgorithm.java
index 9fb122348..670f54471 100644
--- 
a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/CountEdgeAlgorithm.java
+++ 
b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/CountEdgeAlgorithm.java
@@ -43,8 +43,9 @@ public class CountEdgeAlgorithm extends AbstractAlgorithm {
 
     @Override
     public Object call(Job<Object> job, Map<String, Object> parameters) {
-        Traverser traverser = new Traverser(job);
-        return traverser.count();
+        try (Traverser traverser = new Traverser(job)) {
+            return traverser.count();
+        }
     }
 
     private static class Traverser extends AlgoTraverser {
diff --git 
a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/CountVertexAlgorithm.java
 
b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/CountVertexAlgorithm.java
index 582e0bb69..68a59a363 100644
--- 
a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/CountVertexAlgorithm.java
+++ 
b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/CountVertexAlgorithm.java
@@ -43,8 +43,9 @@ public class CountVertexAlgorithm extends AbstractAlgorithm {
 
     @Override
     public Object call(Job<Object> job, Map<String, Object> parameters) {
-        Traverser traverser = new Traverser(job);
-        return traverser.count();
+        try (Traverser traverser = new Traverser(job)) {
+            return traverser.count();
+        }
     }
 
     private static class Traverser extends AlgoTraverser {
diff --git 
a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/BetweenessCentralityAlgorithm.java
 
b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/BetweenessCentralityAlgorithm.java
index 12e3acba0..4f3415a15 100644
--- 
a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/BetweenessCentralityAlgorithm.java
+++ 
b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/BetweenessCentralityAlgorithm.java
@@ -42,16 +42,17 @@ public class BetweenessCentralityAlgorithm extends 
AbstractCentAlgorithm {
 
     @Override
     public Object call(Job<Object> job, Map<String, Object> parameters) {
-        Traverser traverser = new Traverser(job);
-        return traverser.betweenessCentrality(direction(parameters),
-                                              edgeLabel(parameters),
-                                              depth(parameters),
-                                              degree(parameters),
-                                              sample(parameters),
-                                              sourceLabel(parameters),
-                                              sourceSample(parameters),
-                                              sourceCLabel(parameters),
-                                              top(parameters));
+        try (Traverser traverser = new Traverser(job)) {
+            return traverser.betweenessCentrality(direction(parameters),
+                                                  edgeLabel(parameters),
+                                                  depth(parameters),
+                                                  degree(parameters),
+                                                  sample(parameters),
+                                                  sourceLabel(parameters),
+                                                  sourceSample(parameters),
+                                                  sourceCLabel(parameters),
+                                                  top(parameters));
+        }
     }
 
     private static class Traverser extends AbstractCentAlgorithm.Traverser {
diff --git 
a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/ClosenessCentralityAlgorithm.java
 
b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/ClosenessCentralityAlgorithm.java
index cb64bd8bc..6719eee1e 100644
--- 
a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/ClosenessCentralityAlgorithm.java
+++ 
b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/ClosenessCentralityAlgorithm.java
@@ -51,16 +51,17 @@ public class ClosenessCentralityAlgorithm extends 
AbstractCentAlgorithm {
 
     @Override
     public Object call(Job<Object> job, Map<String, Object> parameters) {
-        Traverser traverser = new Traverser(job);
-        return traverser.closenessCentrality(direction(parameters),
-                                             edgeLabel(parameters),
-                                             depth(parameters),
-                                             degree(parameters),
-                                             sample(parameters),
-                                             sourceLabel(parameters),
-                                             sourceSample(parameters),
-                                             sourceCLabel(parameters),
-                                             top(parameters));
+        try (Traverser traverser = new Traverser(job)) {
+            return traverser.closenessCentrality(direction(parameters),
+                                                 edgeLabel(parameters),
+                                                 depth(parameters),
+                                                 degree(parameters),
+                                                 sample(parameters),
+                                                 sourceLabel(parameters),
+                                                 sourceSample(parameters),
+                                                 sourceCLabel(parameters),
+                                                 top(parameters));
+        }
     }
 
     private static class Traverser extends AbstractCentAlgorithm.Traverser {
diff --git 
a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/DegreeCentralityAlgorithm.java
 
b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/DegreeCentralityAlgorithm.java
index a19c09822..f29a6301d 100644
--- 
a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/DegreeCentralityAlgorithm.java
+++ 
b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/DegreeCentralityAlgorithm.java
@@ -47,10 +47,11 @@ public class DegreeCentralityAlgorithm extends 
AbstractCentAlgorithm {
 
     @Override
     public Object call(Job<Object> job, Map<String, Object> parameters) {
-        Traverser traverser = new Traverser(job);
-        return traverser.degreeCentrality(direction(parameters),
-                                          edgeLabel(parameters),
-                                          top(parameters));
+        try (Traverser traverser = new Traverser(job)) {
+            return traverser.degreeCentrality(direction(parameters),
+                                              edgeLabel(parameters),
+                                              top(parameters));
+        }
     }
 
     private static class Traverser extends AlgoTraverser {
diff --git 
a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/EigenvectorCentralityAlgorithm.java
 
b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/EigenvectorCentralityAlgorithm.java
index ce47417c4..39cec64cd 100644
--- 
a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/EigenvectorCentralityAlgorithm.java
+++ 
b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/cent/EigenvectorCentralityAlgorithm.java
@@ -44,16 +44,17 @@ public class EigenvectorCentralityAlgorithm extends 
AbstractCentAlgorithm {
 
     @Override
     public Object call(Job<Object> job, Map<String, Object> parameters) {
-        Traverser traverser = new Traverser(job);
-        return traverser.eigenvectorCentrality(direction(parameters),
-                                               edgeLabel(parameters),
-                                               depth(parameters),
-                                               degree(parameters),
-                                               sample(parameters),
-                                               sourceLabel(parameters),
-                                               sourceSample(parameters),
-                                               sourceCLabel(parameters),
-                                               top(parameters));
+        try (Traverser traverser = new Traverser(job)) {
+            return traverser.eigenvectorCentrality(direction(parameters),
+                                                   edgeLabel(parameters),
+                                                   depth(parameters),
+                                                   degree(parameters),
+                                                   sample(parameters),
+                                                   sourceLabel(parameters),
+                                                   sourceSample(parameters),
+                                                   sourceCLabel(parameters),
+                                                   top(parameters));
+        }
     }
 
     private static class Traverser extends AbstractCentAlgorithm.Traverser {
diff --git 
a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/ClusterCoeffcientAlgorithm.java
 
b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/ClusterCoeffcientAlgorithm.java
index cc893fc1f..52f0b07a7 100644
--- 
a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/ClusterCoeffcientAlgorithm.java
+++ 
b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/ClusterCoeffcientAlgorithm.java
@@ -40,9 +40,10 @@ public class ClusterCoeffcientAlgorithm extends 
AbstractCommAlgorithm {
 
     @Override
     public Object call(Job<Object> job, Map<String, Object> parameters) {
-        Traverser traverser = new Traverser(job);
-        return traverser.clusterCoeffcient(direction(parameters),
-                                           degree(parameters));
+        try (Traverser traverser = new Traverser(job)) {
+            return traverser.clusterCoeffcient(direction(parameters),
+                                               degree(parameters));
+        }
     }
 
     private static class Traverser extends TriangleCountAlgorithm.Traverser {
diff --git 
a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/KCoreAlgorithm.java
 
b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/KCoreAlgorithm.java
index 4cc6a88ba..6a721258a 100644
--- 
a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/KCoreAlgorithm.java
+++ 
b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/KCoreAlgorithm.java
@@ -33,7 +33,6 @@ import 
org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 
 import com.baidu.hugegraph.HugeGraph;
 import com.baidu.hugegraph.backend.id.Id;
-import com.baidu.hugegraph.backend.query.Query;
 import com.baidu.hugegraph.job.Job;
 import com.baidu.hugegraph.schema.EdgeLabel;
 import com.baidu.hugegraph.traversal.algorithm.FusiformSimilarityTraverser;
@@ -65,19 +64,22 @@ public class KCoreAlgorithm extends AbstractCommAlgorithm {
         sourceCLabel(parameters);
         direction(parameters);
         edgeLabel(parameters);
+        workers(parameters);
     }
 
     @Override
     public Object call(Job<Object> job, Map<String, Object> parameters) {
-        Traverser traverser = new Traverser(job);
-        return traverser.kcore(sourceLabel(parameters),
-                               sourceCLabel(parameters),
-                               direction(parameters),
-                               edgeLabel(parameters),
-                               k(parameters),
-                               alpha(parameters),
-                               degree(parameters),
-                               merged(parameters));
+        int workers = workers(parameters);
+        try (Traverser traverser = new Traverser(job, workers)) {
+            return traverser.kcore(sourceLabel(parameters),
+                                   sourceCLabel(parameters),
+                                   direction(parameters),
+                                   edgeLabel(parameters),
+                                   k(parameters),
+                                   alpha(parameters),
+                                   degree(parameters),
+                                   merged(parameters));
+        }
     }
 
     protected static int k(Map<String, Object> parameters) {
@@ -98,16 +100,14 @@ public class KCoreAlgorithm extends AbstractCommAlgorithm {
 
     public static class Traverser extends AlgoTraverser {
 
-        public Traverser(Job<Object> job) {
-            super(job);
+        public Traverser(Job<Object> job, int workers) {
+            super(job, "kcore", workers);
         }
 
         public Object kcore(String sourceLabel, String sourceCLabel,
                             Directions dir, String label, int k, double alpha,
                             long degree, boolean merged) {
             HugeGraph graph = this.graph();
-            Iterator<Vertex> vertices = this.vertices(sourceLabel, 
sourceCLabel,
-                                                      Query.NO_LIMIT);
             EdgeLabel edgeLabel = label == null ? null : 
graph.edgeLabel(label);
 
             KcoreTraverser traverser = new KcoreTraverser(graph);
@@ -115,27 +115,34 @@ public class KCoreAlgorithm extends AbstractCommAlgorithm 
{
             kcoresJson.startObject();
             kcoresJson.appendKey("kcores");
             kcoresJson.startList();
-            Set<Set<Id>> kcoreSet = new HashSet<>();
-            while(vertices.hasNext()) {
-                this.updateProgress(++this.progress);
-                Vertex vertex = vertices.next();
-                Set<Id> kcore = traverser.kcore(IteratorUtils.of(vertex),
+
+            Set<Set<Id>> kcores = new HashSet<>();
+
+            this.traverse(sourceLabel, sourceCLabel, v -> {
+                Set<Id> kcore = traverser.kcore(IteratorUtils.of(v),
                                                 dir, edgeLabel, k, alpha,
                                                 degree);
                 if (kcore.isEmpty()) {
-                    continue;
+                    return;
                 }
                 if (merged) {
-                    mergeKcores(kcoreSet, kcore);
+                    synchronized (kcores) {
+                        mergeKcores(kcores, kcore);
+                    }
                 } else {
-                    kcoresJson.appendRaw(JsonUtil.toJson(kcore));
+                    String json = JsonUtil.toJson(kcore);
+                    synchronized (kcoresJson) {
+                        kcoresJson.appendRaw(json);
+                    }
                 }
-            }
+            });
+
             if (merged) {
-                for (Set<Id> kcore : kcoreSet) {
+                for (Set<Id> kcore : kcores) {
                     kcoresJson.appendRaw(JsonUtil.toJson(kcore));
                 }
             }
+
             kcoresJson.endList();
             kcoresJson.endObject();
 
diff --git 
a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainAlgorithm.java
 
b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainAlgorithm.java
index c0c05f9a2..446ab2686 100644
--- 
a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainAlgorithm.java
+++ 
b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainAlgorithm.java
@@ -42,6 +42,7 @@ public class LouvainAlgorithm extends AbstractCommAlgorithm {
         showModularity(parameters);
         showCommunity(parameters);
         clearPass(parameters);
+        workers(parameters);
     }
 
     @Override
@@ -49,13 +50,15 @@ public class LouvainAlgorithm extends AbstractCommAlgorithm 
{
         String label = sourceLabel(parameters);
         String clabel = sourceCLabel(parameters);
         long degree = degree(parameters);
+        int workers = workers(parameters);
 
-        LouvainTraverser traverser = new LouvainTraverser(job, degree,
-                                                          label, clabel);
         Long clearPass = clearPass(parameters);
         Long modPass = showModularity(parameters);
         String showComm = showCommunity(parameters);
-        try {
+
+        try (LouvainTraverser traverser = new LouvainTraverser(
+                                          job, workers, degree,
+                                          label, clabel)) {
             if (clearPass != null) {
                 return traverser.clearPass(clearPass.intValue());
             } else if (modPass != null) {
diff --git 
a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java
 
b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java
index a63a1259d..e55152b10 100644
--- 
a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java
+++ 
b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LouvainTraverser.java
@@ -29,8 +29,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
+import java.util.Objects;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.commons.lang.mutable.MutableFloat;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversal;
@@ -47,6 +51,7 @@ import com.baidu.hugegraph.iterator.ListIterator;
 import com.baidu.hugegraph.job.Job;
 import com.baidu.hugegraph.job.algorithm.AbstractAlgorithm;
 import com.baidu.hugegraph.job.algorithm.AbstractAlgorithm.AlgoTraverser;
+import com.baidu.hugegraph.job.algorithm.Consumers;
 import com.baidu.hugegraph.schema.SchemaLabel;
 import com.baidu.hugegraph.schema.SchemaManager;
 import com.baidu.hugegraph.schema.VertexLabel;
@@ -78,9 +83,9 @@ public class LouvainTraverser extends AlgoTraverser {
     private long m;
     private String passLabel;
 
-    public LouvainTraverser(Job<Object> job,  long degree,
+    public LouvainTraverser(Job<Object> job, int workers, long degree,
                             String sourceLabel, String sourceCLabel) {
-        super(job);
+        super(job, "louvain", workers);
         this.g = this.graph().traversal();
         this.sourceLabel = sourceLabel;
         this.sourceCLabel = sourceCLabel;
@@ -197,13 +202,13 @@ public class LouvainTraverser extends AlgoTraverser {
 
     private void insertNewCommunity(int pass, Id cid, float cweight,
                                     int kin, List<String> members,
-                                    Map<Id, MutableInt> cedges) {
+                                    Map<Id, MutableFloat> cedges) {
         // create backend vertex if it's the first time
         Id vid = this.cache.genId(pass, cid);
         Vertex node = this.newCommunityNode(vid, cweight, kin, members);
         commitIfNeeded();
         // update backend vertex edges
-        for (Map.Entry<Id, MutableInt> e : cedges.entrySet()) {
+        for (Map.Entry<Id, MutableFloat> e : cedges.entrySet()) {
             float weight = e.getValue().floatValue();
             vid = this.cache.genId(pass, e.getKey());
             Vertex targetV = this.makeCommunityNode(vid);
@@ -280,7 +285,7 @@ public class LouvainTraverser extends AlgoTraverser {
         return 1f;
     }
 
-    private Id cidOfVertex(Vertex v, List<Edge> nbs) {
+    private Community communityOfVertex(Vertex v, List<Edge> nbs) {
         Id vid = (Id) v.id();
         Community c = this.cache.vertex2Community(vid);
         // ensure source vertex exist in cache
@@ -288,7 +293,7 @@ public class LouvainTraverser extends AlgoTraverser {
             c = this.wrapCommunity(v, nbs);
             assert c != null;
         }
-        return c != null ? c.cid : vid;
+        return c;
     }
 
     // 1: wrap original vertex as community node
@@ -305,7 +310,7 @@ public class LouvainTraverser extends AlgoTraverser {
 
         comm = new Community(vid);
         comm.add(this, v, nbs);
-        this.cache.vertex2Community(vid, comm);
+        comm = this.cache.vertex2CommunityIfAbsent(vid, comm);
         return comm;
     }
 
@@ -331,31 +336,93 @@ public class LouvainTraverser extends AlgoTraverser {
         return comms.values();
     }
 
-    private void moveCommunity(Vertex v, List<Edge> nbs, Community newC) {
+    private void doMoveCommunity(Vertex v, List<Edge> nbs, Community newC) {
         Id vid = (Id) v.id();
 
-        // remove v from old community
-        Community oldC = this.cache.vertex2Community(vid);
+        // update community of v (return the origin one)
+        Community oldC = this.cache.vertex2Community(vid, newC);
+
+        // remove v from old community. should synchronized (vid)?
         if (oldC != null) {
             oldC.remove(this, v, nbs);
         }
 
         // add v to new community
         newC.add(this, v, nbs);
-        LOG.debug("Move {} to comm: {}", v, newC);
-
-        // update community of v
-        this.cache.vertex2Community(vid, newC);
+        LOG.debug("Move {} to community: {}", v, newC);
+    }
+
+    private boolean moveCommunity(Vertex v, int pass) {
+        // move vertex to neighbor community if needed
+        List<Edge> nbs = neighbors((Id) v.id());
+        Community c = communityOfVertex(v, nbs);
+        double ki = kinOfVertex(v) + weightOfVertex(v, nbs);
+        // update community of v if △Q changed
+        double maxDeltaQ = 0d;
+        Community bestComm = null;
+        // list all neighbor communities of v
+        for (Pair<Community, MutableInt> nbc : nbCommunities(pass, nbs)) {
+            // △Q = (Ki_in - Ki * Etot / m) / 2m
+            Community otherC = nbc.getLeft();
+            if (otherC.size() >= MAX_COMM_SIZE) {
+                LOG.info("Skip community {} for {} due to its size >= {}",
+                         otherC.cid, v, MAX_COMM_SIZE);
+                continue;
+            }
+            // weight between c and otherC
+            double kiin = nbc.getRight().floatValue();
+            // weight of otherC
+            double tot = otherC.kin() + otherC.kout();
+            if (c.equals(otherC)) {
+                assert c == otherC;
+                if (tot < ki) {
+                    /*
+                     * expect tot >= ki, but multi-threads may
+                     * cause tot < ki due to concurrent update otherC
+                     */
+                    LOG.warn("Changing vertex: {}(ki={}, kiin={}, pass={}), otherC: {}",
+                             v, ki, kiin, pass, otherC);
+                }
+                tot -= ki;
+                // assert tot >= 0d : otherC + ", tot=" + tot + ", ki=" + ki;
+                // expect tot >= 0, but may be something wrong?
+                if (tot < 0d) {
+                    tot = 0d;
+                }
+            }
+            double deltaQ = kiin - ki * tot / this.m;
+            if (deltaQ > maxDeltaQ) {
+                // TODO: cache otherC for neighbors the same community
+                maxDeltaQ = deltaQ;
+                bestComm = otherC;
+            }
+        }
+        if (maxDeltaQ > 0d && !c.equals(bestComm)) {
+            // move v to the community of maxQ neighbor
+            doMoveCommunity(v, nbs, bestComm);
+            return true;
+        }
+        return false;
     }
 
     private double moveCommunities(int pass) {
+        LOG.info("Detect community for pass {}", pass);
         Iterator<Vertex> vertices = this.sourceVertices(pass);
 
         // shuffle
         //r = r.order().by(shuffle);
 
         long total = 0L;
-        long moved = 0L;
+        AtomicLong moved = new AtomicLong(0L);
+
+        Consumers<Vertex> consumers = new Consumers<>(this.executor, v -> {
+            // called by multi-threads
+            if (this.moveCommunity(v, pass)) {
+                moved.incrementAndGet();
+            }
+        });
+        consumers.start();
+
         while (vertices.hasNext()) {
             this.updateProgress(++this.progress);
             Vertex v = vertices.next();
@@ -364,106 +431,93 @@ public class LouvainTraverser extends AlgoTraverser {
                 continue;
             }
             total++;
-            List<Edge> nbs = neighbors((Id) v.id());
-            Id cid = cidOfVertex(v, nbs);
-            double ki = kinOfVertex(v) + weightOfVertex(v, nbs);
-            // update community of v if △Q changed
-            double maxDeltaQ = 0d;
-            Community bestComm = null;
-            // list all neighbor communities of v
-            for (Pair<Community, MutableInt> nbc : nbCommunities(pass, nbs)) {
-                // △Q = (Ki_in - Ki * Etot / m) / 2m
-                Community otherC = nbc.getLeft();
-                if (otherC.size() >= MAX_COMM_SIZE) {
-                    LOG.info("Skip community {} for {} due to its size >= {}",
-                             otherC.cid, v, MAX_COMM_SIZE);
-                    continue;
-                }
-                // weight between c and otherC
-                double kiin = nbc.getRight().floatValue();
-                // weight of otherC
-                double tot = otherC.kin() + otherC.kout();
-                if (cid.equals(otherC.cid)) {
-                    tot -= ki;
-                    assert tot >= 0d;
-                    // expect tot >= 0, but may be something wrong?
-                    if (tot < 0d) {
-                        tot = 0d;
-                    }
-                }
-                double deltaQ = kiin - ki * tot / this.m;
-                if (deltaQ > maxDeltaQ) {
-                    // TODO: cache otherC for neighbors the same community
-                    maxDeltaQ = deltaQ;
-                    bestComm = otherC;
-                }
-            }
-            if (maxDeltaQ > 0d && !cid.equals(bestComm.cid)) {
-                moved++;
-                // move v to the community of maxQ neighbor
-                moveCommunity(v, nbs, bestComm);
-            }
+            consumers.provide(v);
         }
 
-        // maybe always shocking when set degree limit
-        return total == 0L ? 0d : (double) moved / total;
+        consumers.await();
+
+        // maybe always shocking when set degree limited
+        return total == 0L ? 0d : moved.doubleValue() / total;
     }
 
     private void mergeCommunities(int pass) {
+        LOG.info("Merge community for pass {}", pass);
         // merge each community as a vertex
         Collection<Pair<Community, Set<Id>>> comms = this.cache.communities();
-        assert this.allMembersExist(comms, pass -1);
+        assert this.allMembersExist(comms,  pass - 1);
         this.cache.resetVertexWeight();
+
+        Consumers<Pair<Community, Set<Id>>> consumers = new Consumers<>(
+                                                        this.executor, pair -> {
+            // called by multi-threads
+            this.mergeCommunity(pass, pair.getLeft(), pair.getRight());
+        }, () -> {
+            // commit when finished
+            this.graph().tx().commit();
+        });
+        consumers.start();
+
         for (Pair<Community, Set<Id>> pair : comms) {
-            Community c = pair.getKey();
+            Community c = pair.getLeft();
             if (c.empty()) {
                 continue;
             }
-            // update kin and edges between communities
-            int kin = c.kin();
-            Set<Id> vertices = pair.getRight();
-            assert !vertices.isEmpty();
-            assert vertices.size() == c.size();
-            List<String> members = new ArrayList<>(vertices.size());
-            Map<Id, MutableInt> cedges = new HashMap<>(vertices.size());
-            for (Id v : vertices) {
-                this.updateProgress(++this.progress);
-                members.add(v.toString());
-                // collect edges between this community and other communities
-                List<Edge> neighbors = neighbors(v);
-                for (Edge edge : neighbors) {
-                    Vertex otherV = ((HugeEdge) edge).otherVertex();
-                    if (vertices.contains(otherV.id())) {
-                        // inner edges of this community, will be calc twice
-                        // due to both e-in and e-out are in vertices,
-                        kin += weightOfEdge(edge);
-                        continue;
-                    }
-                    assert this.cache.vertex2Community(otherV.id()) != null;
-                    Id otherCid = cidOfVertex(otherV, null);
-                    if (otherCid.compareTo(c.cid) < 0) {
-                        // skip if it should be collected by otherC
-                        continue;
-                    }
-                    if (!cedges.containsKey(otherCid)) {
-                        cedges.put(otherCid, new MutableInt(0));
-                    }
-                    // update edge weight
-                    cedges.get(otherCid).add(weightOfEdge(edge));
-                }
-            }
-            // insert new community vertex and edges into storage
-            this.insertNewCommunity(pass, c.cid, c.weight(), kin, members, cedges);
+            this.progress += pair.getRight().size();
+            this.updateProgress(this.progress);
+            //this.mergeCommunity(pass, pair.getLeft(), pair.getRight());
+            consumers.provide(pair);
         }
+        consumers.await();
+
         this.graph().tx().commit();
+        assert this.allMembersExist(pass);
+
         // reset communities
         this.cache.reset();
     }
 
+    private void mergeCommunity(int pass, Community c, Set<Id> cvertices) {
+        // update kin and edges between communities
+        int kin = c.kin();
+        int membersSize = cvertices.size();
+        assert !cvertices.isEmpty();
+        assert membersSize == c.size();
+        List<String> members = new ArrayList<>(membersSize);
+        Map<Id, MutableFloat> cedges = new HashMap<>(membersSize);
+        for (Id v : cvertices) {
+            members.add(v.toString());
+            // collect edges between this community and other communities
+            List<Edge> neighbors = neighbors(v);
+            for (Edge edge : neighbors) {
+                Vertex otherV = ((HugeEdge) edge).otherVertex();
+                if (cvertices.contains(otherV.id())) {
+                    // inner edges of this community, will be calc twice
+                    // due to both e-in and e-out are in vertices,
+                    kin += weightOfEdge(edge);
+                    continue;
+                }
+                assert this.cache.vertex2Community(otherV.id()) != null;
+                Id otherCid = communityOfVertex(otherV, null).cid;
+                if (otherCid.compareTo(c.cid) < 0) {
+                    // skip if it should be collected by otherC
+                    continue;
+                }
+                if (!cedges.containsKey(otherCid)) {
+                    cedges.putIfAbsent(otherCid, new MutableFloat(0f));
+                }
+                // update edge weight
+                cedges.get(otherCid).add(weightOfEdge(edge));
+            }
+        }
+
+        // insert new community vertex and edges into storage
+        this.insertNewCommunity(pass, c.cid, c.weight(), kin, members, cedges);
+    }
+
     private boolean allMembersExist(Collection<Pair<Community, Set<Id>>> comms,
-                                    int pass) {
-        String lastLabel = labelOfPassN(pass);
-        GraphTraversal<Vertex, Object> t = pass < 0 ? this.g.V().id() :
+                                    int lastPass) {
+        String lastLabel = labelOfPassN(lastPass);
+        GraphTraversal<Vertex, Object> t = lastPass < 0 ? this.g.V().id() :
                                            this.g.V().hasLabel(lastLabel).id();
         Set<Object> all = this.execute(t, t::toSet);
         for (Pair<Community, Set<Id>> comm : comms) {
@@ -475,6 +529,24 @@ public class LouvainTraverser extends AlgoTraverser {
         return all.isEmpty();
     }
 
+    private boolean allMembersExist(int pass) {
+        String label = labelOfPassN(pass);
+        int lastPass = pass - 1;
+        Number expected;
+        if (lastPass < 0) {
+            expected = tryNext(this.g.V().count()).longValue() -
+                       tryNext(this.g.V().hasLabel(label).count()).longValue();
+        } else {
+            expected = tryNext(this.g.V().hasLabel(labelOfPassN(lastPass))
+                                         .values(C_WEIGHT).sum());
+        }
+        Number actual = tryNext(this.g.V().hasLabel(label)
+                                          .values(C_WEIGHT).sum());
+        boolean allExist = actual.floatValue() == expected.floatValue();
+        assert allExist : actual + "!=" + expected;
+        return allExist;
+    }
+
     public Object louvain(int maxTimes, int stableTimes, double precision) {
         assert maxTimes > 0;
         assert precision > 0d;
@@ -678,28 +750,39 @@ public class LouvainTraverser extends AlgoTraverser {
             return this.weight;
         }
 
-        public void add(LouvainTraverser t, Vertex v, List<Edge> nbs) {
+        public synchronized void add(LouvainTraverser t,
+                                     Vertex v, List<Edge> nbs) {
             this.size++;
             this.weight += t.cweightOfVertex(v);
             this.kin += t.kinOfVertex(v);
             this.kout += t.weightOfVertex(v, nbs);
         }
 
-        public void remove(LouvainTraverser t, Vertex v, List<Edge> nbs) {
+        public synchronized void remove(LouvainTraverser t,
+                                        Vertex v, List<Edge> nbs) {
             this.size--;
             this.weight -= t.cweightOfVertex(v);
             this.kin -= t.kinOfVertex(v);
             this.kout -= t.weightOfVertex(v, nbs);
         }
 
-        public int kin() {
+        public synchronized int kin() {
             return this.kin;
         }
 
-        public float kout() {
+        public synchronized float kout() {
             return this.kout;
         }
 
+        @Override
+        public boolean equals(Object object) {
+            if (!(object instanceof Community)) {
+                return false;
+            }
+            Community other = (Community) object;
+            return Objects.equals(this.cid, other.cid);
+        }
+
         @Override
         public String toString() {
             return String.format("[%s](size=%s weight=%s kin=%s kout=%s)",
@@ -715,9 +798,9 @@ public class LouvainTraverser extends AlgoTraverser {
         private final Map<Id, Integer> genIds;
 
         public Cache() {
-            this.vertexWeightCache = new HashMap<>();
-            this.vertex2Community = new HashMap<>();
-            this.genIds = new HashMap<>();
+            this.vertexWeightCache = new ConcurrentHashMap<>();
+            this.vertex2Community = new ConcurrentHashMap<>();
+            this.genIds = new ConcurrentHashMap<>();
         }
 
         public Community vertex2Community(Object id) {
@@ -725,8 +808,16 @@ public class LouvainTraverser extends AlgoTraverser {
             return this.vertex2Community.get(id);
         }
 
-        public void vertex2Community(Id id, Community c) {
-            this.vertex2Community.put(id, c);
+        public Community vertex2Community(Id id, Community c) {
+            return this.vertex2Community.put(id, c);
+        }
+
+        public Community vertex2CommunityIfAbsent(Id id, Community c) {
+            Community old = this.vertex2Community.putIfAbsent(id, c);
+            if (old != null) {
+                c = old;
+            }
+            return c;
         }
 
         public Float vertexWeight(Id id) {
@@ -748,11 +839,13 @@ public class LouvainTraverser extends AlgoTraverser {
         }
 
         public Id genId(int pass, Id cid) {
-            if (!this.genIds.containsKey(cid)) {
-                this.genIds.put(cid, this.genIds.size() + 1);
+            synchronized (this.genIds) {
+                if (!this.genIds.containsKey(cid)) {
+                    this.genIds.putIfAbsent(cid, this.genIds.size() + 1);
+                }
+                String id = pass + "~" + this.genIds.get(cid);
+                return IdGenerator.of(id);
             }
-            String id = pass + "~" + this.genIds.get(cid);
-            return IdGenerator.of(id);
         }
 
         @SuppressWarnings("unused")
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LpaAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LpaAlgorithm.java
index abcdb938c..e98ed8480 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LpaAlgorithm.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/LpaAlgorithm.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.tinkerpop.gremlin.process.traversal.Scope;
@@ -54,14 +55,15 @@ public class LpaAlgorithm extends AbstractCommAlgorithm {
         direction(parameters);
         degree(parameters);
         showCommunity(parameters);
+        workers(parameters);
     }
 
     @Override
     public Object call(Job<Object> job, Map<String, Object> parameters) {
-        Traverser traverser = new Traverser(job);
+        int workers = workers(parameters);
         String showComm = showCommunity(parameters);
 
-        try {
+        try (Traverser traverser = new Traverser(job, workers)) {
             if (showComm != null) {
                 return traverser.showCommunity(showComm);
             } else {
@@ -84,8 +86,8 @@ public class LpaAlgorithm extends AbstractCommAlgorithm {
 
         private final Random R = new Random();
 
-        public Traverser(Job<Object> job) {
-            super(job);
+        public Traverser(Job<Object> job, int workers) {
+            super(job, "lpa", workers);
         }
 
         public Object lpa(String sourceLabel, String edgeLabel,
@@ -113,7 +115,7 @@ public class LpaAlgorithm extends AbstractCommAlgorithm {
                 }
             }
 
-            long communities = this.graph().traversal().V().limit(10000L)
+            long communities = this.graph().traversal().V().limit(100000L)
                                    .groupCount().by(C_LABEL)
                                    .count(Scope.local).next();
             return ImmutableMap.of("iteration_times", times,
@@ -143,26 +145,30 @@ public class LpaAlgorithm extends AbstractCommAlgorithm {
             // shuffle: r.order().by(shuffle)
             // r = this.graph().traversal().V().sample((int) LIMIT);
 
-            // all vertices
-            Iterator<Vertex> vertices = this.vertices(sourceLabel, LIMIT);
-
-            long total = 0L;
-            long changed = 0L;
-            while (vertices.hasNext()) {
-                this.updateProgress(++this.progress);
-                total++;
-                Vertex v = vertices.next();
-                String label = this.voteCommunityOfVertex(v, edgeLabel,
-                                                          dir, degree);
-                // update label if it's absent or changed
-                if (!labelPresent(v) || !label.equals(this.labelOfVertex(v))) {
-                    changed++;
-                    this.updateLabelOfVertex(v, label);
+            // detect all vertices
+            AtomicLong changed = new AtomicLong(0L);
+            long total = this.traverse(sourceLabel, null, v -> {
+                // called by multi-threads
+                if (this.voteCommunityAndUpdate(v, edgeLabel, dir, degree)) {
+                    changed.incrementAndGet();
                 }
-            }
+            });
+
             this.graph().tx().commit();
 
-            return total == 0L ? 0d : (double) changed / total;
+            return total == 0L ? 0d : changed.doubleValue() / total;
+        }
+
+        private boolean voteCommunityAndUpdate(Vertex vertex, String edgeLabel,
+                                               Directions dir, long degree) {
+            String label = this.voteCommunityOfVertex(vertex, edgeLabel,
+                                                      dir, degree);
+            // update label if it's absent or changed
+            if (!labelPresent(vertex) || !label.equals(labelOfVertex(vertex))) {
+                this.updateLabelOfVertex(vertex, label);
+                return true;
+            }
+            return false;
         }
 
         private String voteCommunityOfVertex(Vertex vertex, String edgeLabel,
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/TriangleCountAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/TriangleCountAlgorithm.java
index c47d19f65..34a1a5658 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/TriangleCountAlgorithm.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/comm/TriangleCountAlgorithm.java
@@ -48,9 +48,10 @@ public class TriangleCountAlgorithm extends AbstractCommAlgorithm {
 
     @Override
     public Object call(Job<Object> job, Map<String, Object> parameters) {
-        Traverser traverser = new Traverser(job);
-        return traverser.triangleCount(direction(parameters),
-                                       degree(parameters));
+        try (Traverser traverser = new Traverser(job)) {
+            return traverser.triangleCount(direction(parameters),
+                                           degree(parameters));
+        }
     }
 
     protected static class Traverser extends AlgoTraverser {
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/path/RingsDetectAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/path/RingsDetectAlgorithm.java
index b3ce1ec99..855b7c817 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/path/RingsDetectAlgorithm.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/path/RingsDetectAlgorithm.java
@@ -19,17 +19,11 @@
 
 package com.baidu.hugegraph.job.algorithm.path;
 
-import java.util.Iterator;
 import java.util.Map;
 
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-
-import com.baidu.hugegraph.HugeGraph;
 import com.baidu.hugegraph.backend.id.Id;
-import com.baidu.hugegraph.backend.query.Query;
 import com.baidu.hugegraph.job.Job;
 import com.baidu.hugegraph.job.algorithm.AbstractAlgorithm;
-import com.baidu.hugegraph.structure.HugeVertex;
 import com.baidu.hugegraph.traversal.algorithm.SubGraphTraverser;
 import com.baidu.hugegraph.type.define.Directions;
 import com.baidu.hugegraph.util.JsonUtil;
@@ -56,41 +50,42 @@ public class RingsDetectAlgorithm extends AbstractAlgorithm {
         sourceCLabel(parameters);
         direction(parameters);
         edgeLabel(parameters);
+        workers(parameters);
     }
 
     @Override
     public Object call(Job<Object> job, Map<String, Object> parameters) {
-        Traverser traverser = new Traverser(job);
-        return traverser.rings(sourceLabel(parameters),
-                               sourceCLabel(parameters),
-                               direction(parameters),
-                               edgeLabel(parameters),
-                               depth(parameters),
-                               degree(parameters),
-                               capacity(parameters),
-                               limit(parameters));
+        int workers = workers(parameters);
+        try (Traverser traverser = new Traverser(job, workers)) {
+            return traverser.rings(sourceLabel(parameters),
+                                   sourceCLabel(parameters),
+                                   direction(parameters),
+                                   edgeLabel(parameters),
+                                   depth(parameters),
+                                   degree(parameters),
+                                   capacity(parameters),
+                                   limit(parameters));
+        }
     }
 
     public static class Traverser extends AlgoTraverser {
 
-        public Traverser(Job<Object> job) {
-            super(job);
+        public Traverser(Job<Object> job, int workers) {
+            super(job, "ring", workers);
         }
 
         public Object rings(String sourceLabel, String sourceCLabel,
                             Directions dir, String label, int depth,
                             long degree, long capacity, long limit) {
-            HugeGraph graph = this.graph();
-            Iterator<Vertex> vertices = this.vertices(sourceLabel, sourceCLabel,
-                                                      Query.NO_LIMIT);
             JsonMap ringsJson = new JsonMap();
             ringsJson.startObject();
             ringsJson.appendKey("rings");
             ringsJson.startList();
-            SubGraphTraverser traverser = new SubGraphTraverser(graph);
-            while(vertices.hasNext()) {
-                this.updateProgress(++this.progress);
-                Id source = ((HugeVertex) vertices.next()).id();
+
+            SubGraphTraverser traverser = new SubGraphTraverser(this.graph());
+
+            this.traverse(sourceLabel, sourceCLabel, v -> {
+                Id source = (Id) v.id();
                 PathSet rings = traverser.rings(source, dir, label, depth,
                                                 true, degree, capacity, limit);
                 for (Path ring : rings) {
@@ -101,10 +96,13 @@ public class RingsDetectAlgorithm extends AbstractAlgorithm {
                         }
                     }
                     if (source.equals(min)) {
-                        ringsJson.appendRaw(JsonUtil.toJson(ring.vertices()));
+                        String ringJson = JsonUtil.toJson(ring.vertices());
+                        synchronized (ringsJson) {
+                            ringsJson.appendRaw(ringJson);
+                        }
                     }
                 }
-            }
+            });
             ringsJson.endList();
             ringsJson.endObject();
 
diff --git a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/similarity/FusiformSimilarityAlgorithm.java b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/similarity/FusiformSimilarityAlgorithm.java
index 26ee4e25e..463526c5d 100644
--- a/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/similarity/FusiformSimilarityAlgorithm.java
+++ b/hugegraph-core/src/main/java/com/baidu/hugegraph/job/algorithm/similarity/FusiformSimilarityAlgorithm.java
@@ -19,14 +19,11 @@
 
 package com.baidu.hugegraph.job.algorithm.similarity;
 
-import java.util.Iterator;
 import java.util.Map;
 
-import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 
 import com.baidu.hugegraph.HugeGraph;
-import com.baidu.hugegraph.backend.query.Query;
 import com.baidu.hugegraph.job.Job;
 import com.baidu.hugegraph.job.algorithm.AbstractAlgorithm;
 import com.baidu.hugegraph.schema.EdgeLabel;
@@ -72,24 +69,27 @@ public class FusiformSimilarityAlgorithm extends AbstractAlgorithm {
         sourceCLabel(parameters);
         direction(parameters);
         edgeLabel(parameters);
+        workers(parameters);
     }
 
     @Override
     public Object call(Job<Object> job, Map<String, Object> parameters) {
-        Traverser traverser = new Traverser(job);
-        return traverser.fusiformSimilars(sourceLabel(parameters),
-                                          sourceCLabel(parameters),
-                                          direction(parameters),
-                                          edgeLabel(parameters),
-                                          minNeighbors(parameters),
-                                          alpha(parameters),
-                                          minSimilars(parameters),
-                                          top(parameters),
-                                          groupProperty(parameters),
-                                          minGroups(parameters),
-                                          degree(parameters),
-                                          capacity(parameters),
-                                          limit(parameters));
+        int workers = workers(parameters);
+        try (Traverser traverser = new Traverser(job, workers)) {
+            return traverser.fusiformSimilars(sourceLabel(parameters),
+                                              sourceCLabel(parameters),
+                                              direction(parameters),
+                                              edgeLabel(parameters),
+                                              minNeighbors(parameters),
+                                              alpha(parameters),
+                                              minSimilars(parameters),
+                                              top(parameters),
+                                              groupProperty(parameters),
+                                              minGroups(parameters),
+                                              degree(parameters),
+                                              capacity(parameters),
+                                              limit(parameters));
+        }
     }
 
     protected static int minNeighbors(Map<String, Object> parameters) {
@@ -128,8 +128,8 @@ public class FusiformSimilarityAlgorithm extends AbstractAlgorithm {
 
     protected static class Traverser extends AlgoTraverser {
 
-        public Traverser(Job<Object> job) {
-            super(job);
+        public Traverser(Job<Object> job, int workers) {
+            super(job, "fusiform", workers);
         }
 
         public Object fusiformSimilars(String sourceLabel, String sourceCLabel,
@@ -138,31 +138,30 @@ public class FusiformSimilarityAlgorithm extends AbstractAlgorithm {
                                        int minSimilars, long topSimilars,
                                        String groupProperty, int minGroups,
                                        long degree, long capacity, long limit) {
-            Iterator<Vertex> vertices = this.vertices(sourceLabel, sourceCLabel,
-                                                      Query.NO_LIMIT);
             HugeGraph graph = this.graph();
             EdgeLabel edgeLabel = label == null ? null : graph.edgeLabel(label);
 
-            FusiformSimilarityTraverser traverser = new
-                                        FusiformSimilarityTraverser(graph);
+            FusiformSimilarityTraverser traverser =
+                                        new FusiformSimilarityTraverser(graph);
             JsonMap similarsJson = new JsonMap();
             similarsJson.startObject();
-            while(vertices.hasNext()) {
-                this.updateProgress(++this.progress);
-                Vertex vertex = vertices.next();
+
+            this.traverse(sourceLabel, sourceCLabel, v -> {
                 SimilarsMap similars = traverser.fusiformSimilarity(
-                                       IteratorUtils.of(vertex), direction,
+                                       IteratorUtils.of(v), direction,
                                        edgeLabel, minNeighbors, alpha,
                                        minSimilars, (int) topSimilars,
                                        groupProperty, minGroups, degree,
                                        capacity, limit, true);
                 if (similars.isEmpty()) {
-                    continue;
+                    return;
                 }
                 String result = JsonUtil.toJson(similars.toMap());
                 result = result.substring(1, result.length() - 1);
-                similarsJson.appendRaw(result);
-            }
+                synchronized (similarsJson) {
+                    similarsJson.appendRaw(result);
+                }
+            });
             similarsJson.endObject();
 
             return similarsJson.asJson();

Reply via email to