This is an automated email from the ASF dual-hosted git repository.

vgalaxies pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hugegraph.git


The following commit(s) were added to refs/heads/master by this push:
     new 861a10083 refactor(server): optimize the server-node info (#2671)
861a10083 is described below

commit 861a10083c13853d20c2ac43e4fe129c1c3bb5e6
Author: vaughn <[email protected]>
AuthorDate: Thu Oct 10 20:17:43 2024 +0800

    refactor(server): optimize the server-node info (#2671)
    
    Co-authored-by: imbajin <[email protected]>
---
 .../org/apache/hugegraph/StandardHugeGraph.java    |  7 ++--
 .../org/apache/hugegraph/task/HugeServerInfo.java  | 27 +++++++-------
 .../apache/hugegraph/task/ServerInfoManager.java   | 36 ++++++++++---------
 .../hugegraph/task/StandardTaskScheduler.java      | 41 +++++++++-------------
 4 files changed, 52 insertions(+), 59 deletions(-)

diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java
index 4e9263a48..6480d7f28 100644
--- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java
+++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/StandardHugeGraph.java
@@ -316,8 +316,7 @@ public class StandardHugeGraph implements HugeGraph {
                                                    conf.get(
                                                            
RoleElectionOptions.BASE_TIMEOUT_MILLISECOND));
         ClusterRoleStore roleStore = new StandardClusterRoleStore(this.params);
-        this.roleElectionStateMachine = new 
StandardRoleElectionStateMachine(roleConfig,
-                                                                             
roleStore);
+        this.roleElectionStateMachine = new 
StandardRoleElectionStateMachine(roleConfig, roleStore);
     }
 
     @Override
@@ -1007,7 +1006,7 @@ public class StandardHugeGraph implements HugeGraph {
         this.initBackend();
         this.serverStarted(nodeInfo);
 
-        // Write config to disk file
+        // Write config to the disk file
         String confPath = ConfigUtil.writeToFile(configPath, this.name(),
                                                  this.configuration());
         this.configuration.file(confPath);
@@ -1349,7 +1348,7 @@ public class StandardHugeGraph implements HugeGraph {
 
     private class TinkerPopTransaction extends AbstractThreadLocalTransaction {
 
-        // Times opened from upper layer
+        // Times opened from the upper layer
         private final AtomicInteger refs;
         // Flag opened of each thread
         private final ThreadLocal<Boolean> opened;
diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeServerInfo.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeServerInfo.java
index a304b4f75..71feb3f68 100644
--- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeServerInfo.java
+++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/HugeServerInfo.java
@@ -20,7 +20,6 @@ package org.apache.hugegraph.task;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -47,14 +46,13 @@ import 
org.apache.tinkerpop.gremlin.structure.VertexProperty;
 public class HugeServerInfo {
 
     // Unit millisecond
-    private static final long EXPIRED_INTERVAL =
-            TaskManager.SCHEDULE_PERIOD * 10;
+    private static final long EXPIRED_INTERVAL = TaskManager.SCHEDULE_PERIOD * 
10;
 
-    private Id id;
     private NodeRole role;
+    private Date updateTime;
     private int maxLoad;
     private int load;
-    private Date updateTime;
+    private final Id id;
 
     private transient boolean updated = false;
 
@@ -114,6 +112,10 @@ public class HugeServerInfo {
         this.updated = true;
     }
 
+    public long expireTime() {
+        return this.updateTime.getTime() + EXPIRED_INTERVAL;
+    }
+
     public Date updateTime() {
         return this.updateTime;
     }
@@ -200,8 +202,7 @@ public class HugeServerInfo {
 
     public static HugeServerInfo fromVertex(Vertex vertex) {
         HugeServerInfo serverInfo = new HugeServerInfo((Id) vertex.id());
-        for (Iterator<VertexProperty<Object>> iter = vertex.properties();
-             iter.hasNext(); ) {
+        for (var iter = vertex.properties(); iter.hasNext(); ) {
             VertexProperty<Object> prop = iter.next();
             serverInfo.property(prop.key(), prop.value());
         }
@@ -246,7 +247,7 @@ public class HugeServerInfo {
 
         public static final String SERVER = P.SERVER;
 
-        protected final HugeGraphParams graph;
+        private final HugeGraphParams graph;
 
         public Schema(HugeGraphParams graph) {
             this.graph = graph;
@@ -264,8 +265,7 @@ public class HugeServerInfo {
             VertexLabel label = graph.schema().vertexLabel(SERVER)
                                      .properties(properties)
                                      .useCustomizeStringId()
-                                     .nullableKeys(P.ROLE, P.MAX_LOAD,
-                                                   P.LOAD, P.UPDATE_TIME)
+                                     .nullableKeys(P.ROLE, P.MAX_LOAD, P.LOAD, 
P.UPDATE_TIME)
                                      .enableLabelIndex(true)
                                      .build();
             this.graph.schemaTransaction().addVertexLabel(label);
@@ -273,7 +273,6 @@ public class HugeServerInfo {
 
         private String[] initProperties() {
             List<String> props = new ArrayList<>();
-
             props.add(createPropertyKey(P.ROLE, DataType.BYTE));
             props.add(createPropertyKey(P.MAX_LOAD, DataType.INT));
             props.add(createPropertyKey(P.LOAD, DataType.INT));
@@ -283,8 +282,7 @@ public class HugeServerInfo {
         }
 
         public boolean existVertexLabel(String label) {
-            return this.graph.schemaTransaction()
-                             .getVertexLabel(label) != null;
+            return this.graph.schemaTransaction().getVertexLabel(label) != 
null;
         }
 
         @SuppressWarnings("unused")
@@ -296,8 +294,7 @@ public class HugeServerInfo {
             return this.createPropertyKey(name, dataType, Cardinality.SINGLE);
         }
 
-        private String createPropertyKey(String name, DataType dataType,
-                                         Cardinality cardinality) {
+        private String createPropertyKey(String name, DataType dataType, 
Cardinality cardinality) {
             SchemaManager schema = this.graph.graph().schema();
             PropertyKey propertyKey = schema.propertyKey(name)
                                             .dataType(dataType)
diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java
index de0d08b03..bcef86901 100644
--- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java
+++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/ServerInfoManager.java
@@ -67,8 +67,7 @@ public class ServerInfoManager {
     private volatile boolean onlySingleNode;
     private volatile boolean closed;
 
-    public ServerInfoManager(HugeGraphParams graph,
-                             ExecutorService dbExecutor) {
+    public ServerInfoManager(HugeGraphParams graph, ExecutorService 
dbExecutor) {
         E.checkNotNull(graph, "graph");
         E.checkNotNull(dbExecutor, "db executor");
 
@@ -107,9 +106,20 @@ public class ServerInfoManager {
 
         Id serverId = nodeInfo.nodeId();
         HugeServerInfo existed = this.serverInfo(serverId);
+        if (existed != null && existed.alive()) {
+            final long now = DateUtil.now().getTime();
+            if (existed.expireTime() > now + 30 * 1000) {
+                LOG.info("The node time maybe skew very much: {}", existed);
+                throw new HugeException("The server with name '%s' maybe skew very much", serverId);
+            }
+            try {
+                Thread.sleep(existed.expireTime() - now + 1);
+            } catch (InterruptedException e) {
+               throw new HugeException("Interrupted when waiting for server info expired", e);
+            }
+        }
         E.checkArgument(existed == null || !existed.alive(),
-                        "The server with name '%s' already in cluster",
-                        serverId);
+                        "The server with name '%s' already in cluster", 
serverId);
 
         if (nodeInfo.nodeRole().master()) {
             String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null;
@@ -185,13 +195,12 @@ public class ServerInfoManager {
         /* ServerInfo is missing */
         if (this.selfNodeId() == null) {
             // Ignore if ServerInfo is not initialized
-            LOG.info("ServerInfo is missing: {}, may not be initialized yet");
+            LOG.info("ServerInfo is missing: {}, may not be initialized yet", this.selfNodeId());
             return;
         }
         if (this.selfIsMaster()) {
-            // On master node, just wait for ServerInfo re-init
-            LOG.warn("ServerInfo is missing: {}, may be cleared before",
-                     this.selfNodeId());
+            // On the master node, just wait for ServerInfo re-init
+            LOG.warn("ServerInfo is missing: {}, may be cleared before", 
this.selfNodeId());
             return;
         }
         /*
@@ -232,12 +241,10 @@ public class ServerInfoManager {
             if (!server.alive()) {
                 continue;
             }
-
             if (server.role().master()) {
                 master = server;
                 continue;
             }
-
             hasWorkerNode = true;
             if (!server.suitableFor(task, now)) {
                 continue;
@@ -254,13 +261,12 @@ public class ServerInfoManager {
             this.onlySingleNode = singleNode;
         }
 
-        // Only schedule to master if there is no workers and master is 
suitable
+        // Only schedule to master if there are no workers and master are 
suitable
         if (!hasWorkerNode) {
             if (master != null && master.suitableFor(task, now)) {
                 serverWithMinLoad = master;
             }
         }
-
         return serverWithMinLoad;
     }
 
@@ -286,8 +292,7 @@ public class ServerInfoManager {
                 throw new HugeException("Schema is missing for %s '%s'",
                                         HugeServerInfo.P.SERVER, serverInfo);
             }
-            HugeVertex vertex = this.tx().constructVertex(false,
-                                                          
serverInfo.asArray());
+            HugeVertex vertex = this.tx().constructVertex(false, 
serverInfo.asArray());
             // Add or update server info in backend store
             vertex = this.tx().addVertex(vertex);
             return vertex.id();
@@ -301,8 +306,7 @@ public class ServerInfoManager {
             }
             HugeServerInfo.Schema schema = HugeServerInfo.schema(this.graph);
             if (!schema.existVertexLabel(HugeServerInfo.P.SERVER)) {
-                throw new HugeException("Schema is missing for %s",
-                                        HugeServerInfo.P.SERVER);
+                throw new HugeException("Schema is missing for %s", 
HugeServerInfo.P.SERVER);
             }
             // Save server info in batch
             GraphTransaction tx = this.tx();
diff --git a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java
index 8afe11dff..139588861 100644
--- a/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java
+++ b/hugegraph-server/hugegraph-core/src/main/java/org/apache/hugegraph/task/StandardTaskScheduler.java
@@ -120,7 +120,7 @@ public class StandardTaskScheduler implements TaskScheduler 
{
                 if (this.taskTx == null) {
                     BackendStore store = this.graph.loadSystemStore();
                     TaskTransaction tx = new TaskTransaction(this.graph, 
store);
-                    assert this.taskTx == null; // may be reentrant?
+                    assert this.taskTx == null; // maybe reentrant?
                     this.taskTx = tx;
                 }
             }
@@ -196,7 +196,7 @@ public class StandardTaskScheduler implements TaskScheduler 
{
 
         if (this.serverManager().onlySingleNode() && !task.computer()) {
             /*
-             * Speed up for single node, submit task immediately,
+             * Speed up for single node, submit the task immediately,
              * this code can be removed without affecting code logic
              */
             task.status(TaskStatus.QUEUED);
@@ -205,7 +205,7 @@ public class StandardTaskScheduler implements TaskScheduler 
{
             return this.submitTask(task);
         } else {
             /*
-             * Just set SCHEDULING status and save task,
+             * Just set the SCHEDULING status and save the task,
              * it will be scheduled by periodic scheduler worker
              */
             task.status(TaskStatus.SCHEDULING);
@@ -276,11 +276,11 @@ public class StandardTaskScheduler implements 
TaskScheduler {
             assert this.serverManager().selfIsMaster();
             if (!task.server().equals(this.serverManager().selfNodeId())) {
                 /*
-                 * Remove task from memory if it's running on worker node,
-                 * but keep task in memory if it's running on master node.
-                 * cancel-scheduling will read task from backend store, if
+                 * Remove the task from memory if it's running on worker node,
+                 * but keep the task in memory if it's running on master node.
+                 * Cancel-scheduling will read the task from backend store, if
                  * removed this instance from memory, there will be two task
-                 * instances with same id, and can't cancel the real task that
+                 * instances with the same id, and can't cancel the real task 
that
                  * is running but removed from memory.
                  */
                 this.remove(task);
@@ -301,12 +301,10 @@ public class StandardTaskScheduler implements 
TaskScheduler {
 
     protected synchronized void scheduleTasksOnMaster() {
         // Master server schedule all scheduling tasks to suitable worker nodes
-        Collection<HugeServerInfo> serverInfos = this.serverManager()
-                                                     .allServerInfos();
+        Collection<HugeServerInfo> serverInfos = 
this.serverManager().allServerInfos();
         String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null;
         do {
-            Iterator<HugeTask<Object>> tasks = 
this.tasks(TaskStatus.SCHEDULING,
-                                                          PAGE_SIZE, page);
+            Iterator<HugeTask<Object>> tasks = 
this.tasks(TaskStatus.SCHEDULING, PAGE_SIZE, page);
             while (tasks.hasNext()) {
                 HugeTask<?> task = tasks.next();
                 if (task.server() != null) {
@@ -318,12 +316,10 @@ public class StandardTaskScheduler implements 
TaskScheduler {
                     return;
                 }
 
-                HugeServerInfo server = this.serverManager().pickWorkerNode(
-                        serverInfos, task);
+                HugeServerInfo server = 
this.serverManager().pickWorkerNode(serverInfos, task);
                 if (server == null) {
                     LOG.info("The master can't find suitable servers to " +
-                             "execute task '{}', wait for next schedule",
-                             task.id());
+                             "execute task '{}', wait for next schedule", 
task.id());
                     continue;
                 }
 
@@ -336,8 +332,7 @@ public class StandardTaskScheduler implements TaskScheduler 
{
                 // Update server load in memory, it will be saved at the ending
                 server.increaseLoad(task.load());
 
-                LOG.info("Scheduled task '{}' to server '{}'",
-                         task.id(), server.id());
+                LOG.info("Scheduled task '{}' to server '{}'", task.id(), 
server.id());
             }
             if (page != null) {
                 page = PageInfo.pageInfo(tasks);
@@ -351,8 +346,7 @@ public class StandardTaskScheduler implements TaskScheduler 
{
     protected void executeTasksOnWorker(Id server) {
         String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null;
         do {
-            Iterator<HugeTask<Object>> tasks = this.tasks(TaskStatus.SCHEDULED,
-                                                          PAGE_SIZE, page);
+            Iterator<HugeTask<Object>> tasks = 
this.tasks(TaskStatus.SCHEDULED, PAGE_SIZE, page);
             while (tasks.hasNext()) {
                 HugeTask<?> task = tasks.next();
                 this.initTaskCallable(task);
@@ -381,8 +375,7 @@ public class StandardTaskScheduler implements TaskScheduler 
{
     protected void cancelTasksOnWorker(Id server) {
         String page = this.supportsPaging() ? PageInfo.PAGE_NONE : null;
         do {
-            Iterator<HugeTask<Object>> tasks = 
this.tasks(TaskStatus.CANCELLING,
-                                                          PAGE_SIZE, page);
+            Iterator<HugeTask<Object>> tasks = 
this.tasks(TaskStatus.CANCELLING, PAGE_SIZE, page);
             while (tasks.hasNext()) {
                 HugeTask<?> task = tasks.next();
                 Id taskServer = task.server();
@@ -557,10 +550,10 @@ public class StandardTaskScheduler implements 
TaskScheduler {
 
         HugeTask<?> task = this.task(id);
         /*
-         * The following is out of date when task running on worker node:
+         * The following is out of date when the task running on worker node:
          * HugeTask<?> task = this.tasks.get(id);
          * Tasks are removed from memory after completed at most time,
-         * but there is a tiny gap between tasks are completed and
+         * but there is a tiny gap between tasks is completed and
          * removed from memory.
          * We assume tasks only in memory may be incomplete status,
          * in fact, it is also possible to appear on the backend tasks
@@ -621,7 +614,7 @@ public class StandardTaskScheduler implements TaskScheduler 
{
                 throw e;
             }
             if (task.completed()) {
-                // Wait for task result being set after status is completed
+                // Wait for the task result being set after the status is 
completed
                 sleep(intervalMs);
                 return task;
             }

Reply via email to