This is an automated email from the ASF dual-hosted git repository.

vgalaxies pushed a commit to branch fix-scheduler
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git

commit ea2b9e4d2959780d3bd43a11ef5795c1e4f2141a
Author: VGalaxies <[email protected]>
AuthorDate: Sun Apr 7 18:17:12 2024 +0800

    fix
---
 .../org/apache/hugegraph/task/TaskManager.java     | 215 ++++++++++++++++-----
 1 file changed, 162 insertions(+), 53 deletions(-)

diff --git 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java
 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java
index 144387949..afc5c9d49 100644
--- 
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java
+++ 
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java
@@ -48,6 +48,11 @@ public final class TaskManager {
             "server-info-db-worker-%d";
     public static final String TASK_SCHEDULER = "task-scheduler-%d";
 
+    public static final String OLAP_TASK_WORKER = "olap-task-worker-%d";
+    public static final String SCHEMA_TASK_WORKER = "schema-task-worker-%d";
+    public static final String EPHEMERAL_TASK_WORKER = 
"ephemeral-task-worker-%d";
+    public static final String DISTRIBUTED_TASK_SCHEDULER = 
"distributed-scheduler-%d";
+
     protected static final long SCHEDULE_PERIOD = 1000L; // unit ms
     private static final long TX_CLOSE_TIMEOUT = 30L; // unit s
     private static final int THREADS = 4;
@@ -60,6 +65,11 @@ public final class TaskManager {
     private final ExecutorService serverInfoDbExecutor;
     private final PausableScheduledThreadPool schedulerExecutor;
 
+    private final ExecutorService schemaTaskExecutor;
+    private final ExecutorService olapTaskExecutor;
+    private final ExecutorService ephemeralTaskExecutor;
+    private final PausableScheduledThreadPool distributedSchedulerExecutor;
+
     private boolean enableRoleElected = false;
 
     public static TaskManager instance() {
@@ -76,6 +86,17 @@ public final class TaskManager {
                 1, TASK_DB_WORKER);
         this.serverInfoDbExecutor = ExecutorUtil.newFixedThreadPool(
                 1, SERVER_INFO_DB_WORKER);
+
+        this.schemaTaskExecutor = ExecutorUtil.newFixedThreadPool(pool,
+                                                                  
SCHEMA_TASK_WORKER);
+        this.olapTaskExecutor = ExecutorUtil.newFixedThreadPool(pool,
+                                                                
OLAP_TASK_WORKER);
+        this.ephemeralTaskExecutor = ExecutorUtil.newFixedThreadPool(pool,
+                                                                     
EPHEMERAL_TASK_WORKER);
+        this.distributedSchedulerExecutor =
+                ExecutorUtil.newPausableScheduledThreadPool(1,
+                                                            
DISTRIBUTED_TASK_SCHEDULER);
+
         // For schedule task to run, just one thread is ok
         this.schedulerExecutor = ExecutorUtil.newPausableScheduledThreadPool(
                 1, TASK_SCHEDULER);
@@ -88,11 +109,36 @@ public final class TaskManager {
 
     public void addScheduler(HugeGraphParams graph) {
         E.checkArgumentNotNull(graph, "The graph can't be null");
-
-        TaskScheduler scheduler = new StandardTaskScheduler(graph,
-                                                            this.taskExecutor, 
this.taskDbExecutor,
-                                                            
this.serverInfoDbExecutor);
-        this.schedulers.put(graph, scheduler);
+        LOG.info("Use {} as the scheduler of graph ({})",
+                 graph.schedulerType(), graph.name());
+        // TODO: 如当前服务绑定到指定的非 DEFAULT 图空间,非当前图空间的图不再创建任务调度器 (graph space)
+        switch (graph.schedulerType()) {
+            case "distributed": {
+                TaskScheduler scheduler =
+                        new DistributedTaskScheduler(
+                                graph,
+                                distributedSchedulerExecutor,
+                                taskDbExecutor,
+                                schemaTaskExecutor,
+                                olapTaskExecutor,
+                                taskExecutor, /* gremlinTaskExecutor */
+                                ephemeralTaskExecutor,
+                                serverInfoDbExecutor);
+                this.schedulers.put(graph, scheduler);
+                break;
+            }
+            case "local":
+            default: {
+                TaskScheduler scheduler =
+                        new StandardTaskScheduler(
+                                graph,
+                                this.taskExecutor,
+                                this.taskDbExecutor,
+                                this.serverInfoDbExecutor);
+                this.schedulers.put(graph, scheduler);
+                break;
+            }
+        }
     }
 
     public void closeScheduler(HugeGraphParams graph) {
@@ -123,6 +169,10 @@ public final class TaskManager {
         if (!this.schedulerExecutor.isTerminated()) {
             this.closeSchedulerTx(graph);
         }
+
+        if (!this.distributedSchedulerExecutor.isTerminated()) {
+            this.closeDistributedSchedulerTx(graph);
+        }
     }
 
     private void closeTaskTx(HugeGraphParams graph) {
@@ -157,6 +207,21 @@ public final class TaskManager {
         }
     }
 
+    private void closeDistributedSchedulerTx(HugeGraphParams graph) {
+        final Callable<Void> closeTx = () -> {
+            // Do close-tx for current thread
+            graph.closeTx();
+            // Let other threads run
+            Thread.yield();
+            return null;
+        };
+        try {
+            this.distributedSchedulerExecutor.submit(closeTx).get();
+        } catch (Exception e) {
+            throw new HugeException("Exception when closing scheduler tx", e);
+        }
+    }
+
     public void pauseScheduledThreadPool() {
         this.schedulerExecutor.pauseSchedule();
     }
@@ -170,8 +235,7 @@ public final class TaskManager {
     }
 
     public ServerInfoManager getServerInfoManager(HugeGraphParams graph) {
-        StandardTaskScheduler scheduler = (StandardTaskScheduler)
-                this.getScheduler(graph);
+        TaskScheduler scheduler = this.getScheduler(graph);
         if (scheduler == null) {
             return null;
         }
@@ -195,10 +259,21 @@ public final class TaskManager {
             }
         }
 
+        if (terminated && !this.distributedSchedulerExecutor.isShutdown()) {
+            this.distributedSchedulerExecutor.shutdown();
+            try {
+                terminated = 
this.distributedSchedulerExecutor.awaitTermination(timeout,
+                                                                               
 unit);
+            } catch (Throwable e) {
+                ex = e;
+            }
+        }
+
         if (terminated && !this.taskExecutor.isShutdown()) {
             this.taskExecutor.shutdown();
             try {
-                terminated = this.taskExecutor.awaitTermination(timeout, unit);
+                terminated = this.taskExecutor.awaitTermination(timeout,
+                                                                unit);
             } catch (Throwable e) {
                 ex = e;
             }
@@ -217,7 +292,38 @@ public final class TaskManager {
         if (terminated && !this.taskDbExecutor.isShutdown()) {
             this.taskDbExecutor.shutdown();
             try {
-                terminated = this.taskDbExecutor.awaitTermination(timeout, 
unit);
+                terminated = this.taskDbExecutor.awaitTermination(timeout,
+                                                                  unit);
+            } catch (Throwable e) {
+                ex = e;
+            }
+        }
+
+        if (terminated && !this.ephemeralTaskExecutor.isShutdown()) {
+            this.ephemeralTaskExecutor.shutdown();
+            try {
+                terminated = 
this.ephemeralTaskExecutor.awaitTermination(timeout,
+                                                                         unit);
+            } catch (Throwable e) {
+                ex = e;
+            }
+        }
+
+        if (terminated && !this.schemaTaskExecutor.isShutdown()) {
+            this.schemaTaskExecutor.shutdown();
+            try {
+                terminated = this.schemaTaskExecutor.awaitTermination(timeout,
+                                                                      unit);
+            } catch (Throwable e) {
+                ex = e;
+            }
+        }
+
+        if (terminated && !this.olapTaskExecutor.isShutdown()) {
+            this.olapTaskExecutor.shutdown();
+            try {
+                terminated = this.olapTaskExecutor.awaitTermination(timeout,
+                                                                    unit);
             } catch (Throwable e) {
                 ex = e;
             }
@@ -292,7 +398,7 @@ public final class TaskManager {
         // Called by scheduler timer
         try {
             for (TaskScheduler entry : this.schedulers.values()) {
-                StandardTaskScheduler scheduler = (StandardTaskScheduler) 
entry;
+                TaskScheduler scheduler = entry;
                 // Maybe other thread close&remove scheduler at the same time
                 synchronized (scheduler) {
                     this.scheduleOrExecuteJobForGraph(scheduler);
@@ -303,56 +409,59 @@ public final class TaskManager {
         }
     }
 
-    private void scheduleOrExecuteJobForGraph(StandardTaskScheduler scheduler) 
{
+    private void scheduleOrExecuteJobForGraph(TaskScheduler scheduler) {
         E.checkNotNull(scheduler, "scheduler");
 
-        ServerInfoManager serverManager = scheduler.serverManager();
-        String graph = scheduler.graphName();
-
-        LockUtil.lock(graph, LockUtil.GRAPH_LOCK);
-        try {
-            /*
-             * Skip if:
-             * graph is closed (iterate schedulers before graph is closing)
-             *  or
-             * graph is not initialized(maybe truncated or cleared).
-             *
-             * If graph is closing by other thread, current thread get
-             * serverManager and try lock graph, at the same time other
-             * thread deleted the lock-group, current thread would get
-             * exception 'LockGroup xx does not exists'.
-             * If graph is closed, don't call serverManager.initialized()
-             * due to it will reopen graph tx.
-             */
-            if (!serverManager.graphIsReady()) {
-                return;
-            }
-
-            // Update server heartbeat
-            serverManager.heartbeat();
+        if (scheduler instanceof StandardTaskScheduler) {
+            StandardTaskScheduler standardTaskScheduler = 
(StandardTaskScheduler) (scheduler);
+            ServerInfoManager serverManager = scheduler.serverManager();
+            String graph = scheduler.graphName();
 
-            /*
-             * Master will schedule tasks to suitable servers.
-             * Note a Worker may become to a Master, so elected-Master also 
needs to
-             * execute tasks assigned by previous Master when 
enableRoleElected=true.
-             * However, when enableRoleElected=false, a Master is only set by 
the
-             * config assignment, assigned-Master always stays the same state.
-             */
-            if (serverManager.selfIsMaster()) {
-                scheduler.scheduleTasksOnMaster();
-                if (!this.enableRoleElected && 
!serverManager.onlySingleNode()) {
-                    // assigned-Master + non-single-node don't need to execute 
tasks
+            LockUtil.lock(graph, LockUtil.GRAPH_LOCK);
+            try {
+                /*
+                 * Skip if:
+                 * graph is closed (iterate schedulers before graph is closing)
+                 *  or
+                 * graph is not initialized(maybe truncated or cleared).
+                 *
+                 * If graph is closing by other thread, current thread get
+                 * serverManager and try lock graph, at the same time other
+                 * thread deleted the lock-group, current thread would get
+                 * exception 'LockGroup xx does not exists'.
+                 * If graph is closed, don't call serverManager.initialized()
+                 * due to it will reopen graph tx.
+                 */
+                if (!serverManager.graphIsReady()) {
                     return;
                 }
-            }
 
-            // Execute queued tasks scheduled to current server
-            scheduler.executeTasksOnWorker(serverManager.selfNodeId());
+                // Update server heartbeat
+                serverManager.heartbeat();
+
+                /*
+                 * Master will schedule tasks to suitable servers.
+                 * Note a Worker may become to a Master, so elected-Master 
also needs to
+                 * execute tasks assigned by previous Master when 
enableRoleElected=true.
+                 * However, when enableRoleElected=false, a Master is only set 
by the
+                 * config assignment, assigned-Master always stays the same 
state.
+                 */
+                if (serverManager.selfIsMaster()) {
+                    standardTaskScheduler.scheduleTasksOnMaster();
+                    if (!this.enableRoleElected && 
!serverManager.onlySingleNode()) {
+                        // assigned-Master + non-single-node don't need to 
execute tasks
+                        return;
+                    }
+                }
 
-            // Cancel tasks scheduled to current server
-            scheduler.cancelTasksOnWorker(serverManager.selfNodeId());
-        } finally {
-            LockUtil.unlock(graph, LockUtil.GRAPH_LOCK);
+                // Execute queued tasks scheduled to current server
+                
standardTaskScheduler.executeTasksOnWorker(serverManager.selfNodeId());
+
+                // Cancel tasks scheduled to current server
+                
standardTaskScheduler.cancelTasksOnWorker(serverManager.selfNodeId());
+            } finally {
+                LockUtil.unlock(graph, LockUtil.GRAPH_LOCK);
+            }
         }
     }
 

Reply via email to