This is an automated email from the ASF dual-hosted git repository.
jin pushed a commit to branch pd-store
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git
The following commit(s) were added to refs/heads/pd-store by this push:
new 4d11dd5e0 fix(pd-store): reset previous unexpected cherry-pick for `TaskManager` (#2511)
4d11dd5e0 is described below
commit 4d11dd5e057f9064116709f8c0f4d4ab640e751d
Author: V_Galaxy <[email protected]>
AuthorDate: Sun Apr 7 19:33:54 2024 +0800
    fix(pd-store): reset previous unexpected cherry-pick for `TaskManager` (#2511)
---
.../org/apache/hugegraph/task/TaskManager.java | 215 ++++++++++++++++-----
1 file changed, 162 insertions(+), 53 deletions(-)
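As the subject says, this resets TaskManager after an unexpected cherry-pick: StandardTaskScheduler stays the default ("local") scheduler, and the DistributedTaskScheduler path is chosen per graph via graph.schedulerType(), backed by the dedicated schema/olap/ephemeral/distributed executors added below. For orientation only, here is a rough self-contained sketch of that per-graph selection; the class and method names (SchedulerSelectionSketch, SchedulerKind) are hypothetical and simplified, not the actual HugeGraph types:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SchedulerSelectionSketch {

    enum SchedulerKind { STANDARD, DISTRIBUTED }

    private final Map<String, SchedulerKind> schedulers = new ConcurrentHashMap<>();

    // "schedulerType" stands in for graph.schedulerType(); anything other
    // than "distributed" falls back to the local StandardTaskScheduler path.
    public void addScheduler(String graphName, String schedulerType) {
        switch (schedulerType) {
            case "distributed":
                this.schedulers.put(graphName, SchedulerKind.DISTRIBUTED);
                break;
            case "local":
            default:
                this.schedulers.put(graphName, SchedulerKind.STANDARD);
                break;
        }
    }

    public static void main(String[] args) {
        SchedulerSelectionSketch sketch = new SchedulerSelectionSketch();
        sketch.addScheduler("graph1", "distributed");
        sketch.addScheduler("graph2", "local");
        System.out.println(sketch.schedulers);
    }
}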
diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java
index 144387949..afc5c9d49 100644
--- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java
+++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/TaskManager.java
@@ -48,6 +48,11 @@ public final class TaskManager {
"server-info-db-worker-%d";
     public static final String TASK_SCHEDULER = "task-scheduler-%d";

+    public static final String OLAP_TASK_WORKER = "olap-task-worker-%d";
+    public static final String SCHEMA_TASK_WORKER = "schema-task-worker-%d";
+    public static final String EPHEMERAL_TASK_WORKER = "ephemeral-task-worker-%d";
+    public static final String DISTRIBUTED_TASK_SCHEDULER = "distributed-scheduler-%d";
+
     protected static final long SCHEDULE_PERIOD = 1000L; // unit ms
     private static final long TX_CLOSE_TIMEOUT = 30L; // unit s
     private static final int THREADS = 4;
@@ -60,6 +65,11 @@ public final class TaskManager {
     private final ExecutorService serverInfoDbExecutor;
     private final PausableScheduledThreadPool schedulerExecutor;

+    private final ExecutorService schemaTaskExecutor;
+    private final ExecutorService olapTaskExecutor;
+    private final ExecutorService ephemeralTaskExecutor;
+    private final PausableScheduledThreadPool distributedSchedulerExecutor;
+
     private boolean enableRoleElected = false;

     public static TaskManager instance() {
@@ -76,6 +86,17 @@ public final class TaskManager {
                               1, TASK_DB_WORKER);
         this.serverInfoDbExecutor = ExecutorUtil.newFixedThreadPool(
                                     1, SERVER_INFO_DB_WORKER);
+
+        this.schemaTaskExecutor = ExecutorUtil.newFixedThreadPool(pool,
+                                                                  SCHEMA_TASK_WORKER);
+        this.olapTaskExecutor = ExecutorUtil.newFixedThreadPool(pool,
+                                                                OLAP_TASK_WORKER);
+        this.ephemeralTaskExecutor = ExecutorUtil.newFixedThreadPool(pool,
+                                                                     EPHEMERAL_TASK_WORKER);
+        this.distributedSchedulerExecutor =
+                ExecutorUtil.newPausableScheduledThreadPool(1,
+                                                            DISTRIBUTED_TASK_SCHEDULER);
+
         // For schedule task to run, just one thread is ok
         this.schedulerExecutor = ExecutorUtil.newPausableScheduledThreadPool(
                                  1, TASK_SCHEDULER);
@@ -88,11 +109,36 @@ public final class TaskManager {
     public void addScheduler(HugeGraphParams graph) {
         E.checkArgumentNotNull(graph, "The graph can't be null");
-
-        TaskScheduler scheduler = new StandardTaskScheduler(graph,
-                                  this.taskExecutor, this.taskDbExecutor,
-                                  this.serverInfoDbExecutor);
-        this.schedulers.put(graph, scheduler);
+        LOG.info("Use {} as the scheduler of graph ({})",
+                 graph.schedulerType(), graph.name());
+        // TODO: if the current server is bound to a specific non-DEFAULT graph space, don't create task schedulers for graphs outside that graph space
+        switch (graph.schedulerType()) {
+            case "distributed": {
+                TaskScheduler scheduler =
+                        new DistributedTaskScheduler(
+                                graph,
+                                distributedSchedulerExecutor,
+                                taskDbExecutor,
+                                schemaTaskExecutor,
+                                olapTaskExecutor,
+                                taskExecutor, /* gremlinTaskExecutor */
+                                ephemeralTaskExecutor,
+                                serverInfoDbExecutor);
+                this.schedulers.put(graph, scheduler);
+                break;
+            }
+            case "local":
+            default: {
+                TaskScheduler scheduler =
+                        new StandardTaskScheduler(
+                                graph,
+                                this.taskExecutor,
+                                this.taskDbExecutor,
+                                this.serverInfoDbExecutor);
+                this.schedulers.put(graph, scheduler);
+                break;
+            }
+        }
     }

     public void closeScheduler(HugeGraphParams graph) {
@@ -123,6 +169,10 @@ public final class TaskManager {
         if (!this.schedulerExecutor.isTerminated()) {
             this.closeSchedulerTx(graph);
         }
+
+        if (!this.distributedSchedulerExecutor.isTerminated()) {
+            this.closeDistributedSchedulerTx(graph);
+        }
     }

     private void closeTaskTx(HugeGraphParams graph) {
@@ -157,6 +207,21 @@ public final class TaskManager {
         }
     }

+    private void closeDistributedSchedulerTx(HugeGraphParams graph) {
+        final Callable<Void> closeTx = () -> {
+            // Do close-tx for current thread
+            graph.closeTx();
+            // Let other threads run
+            Thread.yield();
+            return null;
+        };
+        try {
+            this.distributedSchedulerExecutor.submit(closeTx).get();
+        } catch (Exception e) {
+            throw new HugeException("Exception when closing scheduler tx", e);
+        }
+    }
+
     public void pauseScheduledThreadPool() {
         this.schedulerExecutor.pauseSchedule();
     }
@@ -170,8 +235,7 @@ public final class TaskManager {
     }

     public ServerInfoManager getServerInfoManager(HugeGraphParams graph) {
-        StandardTaskScheduler scheduler = (StandardTaskScheduler)
-                                          this.getScheduler(graph);
+        TaskScheduler scheduler = this.getScheduler(graph);
         if (scheduler == null) {
             return null;
         }
@@ -195,10 +259,21 @@ public final class TaskManager {
             }
         }

+        if (terminated && !this.distributedSchedulerExecutor.isShutdown()) {
+            this.distributedSchedulerExecutor.shutdown();
+            try {
+                terminated = this.distributedSchedulerExecutor.awaitTermination(timeout,
+                                                                                unit);
+            } catch (Throwable e) {
+                ex = e;
+            }
+        }
+
         if (terminated && !this.taskExecutor.isShutdown()) {
             this.taskExecutor.shutdown();
             try {
-                terminated = this.taskExecutor.awaitTermination(timeout, unit);
+                terminated = this.taskExecutor.awaitTermination(timeout,
+                                                                unit);
             } catch (Throwable e) {
                 ex = e;
             }
@@ -217,7 +292,38 @@ public final class TaskManager {
         if (terminated && !this.taskDbExecutor.isShutdown()) {
             this.taskDbExecutor.shutdown();
             try {
-                terminated = this.taskDbExecutor.awaitTermination(timeout, unit);
+                terminated = this.taskDbExecutor.awaitTermination(timeout,
+                                                                  unit);
+            } catch (Throwable e) {
+                ex = e;
+            }
+        }
+
+        if (terminated && !this.ephemeralTaskExecutor.isShutdown()) {
+            this.ephemeralTaskExecutor.shutdown();
+            try {
+                terminated = this.ephemeralTaskExecutor.awaitTermination(timeout,
+                                                                         unit);
+            } catch (Throwable e) {
+                ex = e;
+            }
+        }
+
+        if (terminated && !this.schemaTaskExecutor.isShutdown()) {
+            this.schemaTaskExecutor.shutdown();
+            try {
+                terminated = this.schemaTaskExecutor.awaitTermination(timeout,
+                                                                      unit);
+            } catch (Throwable e) {
+                ex = e;
+            }
+        }
+
+        if (terminated && !this.olapTaskExecutor.isShutdown()) {
+            this.olapTaskExecutor.shutdown();
+            try {
+                terminated = this.olapTaskExecutor.awaitTermination(timeout,
+                                                                    unit);
             } catch (Throwable e) {
                 ex = e;
             }
@@ -292,7 +398,7 @@ public final class TaskManager {
         // Called by scheduler timer
         try {
             for (TaskScheduler entry : this.schedulers.values()) {
-                StandardTaskScheduler scheduler = (StandardTaskScheduler) entry;
+                TaskScheduler scheduler = entry;
                 // Maybe other thread close&remove scheduler at the same time
                 synchronized (scheduler) {
                     this.scheduleOrExecuteJobForGraph(scheduler);
@@ -303,56 +409,59 @@ public final class TaskManager {
         }
     }

-    private void scheduleOrExecuteJobForGraph(StandardTaskScheduler scheduler) {
+    private void scheduleOrExecuteJobForGraph(TaskScheduler scheduler) {
         E.checkNotNull(scheduler, "scheduler");

-        ServerInfoManager serverManager = scheduler.serverManager();
-        String graph = scheduler.graphName();
-
-        LockUtil.lock(graph, LockUtil.GRAPH_LOCK);
-        try {
-            /*
-             * Skip if:
-             * graph is closed (iterate schedulers before graph is closing)
-             * or
-             * graph is not initialized(maybe truncated or cleared).
-             *
-             * If graph is closing by other thread, current thread get
-             * serverManager and try lock graph, at the same time other
-             * thread deleted the lock-group, current thread would get
-             * exception 'LockGroup xx does not exists'.
-             * If graph is closed, don't call serverManager.initialized()
-             * due to it will reopen graph tx.
-             */
-            if (!serverManager.graphIsReady()) {
-                return;
-            }
-
-            // Update server heartbeat
-            serverManager.heartbeat();
+        if (scheduler instanceof StandardTaskScheduler) {
+            StandardTaskScheduler standardTaskScheduler = (StandardTaskScheduler) (scheduler);
+            ServerInfoManager serverManager = scheduler.serverManager();
+            String graph = scheduler.graphName();

-            /*
-             * Master will schedule tasks to suitable servers.
-             * Note a Worker may become to a Master, so elected-Master also needs to
-             * execute tasks assigned by previous Master when enableRoleElected=true.
-             * However, when enableRoleElected=false, a Master is only set by the
-             * config assignment, assigned-Master always stays the same state.
-             */
-            if (serverManager.selfIsMaster()) {
-                scheduler.scheduleTasksOnMaster();
-                if (!this.enableRoleElected && !serverManager.onlySingleNode()) {
-                    // assigned-Master + non-single-node don't need to execute tasks
+            LockUtil.lock(graph, LockUtil.GRAPH_LOCK);
+            try {
+                /*
+                 * Skip if:
+                 * graph is closed (iterate schedulers before graph is closing)
+                 * or
+                 * graph is not initialized(maybe truncated or cleared).
+                 *
+                 * If graph is closing by other thread, current thread get
+                 * serverManager and try lock graph, at the same time other
+                 * thread deleted the lock-group, current thread would get
+                 * exception 'LockGroup xx does not exists'.
+                 * If graph is closed, don't call serverManager.initialized()
+                 * due to it will reopen graph tx.
+                 */
+                if (!serverManager.graphIsReady()) {
                     return;
                 }
-            }

-            // Execute queued tasks scheduled to current server
-            scheduler.executeTasksOnWorker(serverManager.selfNodeId());
+                // Update server heartbeat
+                serverManager.heartbeat();
+
+                /*
+                 * Master will schedule tasks to suitable servers.
+                 * Note a Worker may become to a Master, so elected-Master also needs to
+                 * execute tasks assigned by previous Master when enableRoleElected=true.
+                 * However, when enableRoleElected=false, a Master is only set by the
+                 * config assignment, assigned-Master always stays the same state.
+                 */
+                if (serverManager.selfIsMaster()) {
+                    standardTaskScheduler.scheduleTasksOnMaster();
+                    if (!this.enableRoleElected && !serverManager.onlySingleNode()) {
+                        // assigned-Master + non-single-node don't need to execute tasks
+                        return;
+                    }
+                }

-            // Cancel tasks scheduled to current server
-            scheduler.cancelTasksOnWorker(serverManager.selfNodeId());
-        } finally {
-            LockUtil.unlock(graph, LockUtil.GRAPH_LOCK);
+                // Execute queued tasks scheduled to current server
+                standardTaskScheduler.executeTasksOnWorker(serverManager.selfNodeId());
+
+                // Cancel tasks scheduled to current server
+                standardTaskScheduler.cancelTasksOnWorker(serverManager.selfNodeId());
+            } finally {
+                LockUtil.unlock(graph, LockUtil.GRAPH_LOCK);
+            }
         }
     }
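For reference, the termination logic above chains awaitTermination across all executors: each pool is shut down and awaited only if every earlier pool terminated within the timeout, so a hung pool short-circuits the rest and the final flag reports whether the whole chain finished cleanly. A minimal standalone sketch of that pattern follows; it uses plain java.util.concurrent and a hypothetical ChainedShutdownSketch/shutdownAll naming, not HugeGraph code:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ChainedShutdownSketch {

    // Shut the pools down in order; stop shutting down further pools as soon
    // as one pool fails to terminate within the timeout, mirroring the
    // "if (terminated && !pool.isShutdown())" chain above.
    public static boolean shutdownAll(List<ExecutorService> pools,
                                      long timeout, TimeUnit unit) {
        boolean terminated = true;
        Throwable ex = null;
        for (ExecutorService pool : pools) {
            if (terminated && !pool.isShutdown()) {
                pool.shutdown();
                try {
                    terminated = pool.awaitTermination(timeout, unit);
                } catch (Throwable e) {
                    ex = e;
                }
            }
        }
        if (ex != null) {
            throw new RuntimeException("Failed to wait for executors to terminate", ex);
        }
        return terminated;
    }

    public static void main(String[] args) {
        ExecutorService a = Executors.newFixedThreadPool(1);
        ExecutorService b = Executors.newFixedThreadPool(1);
        System.out.println(shutdownAll(Arrays.asList(a, b), 30, TimeUnit.SECONDS));
    }
}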