This is an automated email from the ASF dual-hosted git repository. vgalaxies pushed a commit to branch fix-scheduler in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git
commit ea2b9e4d2959780d3bd43a11ef5795c1e4f2141a Author: VGalaxies <[email protected]> AuthorDate: Sun Apr 7 18:17:12 2024 +0800 fix --- .../org/apache/hugegraph/task/TaskManager.java | 215 ++++++++++++++++----- 1 file changed, 162 insertions(+), 53 deletions(-) diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java index 144387949..afc5c9d49 100644 --- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java +++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java @@ -48,6 +48,11 @@ public final class TaskManager { "server-info-db-worker-%d"; public static final String TASK_SCHEDULER = "task-scheduler-%d"; + public static final String OLAP_TASK_WORKER = "olap-task-worker-%d"; + public static final String SCHEMA_TASK_WORKER = "schema-task-worker-%d"; + public static final String EPHEMERAL_TASK_WORKER = "ephemeral-task-worker-%d"; + public static final String DISTRIBUTED_TASK_SCHEDULER = "distributed-scheduler-%d"; + protected static final long SCHEDULE_PERIOD = 1000L; // unit ms private static final long TX_CLOSE_TIMEOUT = 30L; // unit s private static final int THREADS = 4; @@ -60,6 +65,11 @@ public final class TaskManager { private final ExecutorService serverInfoDbExecutor; private final PausableScheduledThreadPool schedulerExecutor; + private final ExecutorService schemaTaskExecutor; + private final ExecutorService olapTaskExecutor; + private final ExecutorService ephemeralTaskExecutor; + private final PausableScheduledThreadPool distributedSchedulerExecutor; + private boolean enableRoleElected = false; public static TaskManager instance() { @@ -76,6 +86,17 @@ public final class TaskManager { 1, TASK_DB_WORKER); this.serverInfoDbExecutor = ExecutorUtil.newFixedThreadPool( 1, SERVER_INFO_DB_WORKER); + + this.schemaTaskExecutor = ExecutorUtil.newFixedThreadPool(pool, + SCHEMA_TASK_WORKER); + this.olapTaskExecutor = ExecutorUtil.newFixedThreadPool(pool, + OLAP_TASK_WORKER); + this.ephemeralTaskExecutor = ExecutorUtil.newFixedThreadPool(pool, + EPHEMERAL_TASK_WORKER); + this.distributedSchedulerExecutor = + ExecutorUtil.newPausableScheduledThreadPool(1, + DISTRIBUTED_TASK_SCHEDULER); + // For schedule task to run, just one thread is ok this.schedulerExecutor = ExecutorUtil.newPausableScheduledThreadPool( 1, TASK_SCHEDULER); @@ -88,11 +109,36 @@ public final class TaskManager { public void addScheduler(HugeGraphParams graph) { E.checkArgumentNotNull(graph, "The graph can't be null"); - - TaskScheduler scheduler = new StandardTaskScheduler(graph, - this.taskExecutor, this.taskDbExecutor, - this.serverInfoDbExecutor); - this.schedulers.put(graph, scheduler); + LOG.info("Use {} as the scheduler of graph ({})", + graph.schedulerType(), graph.name()); + // TODO: 如当前服务绑定到指定的非 DEFAULT 图空间,非当前图空间的图不再创建任务调度器 (graph space) + switch (graph.schedulerType()) { + case "distributed": { + TaskScheduler scheduler = + new DistributedTaskScheduler( + graph, + distributedSchedulerExecutor, + taskDbExecutor, + schemaTaskExecutor, + olapTaskExecutor, + taskExecutor, /* gremlinTaskExecutor */ + ephemeralTaskExecutor, + serverInfoDbExecutor); + this.schedulers.put(graph, scheduler); + break; + } + case "local": + default: { + TaskScheduler scheduler = + new StandardTaskScheduler( + graph, + this.taskExecutor, + this.taskDbExecutor, + this.serverInfoDbExecutor); + this.schedulers.put(graph, scheduler); + break; + } + } } public void closeScheduler(HugeGraphParams graph) { @@ -123,6 +169,10 @@ public final class TaskManager { if (!this.schedulerExecutor.isTerminated()) { this.closeSchedulerTx(graph); } + + if (!this.distributedSchedulerExecutor.isTerminated()) { + this.closeDistributedSchedulerTx(graph); + } } private void closeTaskTx(HugeGraphParams graph) { @@ -157,6 +207,21 @@ public final class TaskManager { } } + private void closeDistributedSchedulerTx(HugeGraphParams graph) { + final Callable<Void> closeTx = () -> { + // Do close-tx for current thread + graph.closeTx(); + // Let other threads run + Thread.yield(); + return null; + }; + try { + this.distributedSchedulerExecutor.submit(closeTx).get(); + } catch (Exception e) { + throw new HugeException("Exception when closing scheduler tx", e); + } + } + public void pauseScheduledThreadPool() { this.schedulerExecutor.pauseSchedule(); } @@ -170,8 +235,7 @@ public final class TaskManager { } public ServerInfoManager getServerInfoManager(HugeGraphParams graph) { - StandardTaskScheduler scheduler = (StandardTaskScheduler) - this.getScheduler(graph); + TaskScheduler scheduler = this.getScheduler(graph); if (scheduler == null) { return null; } @@ -195,10 +259,21 @@ public final class TaskManager { } } + if (terminated && !this.distributedSchedulerExecutor.isShutdown()) { + this.distributedSchedulerExecutor.shutdown(); + try { + terminated = this.distributedSchedulerExecutor.awaitTermination(timeout, + unit); + } catch (Throwable e) { + ex = e; + } + } + if (terminated && !this.taskExecutor.isShutdown()) { this.taskExecutor.shutdown(); try { - terminated = this.taskExecutor.awaitTermination(timeout, unit); + terminated = this.taskExecutor.awaitTermination(timeout, + unit); } catch (Throwable e) { ex = e; } @@ -217,7 +292,38 @@ public final class TaskManager { if (terminated && !this.taskDbExecutor.isShutdown()) { this.taskDbExecutor.shutdown(); try { - terminated = this.taskDbExecutor.awaitTermination(timeout, unit); + terminated = this.taskDbExecutor.awaitTermination(timeout, + unit); + } catch (Throwable e) { + ex = e; + } + } + + if (terminated && !this.ephemeralTaskExecutor.isShutdown()) { + this.ephemeralTaskExecutor.shutdown(); + try { + terminated = this.ephemeralTaskExecutor.awaitTermination(timeout, + unit); + } catch (Throwable e) { + ex = e; + } + } + + if (terminated && !this.schemaTaskExecutor.isShutdown()) { + this.schemaTaskExecutor.shutdown(); + try { + terminated = this.schemaTaskExecutor.awaitTermination(timeout, + unit); + } catch (Throwable e) { + ex = e; + } + } + + if (terminated && !this.olapTaskExecutor.isShutdown()) { + this.olapTaskExecutor.shutdown(); + try { + terminated = this.olapTaskExecutor.awaitTermination(timeout, + unit); } catch (Throwable e) { ex = e; } @@ -292,7 +398,7 @@ public final class TaskManager { // Called by scheduler timer try { for (TaskScheduler entry : this.schedulers.values()) { - StandardTaskScheduler scheduler = (StandardTaskScheduler) entry; + TaskScheduler scheduler = entry; // Maybe other thread close&remove scheduler at the same time synchronized (scheduler) { this.scheduleOrExecuteJobForGraph(scheduler); @@ -303,56 +409,59 @@ public final class TaskManager { } } - private void scheduleOrExecuteJobForGraph(StandardTaskScheduler scheduler) { + private void scheduleOrExecuteJobForGraph(TaskScheduler scheduler) { E.checkNotNull(scheduler, "scheduler"); - ServerInfoManager serverManager = scheduler.serverManager(); - String graph = scheduler.graphName(); - - LockUtil.lock(graph, LockUtil.GRAPH_LOCK); - try { - /* - * Skip if: - * graph is closed (iterate schedulers before graph is closing) - * or - * graph is not initialized(maybe truncated or cleared). - * - * If graph is closing by other thread, current thread get - * serverManager and try lock graph, at the same time other - * thread deleted the lock-group, current thread would get - * exception 'LockGroup xx does not exists'. - * If graph is closed, don't call serverManager.initialized() - * due to it will reopen graph tx. - */ - if (!serverManager.graphIsReady()) { - return; - } - - // Update server heartbeat - serverManager.heartbeat(); + if (scheduler instanceof StandardTaskScheduler) { + StandardTaskScheduler standardTaskScheduler = (StandardTaskScheduler) (scheduler); + ServerInfoManager serverManager = scheduler.serverManager(); + String graph = scheduler.graphName(); - /* - * Master will schedule tasks to suitable servers. - * Note a Worker may become to a Master, so elected-Master also needs to - * execute tasks assigned by previous Master when enableRoleElected=true. - * However, when enableRoleElected=false, a Master is only set by the - * config assignment, assigned-Master always stays the same state. - */ - if (serverManager.selfIsMaster()) { - scheduler.scheduleTasksOnMaster(); - if (!this.enableRoleElected && !serverManager.onlySingleNode()) { - // assigned-Master + non-single-node don't need to execute tasks + LockUtil.lock(graph, LockUtil.GRAPH_LOCK); + try { + /* + * Skip if: + * graph is closed (iterate schedulers before graph is closing) + * or + * graph is not initialized(maybe truncated or cleared). + * + * If graph is closing by other thread, current thread get + * serverManager and try lock graph, at the same time other + * thread deleted the lock-group, current thread would get + * exception 'LockGroup xx does not exists'. + * If graph is closed, don't call serverManager.initialized() + * due to it will reopen graph tx. + */ + if (!serverManager.graphIsReady()) { return; } - } - // Execute queued tasks scheduled to current server - scheduler.executeTasksOnWorker(serverManager.selfNodeId()); + // Update server heartbeat + serverManager.heartbeat(); + + /* + * Master will schedule tasks to suitable servers. + * Note a Worker may become to a Master, so elected-Master also needs to + * execute tasks assigned by previous Master when enableRoleElected=true. + * However, when enableRoleElected=false, a Master is only set by the + * config assignment, assigned-Master always stays the same state. + */ + if (serverManager.selfIsMaster()) { + standardTaskScheduler.scheduleTasksOnMaster(); + if (!this.enableRoleElected && !serverManager.onlySingleNode()) { + // assigned-Master + non-single-node don't need to execute tasks + return; + } + } - // Cancel tasks scheduled to current server - scheduler.cancelTasksOnWorker(serverManager.selfNodeId()); - } finally { - LockUtil.unlock(graph, LockUtil.GRAPH_LOCK); + // Execute queued tasks scheduled to current server + standardTaskScheduler.executeTasksOnWorker(serverManager.selfNodeId()); + + // Cancel tasks scheduled to current server + standardTaskScheduler.cancelTasksOnWorker(serverManager.selfNodeId()); + } finally { + LockUtil.unlock(graph, LockUtil.GRAPH_LOCK); + } } }
