This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch improve_worker_id
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph-computer.git

commit d821dc35709900b576ad0ccd37bc1bade99d02c7
Author: coderzc <[email protected]>
AuthorDate: Wed Dec 6 15:13:14 2023 +0800

    Improve code
---
 .../computer/core/compute/ComputeManager.java      |   8 +-
 .../computer/core/master/MasterService.java        | 181 +++++++++++----------
 .../computer/core/worker/WorkerService.java        |   2 +-
 .../computer/core/compute/ComputeManagerTest.java  |   2 +-
 4 files changed, 102 insertions(+), 91 deletions(-)

diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManager.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManager.java
index 4898abe6..692073ed 100644
--- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManager.java
+++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManager.java
@@ -46,6 +46,7 @@ public class ComputeManager {
     private static final Logger LOG = Log.logger(ComputeManager.class);
     private static final String PREFIX = "partition-compute-executor-%s";

+    private final int workerId;
     private final ComputerContext context;
     private final Managers managers;

@@ -54,7 +55,8 @@ public class ComputeManager {
     private final MessageSendManager sendManager;
     private final ExecutorService computeExecutor;

-    public ComputeManager(ComputerContext context, Managers managers) {
+    public ComputeManager(int workerId, ComputerContext context, Managers managers) {
+        this.workerId = workerId;
         this.context = context;
         this.managers = managers;
         this.partitions = new HashMap<>();
@@ -73,7 +75,7 @@ public class ComputeManager {
     }

     public WorkerStat input() {
-        WorkerStat workerStat = new WorkerStat();
+        WorkerStat workerStat = new WorkerStat(this.workerId);
         this.recvManager.waitReceivedAllMessages();

         Map<Integer, PeekableIterator<KvEntry>> vertices =
@@ -142,7 +144,7 @@ public class ComputeManager {
     public WorkerStat compute(WorkerContext context, int superstep) {
         this.sendManager.startSend(MessageType.MSG);

-        WorkerStat workerStat = new WorkerStat();
+        WorkerStat workerStat = new WorkerStat(this.workerId);
         Map<Integer, PartitionStat> stats = new ConcurrentHashMap<>();

         /*
diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/master/MasterService.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/master/MasterService.java
index fd1fdfd3..06fb7564 100644
--- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/master/MasterService.java
+++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/master/MasterService.java
@@ -61,6 +61,7 @@ public class MasterService implements Closeable {
     private final Managers managers;

     private volatile boolean inited;
+    private volatile boolean failed;
     private volatile boolean closed;
     private Config config;
     private volatile Bsp4Master bsp4Master;
@@ -153,7 +154,9 @@ public class MasterService implements Closeable {

         this.masterComputation.close(new DefaultMasterContext());

-        this.bsp4Master.waitWorkersCloseDone();
+        if (!failed) {
+            this.bsp4Master.waitWorkersCloseDone();
+        }

         this.managers.closeAll(this.config);

@@ -183,97 +186,103 @@ public class MasterService implements Closeable {
         this.checkInited();

         LOG.info("{} MasterService execute", this);
-        /*
-         * Step 1: Determines which superstep to start from, and resume this
-         * superstep.
-         */
-        int superstep = this.superstepToResume();
-        LOG.info("{} MasterService resume from superstep: {}",
-                 this, superstep);
+        try {
+            /*
+             * Step 1: Determines which superstep to start from, and resume this
+             * superstep.
+             */
+            int superstep = this.superstepToResume();
+            LOG.info("{} MasterService resume from superstep: {}",
+                     this, superstep);

-        /*
-         * TODO: Get input splits from HugeGraph if resume from
-         * Constants.INPUT_SUPERSTEP.
-         */
-        this.bsp4Master.masterResumeDone(superstep);
+            /*
+             * TODO: Get input splits from HugeGraph if resume from
+             * Constants.INPUT_SUPERSTEP.
+             */
+            this.bsp4Master.masterResumeDone(superstep);

-        /*
-         * Step 2: Input superstep for loading vertices and edges.
-         * This step may be skipped if resume from other superstep than
-         * Constants.INPUT_SUPERSTEP.
-         */
-        SuperstepStat superstepStat;
-        watcher.start();
-        if (superstep == Constants.INPUT_SUPERSTEP) {
-            superstepStat = this.inputstep();
-            superstep++;
-        } else {
-            // TODO: Get superstepStat from bsp service.
-            superstepStat = null;
-        }
-        watcher.stop();
-        LOG.info("{} MasterService input step cost: {}",
-                 this, TimeUtil.readableTime(watcher.getTime()));
-        E.checkState(superstep <= this.maxSuperStep,
-                     "The superstep {} can't be > maxSuperStep {}",
-                     superstep, this.maxSuperStep);
-
-        watcher.reset();
-        watcher.start();
-        // Step 3: Iteration computation of all supersteps.
-        for (; superstepStat.active(); superstep++) {
-            LOG.info("{} MasterService superstep {} started",
-                     this, superstep);
             /*
-             * Superstep iteration. The steps in each superstep are:
-             * 1) Master waits workers superstep prepared.
-             * 2) All managers call beforeSuperstep.
-             * 3) Master signals the workers that the master prepared
-             *    superstep.
-             * 4) Master waits the workers do vertex computation.
-             * 5) Master signal the workers that all workers have finished
-             *    vertex computation.
-             * 6) Master waits the workers end the superstep, and get
-             *    superstepStat.
-             * 7) Master compute whether to continue the next superstep
-             *    iteration.
-             * 8) All managers call afterSuperstep.
-             * 9) Master signals the workers with superstepStat, and workers
-             *    know whether to continue the next superstep iteration.
+             * Step 2: Input superstep for loading vertices and edges.
+             * This step may be skipped if resume from other superstep than
+             * Constants.INPUT_SUPERSTEP.
              */
-            this.bsp4Master.waitWorkersStepPrepareDone(superstep);
-            this.managers.beforeSuperstep(this.config, superstep);
-            this.bsp4Master.masterStepPrepareDone(superstep);
-
-            this.bsp4Master.waitWorkersStepComputeDone(superstep);
-            this.bsp4Master.masterStepComputeDone(superstep);
-            List<WorkerStat> workerStats =
-                    this.bsp4Master.waitWorkersStepDone(superstep);
-            superstepStat = SuperstepStat.from(workerStats);
-            SuperstepContext context = new SuperstepContext(superstep,
-                                                            superstepStat);
-            // Call master compute(), note the worker afterSuperstep() is done
-            boolean masterContinue = this.masterComputation.compute(context);
-            if (this.finishedIteration(masterContinue, context)) {
-                superstepStat.inactivate();
+            SuperstepStat superstepStat;
+            watcher.start();
+            if (superstep == Constants.INPUT_SUPERSTEP) {
+                superstepStat = this.inputstep();
+                superstep++;
+            } else {
+                // TODO: Get superstepStat from bsp service.
+                superstepStat = null;
             }
-            this.managers.afterSuperstep(this.config, superstep);
-            this.bsp4Master.masterStepDone(superstep, superstepStat);
-
-            LOG.info("{} MasterService superstep {} finished",
-                     this, superstep);
+            watcher.stop();
+            LOG.info("{} MasterService input step cost: {}",
+                     this, TimeUtil.readableTime(watcher.getTime()));
+            E.checkState(superstep <= this.maxSuperStep,
+                         "The superstep {} can't be > maxSuperStep {}",
+                         superstep, this.maxSuperStep);
+
+            watcher.reset();
+            watcher.start();
+            // Step 3: Iteration computation of all supersteps.
+            for (; superstepStat.active(); superstep++) {
+                LOG.info("{} MasterService superstep {} started",
+                         this, superstep);
+                /*
+                 * Superstep iteration. The steps in each superstep are:
+                 * 1) Master waits workers superstep prepared.
+                 * 2) All managers call beforeSuperstep.
+                 * 3) Master signals the workers that the master prepared
+                 *    superstep.
+                 * 4) Master waits the workers do vertex computation.
+                 * 5) Master signal the workers that all workers have finished
+                 *    vertex computation.
+                 * 6) Master waits the workers end the superstep, and get
+                 *    superstepStat.
+                 * 7) Master compute whether to continue the next superstep
+                 *    iteration.
+                 * 8) All managers call afterSuperstep.
+                 * 9) Master signals the workers with superstepStat, and workers
+                 *    know whether to continue the next superstep iteration.
+                 */
+                this.bsp4Master.waitWorkersStepPrepareDone(superstep);
+                this.managers.beforeSuperstep(this.config, superstep);
+                this.bsp4Master.masterStepPrepareDone(superstep);
+
+                this.bsp4Master.waitWorkersStepComputeDone(superstep);
+                this.bsp4Master.masterStepComputeDone(superstep);
+                List<WorkerStat> workerStats =
+                        this.bsp4Master.waitWorkersStepDone(superstep);
+                superstepStat = SuperstepStat.from(workerStats);
+                SuperstepContext context = new SuperstepContext(superstep,
+                                                                superstepStat);
+                // Call master compute(), note the worker afterSuperstep() is done
+                boolean masterContinue = this.masterComputation.compute(context);
+                if (this.finishedIteration(masterContinue, context)) {
+                    superstepStat.inactivate();
+                }
+                this.managers.afterSuperstep(this.config, superstep);
+                this.bsp4Master.masterStepDone(superstep, superstepStat);
+
+                LOG.info("{} MasterService superstep {} finished",
+                         this, superstep);
+            }
+            watcher.stop();
+            LOG.info("{} MasterService compute step cost: {}",
+                     this, TimeUtil.readableTime(watcher.getTime()));
+
+            watcher.reset();
+            watcher.start();
+            // Step 4: Output superstep for outputting results.
+            this.outputstep();
+            watcher.stop();
+            LOG.info("{} MasterService output step cost: {}",
+                     this, TimeUtil.readableTime(watcher.getTime()));
+        } catch (Throwable throwable) {
+            LOG.error("{} MasterService execute failed", this, throwable);
+            failed = true;
+            throw throwable;
         }
-        watcher.stop();
-        LOG.info("{} MasterService compute step cost: {}",
-                 this, TimeUtil.readableTime(watcher.getTime()));
-
-        watcher.reset();
-        watcher.start();
-        // Step 4: Output superstep for outputting results.
-        this.outputstep();
-        watcher.stop();
-        LOG.info("{} MasterService output step cost: {}",
-                 this, TimeUtil.readableTime(watcher.getTime()));
     }

     @Override
diff --git a/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java b/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java
index 48af87c6..fc5b5dc5 100644
--- a/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java
+++ b/computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/WorkerService.java
@@ -137,7 +137,7 @@ public class WorkerService implements Closeable {
             dm.connect(worker.id(), worker.hostname(), worker.dataPort());
         }

-        this.computeManager = new ComputeManager(this.context, this.managers);
+        this.computeManager = new ComputeManager(this.workerInfo.id(), this.context, this.managers);

         this.managers.initedAll(this.config);
         LOG.info("{} WorkerService initialized", this);
diff --git a/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManagerTest.java b/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManagerTest.java
index 63f0c5b2..c8c1544c 100644
--- a/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManagerTest.java
+++ b/computer-test/src/main/java/org/apache/hugegraph/computer/core/compute/ComputeManagerTest.java
@@ -114,7 +114,7 @@ public class ComputeManagerTest extends UnitTestBase {
         this.connectionId = new ConnectionId(new InetSocketAddress("localhost",
                                                                    8081),
                                              0);
-        this.computeManager = new ComputeManager(context(), this.managers);
+        this.computeManager = new ComputeManager(0, context(), this.managers);
     }

     @After
