This is an automated email from the ASF dual-hosted git repository.

ming pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph-computer.git


The following commit(s) were added to refs/heads/master by this push:
     new e0b484a6 Improve: Passing workerId to WorkerStat & Skip wait worker close if master executes failed (#292)
e0b484a6 is described below

commit e0b484a65e2244ec49ef8bbc14185acd99dbc436
Author: Cong Zhao <[email protected]>
AuthorDate: Wed Dec 6 18:03:31 2023 +0800

    Improve: Passing workerId to WorkerStat & Skip wait worker close if master executes failed (#292)
    
    * Improve code
    
    * Fix ci
---
 .../computer/core/compute/ComputeManager.java      |   8 +-
 .../computer/core/master/MasterService.java        | 181 +++++++++++----------
 .../computer/core/worker/WorkerService.java        |   2 +-
 .../assembly/travis/load-data-into-hugegraph.sh    |   2 +-
 .../computer/core/compute/ComputeManagerTest.java  |   2 +-
 5 files changed, 103 insertions(+), 92 deletions(-)

diff --git 
a/computer-core/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManager.java
 
b/computer-core/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManager.java
index 4898abe6..692073ed 100644
--- 
a/computer-core/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManager.java
+++ 
b/computer-core/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManager.java
@@ -46,6 +46,7 @@ public class ComputeManager {
     private static final Logger LOG = Log.logger(ComputeManager.class);
     private static final String PREFIX = "partition-compute-executor-%s";
 
+    private final int workerId;
     private final ComputerContext context;
     private final Managers managers;
 
@@ -54,7 +55,8 @@ public class ComputeManager {
     private final MessageSendManager sendManager;
     private final ExecutorService computeExecutor;
 
-    public ComputeManager(ComputerContext context, Managers managers) {
+    public ComputeManager(int workerId, ComputerContext context, Managers 
managers) {
+        this.workerId = workerId;
         this.context = context;
         this.managers = managers;
         this.partitions = new HashMap<>();
@@ -73,7 +75,7 @@ public class ComputeManager {
     }
 
     public WorkerStat input() {
-        WorkerStat workerStat = new WorkerStat();
+        WorkerStat workerStat = new WorkerStat(this.workerId);
         this.recvManager.waitReceivedAllMessages();
 
         Map<Integer, PeekableIterator<KvEntry>> vertices =
@@ -142,7 +144,7 @@ public class ComputeManager {
     public WorkerStat compute(WorkerContext context, int superstep) {
         this.sendManager.startSend(MessageType.MSG);
 
-        WorkerStat workerStat = new WorkerStat();
+        WorkerStat workerStat = new WorkerStat(this.workerId);
         Map<Integer, PartitionStat> stats = new ConcurrentHashMap<>();
 
         /*
diff --git 
a/computer-core/src/main/java/org/apache/hugegraph/computer/core/master/MasterService.java
 
b/computer-core/src/main/java/org/apache/hugegraph/computer/core/master/MasterService.java
index fd1fdfd3..06fb7564 100644
--- 
a/computer-core/src/main/java/org/apache/hugegraph/computer/core/master/MasterService.java
+++ 
b/computer-core/src/main/java/org/apache/hugegraph/computer/core/master/MasterService.java
@@ -61,6 +61,7 @@ public class MasterService implements Closeable {
     private final Managers managers;
 
     private volatile boolean inited;
+    private volatile boolean failed;
     private volatile boolean closed;
     private Config config;
     private volatile Bsp4Master bsp4Master;
@@ -153,7 +154,9 @@ public class MasterService implements Closeable {
 
         this.masterComputation.close(new DefaultMasterContext());
 
-        this.bsp4Master.waitWorkersCloseDone();
+        if (!failed) {
+            this.bsp4Master.waitWorkersCloseDone();
+        }
 
         this.managers.closeAll(this.config);
 
@@ -183,97 +186,103 @@ public class MasterService implements Closeable {
         this.checkInited();
 
         LOG.info("{} MasterService execute", this);
-        /*
-         * Step 1: Determines which superstep to start from, and resume this
-         * superstep.
-         */
-        int superstep = this.superstepToResume();
-        LOG.info("{} MasterService resume from superstep: {}",
-                 this, superstep);
+        try {
+            /*
+             * Step 1: Determines which superstep to start from, and resume 
this
+             * superstep.
+             */
+            int superstep = this.superstepToResume();
+            LOG.info("{} MasterService resume from superstep: {}",
+                     this, superstep);
 
-        /*
-         * TODO: Get input splits from HugeGraph if resume from
-         * Constants.INPUT_SUPERSTEP.
-         */
-        this.bsp4Master.masterResumeDone(superstep);
+            /*
+             * TODO: Get input splits from HugeGraph if resume from
+             * Constants.INPUT_SUPERSTEP.
+             */
+            this.bsp4Master.masterResumeDone(superstep);
 
-        /*
-         * Step 2: Input superstep for loading vertices and edges.
-         * This step may be skipped if resume from other superstep than
-         * Constants.INPUT_SUPERSTEP.
-         */
-        SuperstepStat superstepStat;
-        watcher.start();
-        if (superstep == Constants.INPUT_SUPERSTEP) {
-            superstepStat = this.inputstep();
-            superstep++;
-        } else {
-            // TODO: Get superstepStat from bsp service.
-            superstepStat = null;
-        }
-        watcher.stop();
-        LOG.info("{} MasterService input step cost: {}",
-                 this, TimeUtil.readableTime(watcher.getTime()));
-        E.checkState(superstep <= this.maxSuperStep,
-                     "The superstep {} can't be > maxSuperStep {}",
-                     superstep, this.maxSuperStep);
-
-        watcher.reset();
-        watcher.start();
-        // Step 3: Iteration computation of all supersteps.
-        for (; superstepStat.active(); superstep++) {
-            LOG.info("{} MasterService superstep {} started",
-                     this, superstep);
             /*
-             * Superstep iteration. The steps in each superstep are:
-             * 1) Master waits workers superstep prepared.
-             * 2) All managers call beforeSuperstep.
-             * 3) Master signals the workers that the master prepared
-             *    superstep.
-             * 4) Master waits the workers do vertex computation.
-             * 5) Master signal the workers that all workers have finished
-             *    vertex computation.
-             * 6) Master waits the workers end the superstep, and get
-             *    superstepStat.
-             * 7) Master compute whether to continue the next superstep
-             *    iteration.
-             * 8) All managers call afterSuperstep.
-             * 9) Master signals the workers with superstepStat, and workers
-             *    know whether to continue the next superstep iteration.
+             * Step 2: Input superstep for loading vertices and edges.
+             * This step may be skipped if resume from other superstep than
+             * Constants.INPUT_SUPERSTEP.
              */
-            this.bsp4Master.waitWorkersStepPrepareDone(superstep);
-            this.managers.beforeSuperstep(this.config, superstep);
-            this.bsp4Master.masterStepPrepareDone(superstep);
-
-            this.bsp4Master.waitWorkersStepComputeDone(superstep);
-            this.bsp4Master.masterStepComputeDone(superstep);
-            List<WorkerStat> workerStats =
-                             this.bsp4Master.waitWorkersStepDone(superstep);
-            superstepStat = SuperstepStat.from(workerStats);
-            SuperstepContext context = new SuperstepContext(superstep,
-                                                            superstepStat);
-            // Call master compute(), note the worker afterSuperstep() is done
-            boolean masterContinue = this.masterComputation.compute(context);
-            if (this.finishedIteration(masterContinue, context)) {
-                superstepStat.inactivate();
+            SuperstepStat superstepStat;
+            watcher.start();
+            if (superstep == Constants.INPUT_SUPERSTEP) {
+                superstepStat = this.inputstep();
+                superstep++;
+            } else {
+                // TODO: Get superstepStat from bsp service.
+                superstepStat = null;
             }
-            this.managers.afterSuperstep(this.config, superstep);
-            this.bsp4Master.masterStepDone(superstep, superstepStat);
-
-            LOG.info("{} MasterService superstep {} finished",
-                     this, superstep);
+            watcher.stop();
+            LOG.info("{} MasterService input step cost: {}",
+                     this, TimeUtil.readableTime(watcher.getTime()));
+            E.checkState(superstep <= this.maxSuperStep,
+                         "The superstep {} can't be > maxSuperStep {}",
+                         superstep, this.maxSuperStep);
+
+            watcher.reset();
+            watcher.start();
+            // Step 3: Iteration computation of all supersteps.
+            for (; superstepStat.active(); superstep++) {
+                LOG.info("{} MasterService superstep {} started",
+                         this, superstep);
+                /*
+                 * Superstep iteration. The steps in each superstep are:
+                 * 1) Master waits workers superstep prepared.
+                 * 2) All managers call beforeSuperstep.
+                 * 3) Master signals the workers that the master prepared
+                 *    superstep.
+                 * 4) Master waits the workers do vertex computation.
+                 * 5) Master signal the workers that all workers have finished
+                 *    vertex computation.
+                 * 6) Master waits the workers end the superstep, and get
+                 *    superstepStat.
+                 * 7) Master compute whether to continue the next superstep
+                 *    iteration.
+                 * 8) All managers call afterSuperstep.
+                 * 9) Master signals the workers with superstepStat, and 
workers
+                 *    know whether to continue the next superstep iteration.
+                 */
+                this.bsp4Master.waitWorkersStepPrepareDone(superstep);
+                this.managers.beforeSuperstep(this.config, superstep);
+                this.bsp4Master.masterStepPrepareDone(superstep);
+
+                this.bsp4Master.waitWorkersStepComputeDone(superstep);
+                this.bsp4Master.masterStepComputeDone(superstep);
+                List<WorkerStat> workerStats =
+                        this.bsp4Master.waitWorkersStepDone(superstep);
+                superstepStat = SuperstepStat.from(workerStats);
+                SuperstepContext context = new SuperstepContext(superstep,
+                                                                superstepStat);
+                // Call master compute(), note the worker afterSuperstep() is 
done
+                boolean masterContinue = 
this.masterComputation.compute(context);
+                if (this.finishedIteration(masterContinue, context)) {
+                    superstepStat.inactivate();
+                }
+                this.managers.afterSuperstep(this.config, superstep);
+                this.bsp4Master.masterStepDone(superstep, superstepStat);
+
+                LOG.info("{} MasterService superstep {} finished",
+                         this, superstep);
+            }
+            watcher.stop();
+            LOG.info("{} MasterService compute step cost: {}",
+                     this, TimeUtil.readableTime(watcher.getTime()));
+
+            watcher.reset();
+            watcher.start();
+            // Step 4: Output superstep for outputting results.
+            this.outputstep();
+            watcher.stop();
+            LOG.info("{} MasterService output step cost: {}",
+                     this, TimeUtil.readableTime(watcher.getTime()));
+        } catch (Throwable throwable) {
+            LOG.error("{} MasterService execute failed", this, throwable);
+            failed = true;
+            throw throwable;
         }
-        watcher.stop();
-        LOG.info("{} MasterService compute step cost: {}",
-                 this, TimeUtil.readableTime(watcher.getTime()));
-
-        watcher.reset();
-        watcher.start();
-        // Step 4: Output superstep for outputting results.
-        this.outputstep();
-        watcher.stop();
-        LOG.info("{} MasterService output step cost: {}",
-                 this, TimeUtil.readableTime(watcher.getTime()));
     }
 
     @Override
diff --git 
a/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java
 
b/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java
index 48af87c6..fc5b5dc5 100644
--- 
a/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java
+++ 
b/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java
@@ -137,7 +137,7 @@ public class WorkerService implements Closeable {
             dm.connect(worker.id(), worker.hostname(), worker.dataPort());
         }
 
-        this.computeManager = new ComputeManager(this.context, this.managers);
+        this.computeManager = new ComputeManager(this.workerInfo.id(), 
this.context, this.managers);
 
         this.managers.initedAll(this.config);
         LOG.info("{} WorkerService initialized", this);
diff --git a/computer-dist/src/assembly/travis/load-data-into-hugegraph.sh 
b/computer-dist/src/assembly/travis/load-data-into-hugegraph.sh
index b68bec7d..e00890b6 100755
--- a/computer-dist/src/assembly/travis/load-data-into-hugegraph.sh
+++ b/computer-dist/src/assembly/travis/load-data-into-hugegraph.sh
@@ -26,7 +26,7 @@ 
HUGEGRAPH_LOADER_GIT_URL="https://github.com/apache/hugegraph-toolchain.git";
 git clone --depth 10 ${HUGEGRAPH_LOADER_GIT_URL} hugegraph-toolchain
 
 cd hugegraph-toolchain
-mvn install -pl hugegraph-client,hugegraph-loader -am -DskipTests -ntp
+mvn install -P stage -pl hugegraph-client,hugegraph-loader -am -DskipTests -ntp
 
 cd hugegraph-loader
 tar -zxf target/apache-hugegraph-loader-*.tar.gz || exit 1
diff --git 
a/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManagerTest.java
 
b/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManagerTest.java
index 63f0c5b2..c8c1544c 100644
--- 
a/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManagerTest.java
+++ 
b/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManagerTest.java
@@ -114,7 +114,7 @@ public class ComputeManagerTest extends UnitTestBase {
         this.connectionId = new ConnectionId(new InetSocketAddress("localhost",
                                                                    8081),
                                              0);
-        this.computeManager = new ComputeManager(context(), this.managers);
+        this.computeManager = new ComputeManager(0, context(), this.managers);
     }
 
     @After

Reply via email to