This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 2874c3e9 [MINOR][IMPROVEMENT] Output the registering/lost/exclude 
nodes in log (#148)
2874c3e9 is described below

commit 2874c3e9057b2933211354c666cb201aeacc3581
Author: Junfan Zhang <[email protected]>
AuthorDate: Wed Aug 10 17:22:03 2022 +0800

    [MINOR][IMPROVEMENT] Output the registering/lost/exclude nodes in log (#148)
    
    ### What changes were proposed in this pull request?
    
    [Log Improvement] Output the registering/lost/exclude nodes in log
    
    ### Why are the changes needed?
    It's hard to find the alive nodes on the coordinator side: only the total number of
alive nodes is exposed through the coordinator server metrics, which is not enough. So
it's necessary to output the registering/lost/excluded nodes in the log file.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    No new tests needed; this change only adds log output and does not alter behavior.
---
 .../uniffle/coordinator/CoordinatorConf.java       |  7 +++++++
 .../uniffle/coordinator/SimpleClusterManager.java  | 23 +++++++++++++++++++++-
 2 files changed, 29 insertions(+), 1 deletion(-)

diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
index ebb50cef..18bd6614 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java
@@ -47,6 +47,13 @@ public class CoordinatorConf extends RssBaseConf {
       .longType()
       .defaultValue(30 * 1000L)
       .withDescription("timeout if can't get heartbeat from shuffle server");
+  public static final ConfigOption<Long> 
COORDINATOR_NODES_PERIODIC_OUTPUT_INTERVAL_TIMES = ConfigOptions
+      .key("rss.coordinator.server.periodic.output.interval.times")
+      .longType()
+      .checkValue(ConfigUtils.POSITIVE_LONG_VALIDATOR, "output server list 
interval times must be positive")
+      .defaultValue(30L)
+      .withDescription("The periodic interval times of output alive nodes. The 
interval sec can be calculated by ("
+          + COORDINATOR_HEARTBEAT_TIMEOUT.key() + "/3 * 
rss.coordinator.server.periodic.output.interval.times)");
   public static final ConfigOption<String> COORDINATOR_ASSIGNMENT_STRATEGY = 
ConfigOptions
       .key("rss.coordinator.assignment.strategy")
       .stringType()
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
index 8f81b396..1535c626 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java
@@ -30,6 +30,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
@@ -60,12 +61,17 @@ public class SimpleClusterManager implements ClusterManager 
{
   private ScheduledExecutorService checkNodesExecutorService;
   private FileSystem hadoopFileSystem;
 
+  private long outputAliveServerCount = 0;
+  private final long periodicOutputIntervalTimes;
+
   public SimpleClusterManager(CoordinatorConf conf, Configuration hadoopConf) 
throws IOException {
     this.shuffleNodesMax = 
conf.getInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
     this.heartbeatTimeout = 
conf.getLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT);
     // the thread for checking if shuffle server report heartbeat in time
     scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
         ThreadUtils.getThreadFactory("SimpleClusterManager-%d"));
+
+    periodicOutputIntervalTimes = 
conf.get(CoordinatorConf.COORDINATOR_NODES_PERIODIC_OUTPUT_INTERVAL_TIMES);
     scheduledExecutorService.scheduleAtFixedRate(
         () -> nodesCheck(), heartbeatTimeout / 3,
         heartbeatTimeout / 3, TimeUnit.MILLISECONDS);
@@ -103,6 +109,13 @@ public class SimpleClusterManager implements 
ClusterManager {
           }
         }
       }
+      if (!deleteIds.isEmpty() || outputAliveServerCount % 
periodicOutputIntervalTimes == 0) {
+        LOG.info("Alive servers number: {}, ids: {}",
+            servers.size(),
+            servers.keySet().stream().collect(Collectors.toList())
+        );
+      }
+      outputAliveServerCount++;
 
       CoordinatorMetrics.gaugeUnhealthyServerNum.set(unhealthyNode.size());
       CoordinatorMetrics.gaugeTotalServerNum.set(servers.size());
@@ -112,6 +125,7 @@ public class SimpleClusterManager implements ClusterManager 
{
   }
 
   private void updateExcludeNodes(String path) {
+    int originalExcludeNodesNumber = excludeNodes.size();
     try {
       Path hadoopPath = new Path(path);
       FileStatus fileStatus = hadoopFileSystem.getFileStatus(hadoopPath);
@@ -123,12 +137,16 @@ public class SimpleClusterManager implements 
ClusterManager {
       } else {
         excludeNodes = Sets.newConcurrentHashSet();
       }
-      CoordinatorMetrics.gaugeExcludeServerNum.set(excludeNodes.size());
     } catch (FileNotFoundException fileNotFoundException) {
       excludeNodes = Sets.newConcurrentHashSet();
     } catch (Exception e) {
       LOG.warn("Error when updating exclude nodes, the exclude nodes file 
path: " + path, e);
     }
+    int newlyExcludeNodesNumber = excludeNodes.size();
+    if (newlyExcludeNodesNumber != originalExcludeNodesNumber) {
+      LOG.info("Exclude nodes number: {}, nodes list: {}", 
newlyExcludeNodesNumber, excludeNodes);
+    }
+    CoordinatorMetrics.gaugeExcludeServerNum.set(excludeNodes.size());
   }
 
   private void parseExcludeNodesFile(DataInputStream fsDataInputStream) throws 
IOException {
@@ -148,6 +166,9 @@ public class SimpleClusterManager implements ClusterManager 
{
 
   @Override
   public void add(ServerNode node) {
+    if (!servers.containsKey(node.getId())) {
+      LOG.info("Newly registering node: {}", node.getId());
+    }
     servers.put(node.getId(), node);
     Set<String> tags = node.getTags();
     // remove node with all tags to deal with the situation of tag change

Reply via email to