This is an automated email from the ASF dual-hosted git repository.
vgalaxies pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git
The following commit(s) were added to refs/heads/master by this push:
new 861a10083 refactor(server): optimize the server-node info (#2671)
861a10083 is described below
commit 861a10083c13853d20c2ac43e4fe129c1c3bb5e6
Author: vaughn <[email protected]>
AuthorDate: Thu Oct 10 20:17:43 2024 +0800
refactor(server): optimize the server-node info (#2671)
Co-authored-by: imbajin <[email protected]>
---
.../org/apache/hugegraph/StandardHugeGraph.java | 7 ++--
.../org/apache/hugegraph/task/HugeServerInfo.java | 27 +++++++-------
.../apache/hugegraph/task/ServerInfoManager.java | 36 ++++++++++---------
.../hugegraph/task/StandardTaskScheduler.java | 41 +++++++++-------------
4 files changed, 52 insertions(+), 59 deletions(-)
diff --git
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java
index 4e9263a48..6480d7f28 100644
---
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java
+++
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java
@@ -316,8 +316,7 @@ public class StandardHugeGraph implements HugeGraph {
conf.get(
RoleElectionOptions.BASE_TIMEOUT_MILLISECOND));
ClusterRoleStore roleStore = new StandardClusterRoleStore(this.params);
- this.roleElectionStateMachine = new
StandardRoleElectionStateMachine(roleConfig,
-
roleStore);
+ this.roleElectionStateMachine = new
StandardRoleElectionStateMachine(roleConfig, roleStore);
}
@Override
@@ -1007,7 +1006,7 @@ public class StandardHugeGraph implements HugeGraph {
this.initBackend();
this.serverStarted(nodeInfo);
- // Write config to disk file
+ // Write config to the disk file
String confPath = ConfigUtil.writeToFile(configPath, this.name(),
this.configuration());
this.configuration.file(confPath);
@@ -1349,7 +1348,7 @@ public class StandardHugeGraph implements HugeGraph {
private class TinkerPopTransaction extends AbstractThreadLocalTransaction {
- // Times opened from upper layer
+ // Times opened from the upper layer
private final AtomicInteger refs;
// Flag opened of each thread
private final ThreadLocal<Boolean> opened;
diff --git
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeServerInfo.java
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeServerInfo.java
index a304b4f75..71feb3f68 100644
---
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeServerInfo.java
+++
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeServerInfo.java
@@ -20,7 +20,6 @@ package org.apache.hugegraph.task;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -47,14 +46,13 @@ import
org.apache.tinkerpop.gremlin.structure.VertexProperty;
public class HugeServerInfo {
// Unit millisecond
- private static final long EXPIRED_INTERVAL =
- TaskManager.SCHEDULE_PERIOD * 10;
+ private static final long EXPIRED_INTERVAL = TaskManager.SCHEDULE_PERIOD *
10;
- private Id id;
private NodeRole role;
+ private Date updateTime;
private int maxLoad;
private int load;
- private Date updateTime;
+ private final Id id;
private transient boolean updated = false;
@@ -114,6 +112,10 @@ public class HugeServerInfo {
this.updated = true;
}
+ public long expireTime() {
+ return this.updateTime.getTime() + EXPIRED_INTERVAL;
+ }
+
public Date updateTime() {
return this.updateTime;
}
@@ -200,8 +202,7 @@ public class HugeServerInfo {
public static HugeServerInfo fromVertex(Vertex vertex) {
HugeServerInfo serverInfo = new HugeServerInfo((Id) vertex.id());
- for (Iterator<VertexProperty<Object>> iter = vertex.properties();
- iter.hasNext(); ) {
+ for (var iter = vertex.properties(); iter.hasNext(); ) {
VertexProperty<Object> prop = iter.next();
serverInfo.property(prop.key(), prop.value());
}
@@ -246,7 +247,7 @@ public class HugeServerInfo {
public static final String SERVER = P.SERVER;
- protected final HugeGraphParams graph;
+ private final HugeGraphParams graph;
public Schema(HugeGraphParams graph) {
this.graph = graph;
@@ -264,8 +265,7 @@ public class HugeServerInfo {
VertexLabel label = graph.schema().vertexLabel(SERVER)
.properties(properties)
.useCustomizeStringId()
- .nullableKeys(P.ROLE, P.MAX_LOAD,
- P.LOAD, P.UPDATE_TIME)
+ .nullableKeys(P.ROLE, P.MAX_LOAD, P.LOAD,
P.UPDATE_TIME)
.enableLabelIndex(true)
.build();
this.graph.schemaTransaction().addVertexLabel(label);
@@ -273,7 +273,6 @@ public class HugeServerInfo {
private String[] initProperties() {
List<String> props = new ArrayList<>();
-
props.add(createPropertyKey(P.ROLE, DataType.BYTE));
props.add(createPropertyKey(P.MAX_LOAD, DataType.INT));
props.add(createPropertyKey(P.LOAD, DataType.INT));
@@ -283,8 +282,7 @@ public class HugeServerInfo {
}
public boolean existVertexLabel(String label) {
- return this.graph.schemaTransaction()
- .getVertexLabel(label) != null;
+ return this.graph.schemaTransaction().getVertexLabel(label) !=
null;
}
@SuppressWarnings("unused")
@@ -296,8 +294,7 @@ public class HugeServerInfo {
return this.createPropertyKey(name, dataType, Cardinality.SINGLE);
}
- private String createPropertyKey(String name, DataType dataType,
- Cardinality cardinality) {
+ private String createPropertyKey(String name, DataType dataType,
Cardinality cardinality) {
SchemaManager schema = this.graph.graph().schema();
PropertyKey propertyKey = schema.propertyKey(name)
.dataType(dataType)
diff --git
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java
index de0d08b03..bcef86901 100644
---
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java
+++
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java
@@ -67,8 +67,7 @@ public class ServerInfoManager {
private volatile boolean onlySingleNode;
private volatile boolean closed;
- public ServerInfoManager(HugeGraphParams graph,
- ExecutorService dbExecutor) {
+ public ServerInfoManager(HugeGraphParams graph, ExecutorService
dbExecutor) {
E.checkNotNull(graph, "graph");
E.checkNotNull(dbExecutor, "db executor");
@@ -107,9 +106,20 @@ public class ServerInfoManager {
Id serverId = nodeInfo.nodeId();
HugeServerInfo existed = this.serverInfo(serverId);
+ if (existed != null && existed.alive()) {
+ final long now = DateUtil.now().getTime();
+ if (existed.expireTime() > now + 30 * 1000) {
+ LOG.info("The node time maybe skew very much: {}", existed);
+ throw new HugeException("The server with name '%s' maybe skew
very much", serverId);
+ }
+ try {
+ Thread.sleep(existed.expireTime() - now + 1);
+ } catch (InterruptedException e) {
+ throw new HugeException("Interrupted when waiting for server
info expired", e);
+ }
+ }
E.checkArgument(existed == null || !existed.alive(),
- "The server with name '%s' already in cluster",
- serverId);
+ "The server with name '%s' already in cluster",
serverId);
if (nodeInfo.nodeRole().master()) {
String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null;
@@ -185,13 +195,12 @@ public class ServerInfoManager {
/* ServerInfo is missing */
if (this.selfNodeId() == null) {
// Ignore if ServerInfo is not initialized
- LOG.info("ServerInfo is missing: {}, may not be initialized yet");
+ LOG.info("ServerInfo is missing: {}, may not be initialized yet",
this.selfNodeId());
return;
}
if (this.selfIsMaster()) {
- // On master node, just wait for ServerInfo re-init
- LOG.warn("ServerInfo is missing: {}, may be cleared before",
- this.selfNodeId());
+ // On the master node, just wait for ServerInfo re-init
+ LOG.warn("ServerInfo is missing: {}, may be cleared before",
this.selfNodeId());
return;
}
/*
@@ -232,12 +241,10 @@ public class ServerInfoManager {
if (!server.alive()) {
continue;
}
-
if (server.role().master()) {
master = server;
continue;
}
-
hasWorkerNode = true;
if (!server.suitableFor(task, now)) {
continue;
@@ -254,13 +261,12 @@ public class ServerInfoManager {
this.onlySingleNode = singleNode;
}
- // Only schedule to master if there is no workers and master is
suitable
+ // Only schedule to master if there are no workers and master is
suitable
if (!hasWorkerNode) {
if (master != null && master.suitableFor(task, now)) {
serverWithMinLoad = master;
}
}
-
return serverWithMinLoad;
}
@@ -286,8 +292,7 @@ public class ServerInfoManager {
throw new HugeException("Schema is missing for %s '%s'",
HugeServerInfo.P.SERVER, serverInfo);
}
- HugeVertex vertex = this.tx().constructVertex(false,
-
serverInfo.asArray());
+ HugeVertex vertex = this.tx().constructVertex(false,
serverInfo.asArray());
// Add or update server info in backend store
vertex = this.tx().addVertex(vertex);
return vertex.id();
@@ -301,8 +306,7 @@ public class ServerInfoManager {
}
HugeServerInfo.Schema schema = HugeServerInfo.schema(this.graph);
if (!schema.existVertexLabel(HugeServerInfo.P.SERVER)) {
- throw new HugeException("Schema is missing for %s",
- HugeServerInfo.P.SERVER);
+ throw new HugeException("Schema is missing for %s",
HugeServerInfo.P.SERVER);
}
// Save server info in batch
GraphTransaction tx = this.tx();
diff --git
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java
index 8afe11dff..139588861 100644
---
a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java
+++
b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java
@@ -120,7 +120,7 @@ public class StandardTaskScheduler implements TaskScheduler
{
if (this.taskTx == null) {
BackendStore store = this.graph.loadSystemStore();
TaskTransaction tx = new TaskTransaction(this.graph,
store);
- assert this.taskTx == null; // may be reentrant?
+ assert this.taskTx == null; // maybe reentrant?
this.taskTx = tx;
}
}
@@ -196,7 +196,7 @@ public class StandardTaskScheduler implements TaskScheduler
{
if (this.serverManager().onlySingleNode() && !task.computer()) {
/*
- * Speed up for single node, submit task immediately,
+ * Speed up for single node, submit the task immediately,
* this code can be removed without affecting code logic
*/
task.status(TaskStatus.QUEUED);
@@ -205,7 +205,7 @@ public class StandardTaskScheduler implements TaskScheduler
{
return this.submitTask(task);
} else {
/*
- * Just set SCHEDULING status and save task,
+ * Just set the SCHEDULING status and save the task,
* it will be scheduled by periodic scheduler worker
*/
task.status(TaskStatus.SCHEDULING);
@@ -276,11 +276,11 @@ public class StandardTaskScheduler implements
TaskScheduler {
assert this.serverManager().selfIsMaster();
if (!task.server().equals(this.serverManager().selfNodeId())) {
/*
- * Remove task from memory if it's running on worker node,
- * but keep task in memory if it's running on master node.
- * cancel-scheduling will read task from backend store, if
+ * Remove the task from memory if it's running on worker node,
+ * but keep the task in memory if it's running on master node.
+ * Cancel-scheduling will read the task from backend store, if
* removed this instance from memory, there will be two task
- * instances with same id, and can't cancel the real task that
+ * instances with the same id, and can't cancel the real task
that
* is running but removed from memory.
*/
this.remove(task);
@@ -301,12 +301,10 @@ public class StandardTaskScheduler implements
TaskScheduler {
protected synchronized void scheduleTasksOnMaster() {
// Master server schedule all scheduling tasks to suitable worker nodes
- Collection<HugeServerInfo> serverInfos = this.serverManager()
- .allServerInfos();
+ Collection<HugeServerInfo> serverInfos =
this.serverManager().allServerInfos();
String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null;
do {
- Iterator<HugeTask<Object>> tasks =
this.tasks(TaskStatus.SCHEDULING,
- PAGE_SIZE, page);
+ Iterator<HugeTask<Object>> tasks =
this.tasks(TaskStatus.SCHEDULING, PAGE_SIZE, page);
while (tasks.hasNext()) {
HugeTask<?> task = tasks.next();
if (task.server() != null) {
@@ -318,12 +316,10 @@ public class StandardTaskScheduler implements
TaskScheduler {
return;
}
- HugeServerInfo server = this.serverManager().pickWorkerNode(
- serverInfos, task);
+ HugeServerInfo server =
this.serverManager().pickWorkerNode(serverInfos, task);
if (server == null) {
LOG.info("The master can't find suitable servers to " +
- "execute task '{}', wait for next schedule",
- task.id());
+ "execute task '{}', wait for next schedule",
task.id());
continue;
}
@@ -336,8 +332,7 @@ public class StandardTaskScheduler implements TaskScheduler
{
// Update server load in memory, it will be saved at the ending
server.increaseLoad(task.load());
- LOG.info("Scheduled task '{}' to server '{}'",
- task.id(), server.id());
+ LOG.info("Scheduled task '{}' to server '{}'", task.id(),
server.id());
}
if (page != null) {
page = PageInfo.pageInfo(tasks);
@@ -351,8 +346,7 @@ public class StandardTaskScheduler implements TaskScheduler
{
protected void executeTasksOnWorker(Id server) {
String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null;
do {
- Iterator<HugeTask<Object>> tasks = this.tasks(TaskStatus.SCHEDULED,
- PAGE_SIZE, page);
+ Iterator<HugeTask<Object>> tasks =
this.tasks(TaskStatus.SCHEDULED, PAGE_SIZE, page);
while (tasks.hasNext()) {
HugeTask<?> task = tasks.next();
this.initTaskCallable(task);
@@ -381,8 +375,7 @@ public class StandardTaskScheduler implements TaskScheduler
{
protected void cancelTasksOnWorker(Id server) {
String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null;
do {
- Iterator<HugeTask<Object>> tasks =
this.tasks(TaskStatus.CANCELLING,
- PAGE_SIZE, page);
+ Iterator<HugeTask<Object>> tasks =
this.tasks(TaskStatus.CANCELLING, PAGE_SIZE, page);
while (tasks.hasNext()) {
HugeTask<?> task = tasks.next();
Id taskServer = task.server();
@@ -557,10 +550,10 @@ public class StandardTaskScheduler implements
TaskScheduler {
HugeTask<?> task = this.task(id);
/*
- * The following is out of date when task running on worker node:
+ * The following is out of date when the task is running on a worker node:
* HugeTask<?> task = this.tasks.get(id);
* Tasks are removed from memory after completed at most time,
+ * but there is a tiny gap between tasks being completed and
+ * but there is a tiny gap between tasks is completed and
* removed from memory.
* We assume tasks only in memory may be incomplete status,
* in fact, it is also possible to appear on the backend tasks
@@ -621,7 +614,7 @@ public class StandardTaskScheduler implements TaskScheduler
{
throw e;
}
if (task.completed()) {
- // Wait for task result being set after status is completed
+ // Wait for the task result being set after the status is
completed
sleep(intervalMs);
return task;
}