This is an automated email from the ASF dual-hosted git repository.

pengjunzhi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git


The following commit(s) were added to refs/heads/master by this push:
     new 0946d5dd0 feat(server): add option for task role election (#2843)
0946d5dd0 is described below

commit 0946d5dd0cad8c27cb365b0c2aaaf217cdca3402
Author: vaughn <[email protected]>
AuthorDate: Thu Jul 31 10:13:23 2025 +0800

    feat(server): add option for task role election (#2843)
---
 .../org/apache/hugegraph/config/ServerOptions.java |  9 +++
 .../org/apache/hugegraph/core/GraphManager.java    |  3 +-
 .../org/apache/hugegraph/task/TaskManager.java     | 87 +++++++++++-----------
 3 files changed, 55 insertions(+), 44 deletions(-)

diff --git a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/config/ServerOptions.java b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/config/ServerOptions.java
index f4b259cc4..5041a90b3 100644
--- a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/config/ServerOptions.java
+++ b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/config/ServerOptions.java
@@ -73,6 +73,15 @@ public class ServerOptions extends OptionHolder {
                     "master"
             );
 
+    public static final ConfigOption<Boolean> ENABLE_SERVER_ROLE_ELECTION =
+            new ConfigOption<>(
+                    "server.role_election",
+                    "Whether to enable role election, if enabled, the server " +
+                    "will elect a master node in the cluster.",
+                    disallowEmpty(),
+                    false
+            );
+
     public static final ConfigOption<Integer> MAX_WORKER_THREADS =
             new ConfigOption<>(
                     "restserver.max_worker_threads",
diff --git a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/core/GraphManager.java b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/core/GraphManager.java
index daae180a3..80b52e124 100644
--- a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/core/GraphManager.java
+++ b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/core/GraphManager.java
@@ -469,7 +469,8 @@ public final class GraphManager {
 
         NodeRole nodeRole = NodeRole.valueOf(role.toUpperCase());
         boolean supportRoleElection = !nodeRole.computer() &&
-                                      this.supportRoleElection();
+                                      this.supportRoleElection() &&
+                                      config.get(ServerOptions.ENABLE_SERVER_ROLE_ELECTION);
         if (supportRoleElection) {
             // Init any server as Worker role, then do role election
             nodeRole = NodeRole.WORKER;
diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java
index f8e560204..a638a7940 100644
--- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java
+++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java
@@ -37,6 +37,13 @@ import org.apache.hugegraph.util.LockUtil;
 import org.apache.hugegraph.util.Log;
 import org.slf4j.Logger;
 
+/**
+ * Central task management system that coordinates task scheduling and execution.
+ * Manages task schedulers for different graphs and handles role-based execution.
+ * <p>
+ * Note: The local master-worker mechanism will be deprecated in version 1.7
+ * (configuration has been removed from config files).
+ */
 public final class TaskManager {
 
     private static final Logger LOG = Log.logger(TaskManager.class);
@@ -44,8 +51,7 @@ public final class TaskManager {
     public static final String TASK_WORKER_PREFIX = "task-worker";
     public static final String TASK_WORKER = TASK_WORKER_PREFIX + "-%d";
     public static final String TASK_DB_WORKER = "task-db-worker-%d";
-    public static final String SERVER_INFO_DB_WORKER =
-            "server-info-db-worker-%d";
+    public static final String SERVER_INFO_DB_WORKER = "server-info-db-worker-%d";
     public static final String TASK_SCHEDULER = "task-scheduler-%d";
 
     public static final String OLAP_TASK_WORKER = "olap-task-worker-%d";
@@ -53,7 +59,7 @@ public final class TaskManager {
     public static final String EPHEMERAL_TASK_WORKER = "ephemeral-task-worker-%d";
     public static final String DISTRIBUTED_TASK_SCHEDULER = "distributed-scheduler-%d";
 
-    protected static final long SCHEDULE_PERIOD = 1000L; // unit ms
+    static final long SCHEDULE_PERIOD = 1000L; // unit ms
     private static final long TX_CLOSE_TIMEOUT = 30L; // unit s
     private static final int THREADS = 4;
     private static final TaskManager MANAGER = new TaskManager(THREADS);
@@ -87,17 +93,13 @@ public final class TaskManager {
         this.serverInfoDbExecutor = ExecutorUtil.newFixedThreadPool(
                 1, SERVER_INFO_DB_WORKER);
 
-        this.schemaTaskExecutor = ExecutorUtil.newFixedThreadPool(pool,
-                                                                  SCHEMA_TASK_WORKER);
-        this.olapTaskExecutor = ExecutorUtil.newFixedThreadPool(pool,
-                                                                OLAP_TASK_WORKER);
-        this.ephemeralTaskExecutor = ExecutorUtil.newFixedThreadPool(pool,
-                                                                     EPHEMERAL_TASK_WORKER);
+        this.schemaTaskExecutor = ExecutorUtil.newFixedThreadPool(pool, SCHEMA_TASK_WORKER);
+        this.olapTaskExecutor = ExecutorUtil.newFixedThreadPool(pool, OLAP_TASK_WORKER);
+        this.ephemeralTaskExecutor = ExecutorUtil.newFixedThreadPool(pool, EPHEMERAL_TASK_WORKER);
         this.distributedSchedulerExecutor =
-                ExecutorUtil.newPausableScheduledThreadPool(1,
-                                                            DISTRIBUTED_TASK_SCHEDULER);
+                ExecutorUtil.newPausableScheduledThreadPool(1, DISTRIBUTED_TASK_SCHEDULER);
 
-        // For schedule task to run, just one thread is ok
+        // For a schedule task to run, just one thread is ok
         this.schedulerExecutor = ExecutorUtil.newPausableScheduledThreadPool(
                 1, TASK_SCHEDULER);
         // Start after 10x period time waiting for HugeGraphServer startup
@@ -111,7 +113,9 @@ public final class TaskManager {
         E.checkArgumentNotNull(graph, "The graph can't be null");
         LOG.info("Use {} as the scheduler of graph ({})",
                  graph.schedulerType(), graph.name());
-        // TODO: If the current service is bound to a specified non-DEFAULT graph space, the graph outside of the current graph space will no longer create task schedulers (graph space)
+        // TODO: If the current service is bound to a specified non-DEFAULT graph space, the
+        //  graph outside of the current graph space will no longer create task schedulers (graph
+        //  space)
         switch (graph.schedulerType()) {
             case "distributed": {
                 TaskScheduler scheduler =
@@ -194,7 +198,7 @@ public final class TaskManager {
 
     private void closeSchedulerTx(HugeGraphParams graph) {
         final Callable<Void> closeTx = () -> {
-            // Do close-tx for current thread
+            // Do close-tx for the current thread
             graph.closeTx();
             // Let other threads run
             Thread.yield();
@@ -209,7 +213,7 @@ public final class TaskManager {
 
     private void closeDistributedSchedulerTx(HugeGraphParams graph) {
         final Callable<Void> closeTx = () -> {
-            // Do close-tx for current thread
+            // Do close-tx for the current thread
             graph.closeTx();
             // Let other threads run
             Thread.yield();
@@ -252,8 +256,7 @@ public final class TaskManager {
         if (!this.schedulerExecutor.isShutdown()) {
             this.schedulerExecutor.shutdown();
             try {
-                terminated = this.schedulerExecutor.awaitTermination(timeout,
-                                                                     unit);
+                terminated = this.schedulerExecutor.awaitTermination(timeout, unit);
             } catch (Throwable e) {
                 ex = e;
             }
@@ -262,8 +265,7 @@ public final class TaskManager {
         if (terminated && !this.distributedSchedulerExecutor.isShutdown()) {
             this.distributedSchedulerExecutor.shutdown();
             try {
-                terminated = this.distributedSchedulerExecutor.awaitTermination(timeout,
-                                                                                unit);
+                terminated = this.distributedSchedulerExecutor.awaitTermination(timeout, unit);
             } catch (Throwable e) {
                 ex = e;
             }
@@ -272,8 +274,7 @@ public final class TaskManager {
         if (terminated && !this.taskExecutor.isShutdown()) {
             this.taskExecutor.shutdown();
             try {
-                terminated = this.taskExecutor.awaitTermination(timeout,
-                                                                unit);
+                terminated = this.taskExecutor.awaitTermination(timeout, unit);
             } catch (Throwable e) {
                 ex = e;
             }
@@ -282,8 +283,7 @@ public final class TaskManager {
         if (terminated && !this.serverInfoDbExecutor.isShutdown()) {
             this.serverInfoDbExecutor.shutdown();
             try {
-                terminated = this.serverInfoDbExecutor.awaitTermination(timeout,
-                                                                        unit);
+                terminated = this.serverInfoDbExecutor.awaitTermination(timeout, unit);
             } catch (Throwable e) {
                 ex = e;
             }
@@ -292,8 +292,7 @@ public final class TaskManager {
         if (terminated && !this.taskDbExecutor.isShutdown()) {
             this.taskDbExecutor.shutdown();
             try {
-                terminated = this.taskDbExecutor.awaitTermination(timeout,
-                                                                  unit);
+                terminated = this.taskDbExecutor.awaitTermination(timeout, unit);
             } catch (Throwable e) {
                 ex = e;
             }
@@ -302,8 +301,7 @@ public final class TaskManager {
         if (terminated && !this.ephemeralTaskExecutor.isShutdown()) {
             this.ephemeralTaskExecutor.shutdown();
             try {
-                terminated = this.ephemeralTaskExecutor.awaitTermination(timeout,
-                                                                         unit);
+                terminated = this.ephemeralTaskExecutor.awaitTermination(timeout, unit);
             } catch (Throwable e) {
                 ex = e;
             }
@@ -312,8 +310,7 @@ public final class TaskManager {
         if (terminated && !this.schemaTaskExecutor.isShutdown()) {
             this.schemaTaskExecutor.shutdown();
             try {
-                terminated = this.schemaTaskExecutor.awaitTermination(timeout,
-                                                                      unit);
+                terminated = this.schemaTaskExecutor.awaitTermination(timeout, unit);
             } catch (Throwable e) {
                 ex = e;
             }
@@ -322,8 +319,7 @@ public final class TaskManager {
         if (terminated && !this.olapTaskExecutor.isShutdown()) {
             this.olapTaskExecutor.shutdown();
             try {
-                terminated = this.olapTaskExecutor.awaitTermination(timeout,
-                                                                    unit);
+                terminated = this.olapTaskExecutor.awaitTermination(timeout, unit);
             } catch (Throwable e) {
                 ex = e;
             }
@@ -356,9 +352,12 @@ public final class TaskManager {
     public void onAsRoleMaster() {
         try {
             for (TaskScheduler entry : this.schedulers.values()) {
-                StandardTaskScheduler scheduler = (StandardTaskScheduler) entry;
-                ServerInfoManager serverInfoManager = scheduler.serverManager();
-                serverInfoManager.changeServerRole(NodeRole.MASTER);
+                ServerInfoManager serverInfoManager = entry.serverManager();
+                if (serverInfoManager != null) {
+                    serverInfoManager.changeServerRole(NodeRole.MASTER);
+                } else {
+                    LOG.warn("ServerInfoManager is null for graph {}", entry.graphName());
+                }
             }
         } catch (Throwable e) {
             LOG.error("Exception occurred when change to master role", e);
@@ -369,9 +368,12 @@ public final class TaskManager {
     public void onAsRoleWorker() {
         try {
             for (TaskScheduler entry : this.schedulers.values()) {
-                StandardTaskScheduler scheduler = (StandardTaskScheduler) entry;
-                ServerInfoManager serverInfoManager = scheduler.serverManager();
-                serverInfoManager.changeServerRole(NodeRole.WORKER);
+                ServerInfoManager serverInfoManager = entry.serverManager();
+                if (serverInfoManager != null) {
+                    serverInfoManager.changeServerRole(NodeRole.WORKER);
+                } else {
+                    LOG.warn("ServerInfoManager is null for graph {}", entry.graphName());
+                }
             }
         } catch (Throwable e) {
             LOG.error("Exception occurred when change to worker role", e);
@@ -379,8 +381,8 @@ public final class TaskManager {
         }
     }
 
-    protected void notifyNewTask(HugeTask<?> task) {
-        Queue<Runnable> queue = ((ThreadPoolExecutor) this.schedulerExecutor)
+    void notifyNewTask(HugeTask<?> task) {
+        Queue<Runnable> queue = this.schedulerExecutor
                 .getQueue();
         if (queue.size() <= 1) {
             /*
@@ -398,10 +400,9 @@ public final class TaskManager {
         // Called by scheduler timer
         try {
             for (TaskScheduler entry : this.schedulers.values()) {
-                TaskScheduler scheduler = entry;
-                // Maybe other thread close&remove scheduler at the same time
-                synchronized (scheduler) {
-                    this.scheduleOrExecuteJobForGraph(scheduler);
+                // Maybe other threads close&remove scheduler at the same time
+                synchronized (entry) {
+                    this.scheduleOrExecuteJobForGraph(entry);
                 }
             }
         } catch (Throwable e) {

Reply via email to