This is an automated email from the ASF dual-hosted git repository.
ming pushed a commit to branch master
in repository
https://gitbox.apache.org/repos/asf/incubator-hugegraph-computer.git
The following commit(s) were added to refs/heads/master by this push:
new e0b484a6 Improve: Passing workerId to WorkerStat & Skip wait worker
close if master executes failed (#292)
e0b484a6 is described below
commit e0b484a65e2244ec49ef8bbc14185acd99dbc436
Author: Cong Zhao <[email protected]>
AuthorDate: Wed Dec 6 18:03:31 2023 +0800
Improve: Passing workerId to WorkerStat & Skip wait worker close if master
executes failed (#292)
* Improve code
* Fix ci
---
.../computer/core/compute/ComputeManager.java | 8 +-
.../computer/core/master/MasterService.java | 181 +++++++++++----------
.../computer/core/worker/WorkerService.java | 2 +-
.../assembly/travis/load-data-into-hugegraph.sh | 2 +-
.../computer/core/compute/ComputeManagerTest.java | 2 +-
5 files changed, 103 insertions(+), 92 deletions(-)
diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManager.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManager.java
index 4898abe6..692073ed 100644
--- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManager.java
+++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManager.java
@@ -46,6 +46,7 @@ public class ComputeManager {
private static final Logger LOG = Log.logger(ComputeManager.class);
private static final String PREFIX = "partition-compute-executor-%s";
+ private final int workerId;
private final ComputerContext context;
private final Managers managers;
@@ -54,7 +55,8 @@ public class ComputeManager {
private final MessageSendManager sendManager;
private final ExecutorService computeExecutor;
- public ComputeManager(ComputerContext context, Managers managers) {
+ public ComputeManager(int workerId, ComputerContext context, Managers managers) {
+ this.workerId = workerId;
this.context = context;
this.managers = managers;
this.partitions = new HashMap<>();
@@ -73,7 +75,7 @@ public class ComputeManager {
}
public WorkerStat input() {
- WorkerStat workerStat = new WorkerStat();
+ WorkerStat workerStat = new WorkerStat(this.workerId);
this.recvManager.waitReceivedAllMessages();
Map<Integer, PeekableIterator<KvEntry>> vertices =
@@ -142,7 +144,7 @@ public class ComputeManager {
public WorkerStat compute(WorkerContext context, int superstep) {
this.sendManager.startSend(MessageType.MSG);
- WorkerStat workerStat = new WorkerStat();
+ WorkerStat workerStat = new WorkerStat(this.workerId);
Map<Integer, PartitionStat> stats = new ConcurrentHashMap<>();
/*
diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/master/MasterService.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/master/MasterService.java
index fd1fdfd3..06fb7564 100644
--- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/master/MasterService.java
+++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/master/MasterService.java
@@ -61,6 +61,7 @@ public class MasterService implements Closeable {
private final Managers managers;
private volatile boolean inited;
+ private volatile boolean failed;
private volatile boolean closed;
private Config config;
private volatile Bsp4Master bsp4Master;
@@ -153,7 +154,9 @@ public class MasterService implements Closeable {
this.masterComputation.close(new DefaultMasterContext());
- this.bsp4Master.waitWorkersCloseDone();
+ if (!failed) {
+ this.bsp4Master.waitWorkersCloseDone();
+ }
this.managers.closeAll(this.config);
@@ -183,97 +186,103 @@ public class MasterService implements Closeable {
this.checkInited();
LOG.info("{} MasterService execute", this);
- /*
- * Step 1: Determines which superstep to start from, and resume this
- * superstep.
- */
- int superstep = this.superstepToResume();
- LOG.info("{} MasterService resume from superstep: {}",
- this, superstep);
+ try {
+ /*
+ * Step 1: Determines which superstep to start from, and resume this
+ * superstep.
+ */
+ int superstep = this.superstepToResume();
+ LOG.info("{} MasterService resume from superstep: {}",
+ this, superstep);
- /*
- * TODO: Get input splits from HugeGraph if resume from
- * Constants.INPUT_SUPERSTEP.
- */
- this.bsp4Master.masterResumeDone(superstep);
+ /*
+ * TODO: Get input splits from HugeGraph if resume from
+ * Constants.INPUT_SUPERSTEP.
+ */
+ this.bsp4Master.masterResumeDone(superstep);
- /*
- * Step 2: Input superstep for loading vertices and edges.
- * This step may be skipped if resume from other superstep than
- * Constants.INPUT_SUPERSTEP.
- */
- SuperstepStat superstepStat;
- watcher.start();
- if (superstep == Constants.INPUT_SUPERSTEP) {
- superstepStat = this.inputstep();
- superstep++;
- } else {
- // TODO: Get superstepStat from bsp service.
- superstepStat = null;
- }
- watcher.stop();
- LOG.info("{} MasterService input step cost: {}",
- this, TimeUtil.readableTime(watcher.getTime()));
- E.checkState(superstep <= this.maxSuperStep,
- "The superstep {} can't be > maxSuperStep {}",
- superstep, this.maxSuperStep);
-
- watcher.reset();
- watcher.start();
- // Step 3: Iteration computation of all supersteps.
- for (; superstepStat.active(); superstep++) {
- LOG.info("{} MasterService superstep {} started",
- this, superstep);
/*
- * Superstep iteration. The steps in each superstep are:
- * 1) Master waits workers superstep prepared.
- * 2) All managers call beforeSuperstep.
- * 3) Master signals the workers that the master prepared
- * superstep.
- * 4) Master waits the workers do vertex computation.
- * 5) Master signal the workers that all workers have finished
- * vertex computation.
- * 6) Master waits the workers end the superstep, and get
- * superstepStat.
- * 7) Master compute whether to continue the next superstep
- * iteration.
- * 8) All managers call afterSuperstep.
- * 9) Master signals the workers with superstepStat, and workers
- * know whether to continue the next superstep iteration.
+ * Step 2: Input superstep for loading vertices and edges.
+ * This step may be skipped if resume from other superstep than
+ * Constants.INPUT_SUPERSTEP.
*/
- this.bsp4Master.waitWorkersStepPrepareDone(superstep);
- this.managers.beforeSuperstep(this.config, superstep);
- this.bsp4Master.masterStepPrepareDone(superstep);
-
- this.bsp4Master.waitWorkersStepComputeDone(superstep);
- this.bsp4Master.masterStepComputeDone(superstep);
- List<WorkerStat> workerStats =
- this.bsp4Master.waitWorkersStepDone(superstep);
- superstepStat = SuperstepStat.from(workerStats);
- SuperstepContext context = new SuperstepContext(superstep,
- superstepStat);
- // Call master compute(), note the worker afterSuperstep() is done
- boolean masterContinue = this.masterComputation.compute(context);
- if (this.finishedIteration(masterContinue, context)) {
- superstepStat.inactivate();
+ SuperstepStat superstepStat;
+ watcher.start();
+ if (superstep == Constants.INPUT_SUPERSTEP) {
+ superstepStat = this.inputstep();
+ superstep++;
+ } else {
+ // TODO: Get superstepStat from bsp service.
+ superstepStat = null;
}
- this.managers.afterSuperstep(this.config, superstep);
- this.bsp4Master.masterStepDone(superstep, superstepStat);
-
- LOG.info("{} MasterService superstep {} finished",
- this, superstep);
+ watcher.stop();
+ LOG.info("{} MasterService input step cost: {}",
+ this, TimeUtil.readableTime(watcher.getTime()));
+ E.checkState(superstep <= this.maxSuperStep,
+ "The superstep {} can't be > maxSuperStep {}",
+ superstep, this.maxSuperStep);
+
+ watcher.reset();
+ watcher.start();
+ // Step 3: Iteration computation of all supersteps.
+ for (; superstepStat.active(); superstep++) {
+ LOG.info("{} MasterService superstep {} started",
+ this, superstep);
+ /*
+ * Superstep iteration. The steps in each superstep are:
+ * 1) Master waits workers superstep prepared.
+ * 2) All managers call beforeSuperstep.
+ * 3) Master signals the workers that the master prepared
+ * superstep.
+ * 4) Master waits the workers do vertex computation.
+ * 5) Master signal the workers that all workers have finished
+ * vertex computation.
+ * 6) Master waits the workers end the superstep, and get
+ * superstepStat.
+ * 7) Master compute whether to continue the next superstep
+ * iteration.
+ * 8) All managers call afterSuperstep.
+ * 9) Master signals the workers with superstepStat, and workers
+ * know whether to continue the next superstep iteration.
+ */
+ this.bsp4Master.waitWorkersStepPrepareDone(superstep);
+ this.managers.beforeSuperstep(this.config, superstep);
+ this.bsp4Master.masterStepPrepareDone(superstep);
+
+ this.bsp4Master.waitWorkersStepComputeDone(superstep);
+ this.bsp4Master.masterStepComputeDone(superstep);
+ List<WorkerStat> workerStats =
+ this.bsp4Master.waitWorkersStepDone(superstep);
+ superstepStat = SuperstepStat.from(workerStats);
+ SuperstepContext context = new SuperstepContext(superstep,
+ superstepStat);
+ // Call master compute(), note the worker afterSuperstep() is done
+ boolean masterContinue = this.masterComputation.compute(context);
+ if (this.finishedIteration(masterContinue, context)) {
+ superstepStat.inactivate();
+ }
+ this.managers.afterSuperstep(this.config, superstep);
+ this.bsp4Master.masterStepDone(superstep, superstepStat);
+
+ LOG.info("{} MasterService superstep {} finished",
+ this, superstep);
+ }
+ watcher.stop();
+ LOG.info("{} MasterService compute step cost: {}",
+ this, TimeUtil.readableTime(watcher.getTime()));
+
+ watcher.reset();
+ watcher.start();
+ // Step 4: Output superstep for outputting results.
+ this.outputstep();
+ watcher.stop();
+ LOG.info("{} MasterService output step cost: {}",
+ this, TimeUtil.readableTime(watcher.getTime()));
+ } catch (Throwable throwable) {
+ LOG.error("{} MasterService execute failed", this, throwable);
+ failed = true;
+ throw throwable;
}
- watcher.stop();
- LOG.info("{} MasterService compute step cost: {}",
- this, TimeUtil.readableTime(watcher.getTime()));
-
- watcher.reset();
- watcher.start();
- // Step 4: Output superstep for outputting results.
- this.outputstep();
- watcher.stop();
- LOG.info("{} MasterService output step cost: {}",
- this, TimeUtil.readableTime(watcher.getTime()));
}
@Override
diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java
index 48af87c6..fc5b5dc5 100644
--- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java
+++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java
@@ -137,7 +137,7 @@ public class WorkerService implements Closeable {
dm.connect(worker.id(), worker.hostname(), worker.dataPort());
}
- this.computeManager = new ComputeManager(this.context, this.managers);
+ this.computeManager = new ComputeManager(this.workerInfo.id(), this.context, this.managers);
this.managers.initedAll(this.config);
LOG.info("{} WorkerService initialized", this);
diff --git a/computer-dist/src/assembly/travis/load-data-into-hugegraph.sh b/computer-dist/src/assembly/travis/load-data-into-hugegraph.sh
index b68bec7d..e00890b6 100755
--- a/computer-dist/src/assembly/travis/load-data-into-hugegraph.sh
+++ b/computer-dist/src/assembly/travis/load-data-into-hugegraph.sh
@@ -26,7 +26,7 @@
HUGEGRAPH_LOADER_GIT_URL="https://github.com/apache/hugegraph-toolchain.git"
git clone --depth 10 ${HUGEGRAPH_LOADER_GIT_URL} hugegraph-toolchain
cd hugegraph-toolchain
-mvn install -pl hugegraph-client,hugegraph-loader -am -DskipTests -ntp
+mvn install -P stage -pl hugegraph-client,hugegraph-loader -am -DskipTests -ntp
cd hugegraph-loader
tar -zxf target/apache-hugegraph-loader-*.tar.gz || exit 1
diff --git a/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManagerTest.java b/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManagerTest.java
index 63f0c5b2..c8c1544c 100644
--- a/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManagerTest.java
+++ b/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManagerTest.java
@@ -114,7 +114,7 @@ public class ComputeManagerTest extends UnitTestBase {
this.connectionId = new ConnectionId(new InetSocketAddress("localhost",
8081),
0);
- this.computeManager = new ComputeManager(context(), this.managers);
+ this.computeManager = new ComputeManager(0, context(), this.managers);
}
@After