This is an automated email from the ASF dual-hosted git repository.
pengjunzhi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git
The following commit(s) were added to refs/heads/master by this push:
new 0946d5dd0 feat(server): add option for task role election (#2843)
0946d5dd0 is described below
commit 0946d5dd0cad8c27cb365b0c2aaaf217cdca3402
Author: vaughn <[email protected]>
AuthorDate: Thu Jul 31 10:13:23 2025 +0800
feat(server): add option for task role election (#2843)
---
.../org/apache/hugegraph/config/ServerOptions.java | 9 +++
.../org/apache/hugegraph/core/GraphManager.java | 3 +-
.../org/apache/hugegraph/task/TaskManager.java | 87 +++++++++++-----------
3 files changed, 55 insertions(+), 44 deletions(-)
diff --git a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/config/ServerOptions.java b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/config/ServerOptions.java
index f4b259cc4..5041a90b3 100644
--- a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/config/ServerOptions.java
+++ b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/config/ServerOptions.java
@@ -73,6 +73,15 @@ public class ServerOptions extends OptionHolder {
"master"
);
+ public static final ConfigOption<Boolean> ENABLE_SERVER_ROLE_ELECTION =
+ new ConfigOption<>(
+ "server.role_election",
+ "Whether to enable role election, if enabled, the server "
+
+ "will elect a master node in the cluster.",
+ disallowEmpty(),
+ false
+ );
+
public static final ConfigOption<Integer> MAX_WORKER_THREADS =
new ConfigOption<>(
"restserver.max_worker_threads",
diff --git a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/core/GraphManager.java b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/core/GraphManager.java
index daae180a3..80b52e124 100644
--- a/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/core/GraphManager.java
+++ b/hugegraph-server/hugegraph-api/src/main/java/org/apache/hugegraph/core/GraphManager.java
@@ -469,7 +469,8 @@ public final class GraphManager {
NodeRole nodeRole = NodeRole.valueOf(role.toUpperCase());
boolean supportRoleElection = !nodeRole.computer() &&
- this.supportRoleElection();
+ this.supportRoleElection() &&
+
config.get(ServerOptions.ENABLE_SERVER_ROLE_ELECTION);
if (supportRoleElection) {
// Init any server as Worker role, then do role election
nodeRole = NodeRole.WORKER;
diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java
index f8e560204..a638a7940 100644
--- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java
+++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java
@@ -37,6 +37,13 @@ import org.apache.hugegraph.util.LockUtil;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;
+/**
+ * Central task management system that coordinates task scheduling and execution.
+ * Manages task schedulers for different graphs and handles role-based execution.
+ * <p>
+ * Note: The local master-worker mechanism will be deprecated in version 1.7
+ * (configuration has been removed from config files).
+ */
public final class TaskManager {
private static final Logger LOG = Log.logger(TaskManager.class);
@@ -44,8 +51,7 @@ public final class TaskManager {
public static final String TASK_WORKER_PREFIX = "task-worker";
public static final String TASK_WORKER = TASK_WORKER_PREFIX + "-%d";
public static final String TASK_DB_WORKER = "task-db-worker-%d";
- public static final String SERVER_INFO_DB_WORKER =
- "server-info-db-worker-%d";
+ public static final String SERVER_INFO_DB_WORKER =
"server-info-db-worker-%d";
public static final String TASK_SCHEDULER = "task-scheduler-%d";
public static final String OLAP_TASK_WORKER = "olap-task-worker-%d";
@@ -53,7 +59,7 @@ public final class TaskManager {
public static final String EPHEMERAL_TASK_WORKER =
"ephemeral-task-worker-%d";
public static final String DISTRIBUTED_TASK_SCHEDULER =
"distributed-scheduler-%d";
- protected static final long SCHEDULE_PERIOD = 1000L; // unit ms
+ static final long SCHEDULE_PERIOD = 1000L; // unit ms
private static final long TX_CLOSE_TIMEOUT = 30L; // unit s
private static final int THREADS = 4;
private static final TaskManager MANAGER = new TaskManager(THREADS);
@@ -87,17 +93,13 @@ public final class TaskManager {
this.serverInfoDbExecutor = ExecutorUtil.newFixedThreadPool(
1, SERVER_INFO_DB_WORKER);
- this.schemaTaskExecutor = ExecutorUtil.newFixedThreadPool(pool,
-
SCHEMA_TASK_WORKER);
- this.olapTaskExecutor = ExecutorUtil.newFixedThreadPool(pool,
-
OLAP_TASK_WORKER);
- this.ephemeralTaskExecutor = ExecutorUtil.newFixedThreadPool(pool,
-
EPHEMERAL_TASK_WORKER);
+ this.schemaTaskExecutor = ExecutorUtil.newFixedThreadPool(pool,
SCHEMA_TASK_WORKER);
+ this.olapTaskExecutor = ExecutorUtil.newFixedThreadPool(pool,
OLAP_TASK_WORKER);
+ this.ephemeralTaskExecutor = ExecutorUtil.newFixedThreadPool(pool,
EPHEMERAL_TASK_WORKER);
this.distributedSchedulerExecutor =
- ExecutorUtil.newPausableScheduledThreadPool(1,
-
DISTRIBUTED_TASK_SCHEDULER);
+ ExecutorUtil.newPausableScheduledThreadPool(1,
DISTRIBUTED_TASK_SCHEDULER);
- // For schedule task to run, just one thread is ok
+ // For a schedule task to run, just one thread is ok
this.schedulerExecutor = ExecutorUtil.newPausableScheduledThreadPool(
1, TASK_SCHEDULER);
// Start after 10x period time waiting for HugeGraphServer startup
@@ -111,7 +113,9 @@ public final class TaskManager {
E.checkArgumentNotNull(graph, "The graph can't be null");
LOG.info("Use {} as the scheduler of graph ({})",
graph.schedulerType(), graph.name());
- // TODO: If the current service is bound to a specified non-DEFAULT graph space, the graph outside of the current graph space will no longer create task schedulers (graph space)
+ // TODO: If the current service is bound to a specified non-DEFAULT graph space, the
+ // graph outside of the current graph space will no longer create task schedulers (graph
+ // space)
switch (graph.schedulerType()) {
case "distributed": {
TaskScheduler scheduler =
@@ -194,7 +198,7 @@ public final class TaskManager {
private void closeSchedulerTx(HugeGraphParams graph) {
final Callable<Void> closeTx = () -> {
- // Do close-tx for current thread
+ // Do close-tx for the current thread
graph.closeTx();
// Let other threads run
Thread.yield();
@@ -209,7 +213,7 @@ public final class TaskManager {
private void closeDistributedSchedulerTx(HugeGraphParams graph) {
final Callable<Void> closeTx = () -> {
- // Do close-tx for current thread
+ // Do close-tx for the current thread
graph.closeTx();
// Let other threads run
Thread.yield();
@@ -252,8 +256,7 @@ public final class TaskManager {
if (!this.schedulerExecutor.isShutdown()) {
this.schedulerExecutor.shutdown();
try {
- terminated = this.schedulerExecutor.awaitTermination(timeout,
- unit);
+ terminated = this.schedulerExecutor.awaitTermination(timeout,
unit);
} catch (Throwable e) {
ex = e;
}
@@ -262,8 +265,7 @@ public final class TaskManager {
if (terminated && !this.distributedSchedulerExecutor.isShutdown()) {
this.distributedSchedulerExecutor.shutdown();
try {
- terminated =
this.distributedSchedulerExecutor.awaitTermination(timeout,
-
unit);
+ terminated =
this.distributedSchedulerExecutor.awaitTermination(timeout, unit);
} catch (Throwable e) {
ex = e;
}
@@ -272,8 +274,7 @@ public final class TaskManager {
if (terminated && !this.taskExecutor.isShutdown()) {
this.taskExecutor.shutdown();
try {
- terminated = this.taskExecutor.awaitTermination(timeout,
- unit);
+ terminated = this.taskExecutor.awaitTermination(timeout, unit);
} catch (Throwable e) {
ex = e;
}
@@ -282,8 +283,7 @@ public final class TaskManager {
if (terminated && !this.serverInfoDbExecutor.isShutdown()) {
this.serverInfoDbExecutor.shutdown();
try {
- terminated =
this.serverInfoDbExecutor.awaitTermination(timeout,
- unit);
+ terminated =
this.serverInfoDbExecutor.awaitTermination(timeout, unit);
} catch (Throwable e) {
ex = e;
}
@@ -292,8 +292,7 @@ public final class TaskManager {
if (terminated && !this.taskDbExecutor.isShutdown()) {
this.taskDbExecutor.shutdown();
try {
- terminated = this.taskDbExecutor.awaitTermination(timeout,
- unit);
+ terminated = this.taskDbExecutor.awaitTermination(timeout,
unit);
} catch (Throwable e) {
ex = e;
}
@@ -302,8 +301,7 @@ public final class TaskManager {
if (terminated && !this.ephemeralTaskExecutor.isShutdown()) {
this.ephemeralTaskExecutor.shutdown();
try {
- terminated =
this.ephemeralTaskExecutor.awaitTermination(timeout,
- unit);
+ terminated =
this.ephemeralTaskExecutor.awaitTermination(timeout, unit);
} catch (Throwable e) {
ex = e;
}
@@ -312,8 +310,7 @@ public final class TaskManager {
if (terminated && !this.schemaTaskExecutor.isShutdown()) {
this.schemaTaskExecutor.shutdown();
try {
- terminated = this.schemaTaskExecutor.awaitTermination(timeout,
- unit);
+ terminated = this.schemaTaskExecutor.awaitTermination(timeout,
unit);
} catch (Throwable e) {
ex = e;
}
@@ -322,8 +319,7 @@ public final class TaskManager {
if (terminated && !this.olapTaskExecutor.isShutdown()) {
this.olapTaskExecutor.shutdown();
try {
- terminated = this.olapTaskExecutor.awaitTermination(timeout,
- unit);
+ terminated = this.olapTaskExecutor.awaitTermination(timeout,
unit);
} catch (Throwable e) {
ex = e;
}
@@ -356,9 +352,12 @@ public final class TaskManager {
public void onAsRoleMaster() {
try {
for (TaskScheduler entry : this.schedulers.values()) {
- StandardTaskScheduler scheduler = (StandardTaskScheduler)
entry;
- ServerInfoManager serverInfoManager =
scheduler.serverManager();
- serverInfoManager.changeServerRole(NodeRole.MASTER);
+ ServerInfoManager serverInfoManager = entry.serverManager();
+ if (serverInfoManager != null) {
+ serverInfoManager.changeServerRole(NodeRole.MASTER);
+ } else {
+ LOG.warn("ServerInfoManager is null for graph {}",
entry.graphName());
+ }
}
} catch (Throwable e) {
LOG.error("Exception occurred when change to master role", e);
@@ -369,9 +368,12 @@ public final class TaskManager {
public void onAsRoleWorker() {
try {
for (TaskScheduler entry : this.schedulers.values()) {
- StandardTaskScheduler scheduler = (StandardTaskScheduler)
entry;
- ServerInfoManager serverInfoManager =
scheduler.serverManager();
- serverInfoManager.changeServerRole(NodeRole.WORKER);
+ ServerInfoManager serverInfoManager = entry.serverManager();
+ if (serverInfoManager != null) {
+ serverInfoManager.changeServerRole(NodeRole.WORKER);
+ } else {
+ LOG.warn("ServerInfoManager is null for graph {}",
entry.graphName());
+ }
}
} catch (Throwable e) {
LOG.error("Exception occurred when change to worker role", e);
@@ -379,8 +381,8 @@ public final class TaskManager {
}
}
- protected void notifyNewTask(HugeTask<?> task) {
- Queue<Runnable> queue = ((ThreadPoolExecutor) this.schedulerExecutor)
+ void notifyNewTask(HugeTask<?> task) {
+ Queue<Runnable> queue = this.schedulerExecutor
.getQueue();
if (queue.size() <= 1) {
/*
@@ -398,10 +400,9 @@ public final class TaskManager {
// Called by scheduler timer
try {
for (TaskScheduler entry : this.schedulers.values()) {
- TaskScheduler scheduler = entry;
- // Maybe other thread close&remove scheduler at the same time
- synchronized (scheduler) {
- this.scheduleOrExecuteJobForGraph(scheduler);
+ // Maybe other threads close&remove scheduler at the same time
+ synchronized (entry) {
+ this.scheduleOrExecuteJobForGraph(entry);
}
}
} catch (Throwable e) {