This is an automated email from the ASF dual-hosted git repository.

jshao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git

commit 5ec04b89348ca9c28c9ddce571ffa528969d2f8a
Author: Junfan Zhang <junfan.zh...@outlook.com>
AuthorDate: Thu Jun 30 19:12:36 2022 +0800

    Support using remote fs path to specify the excludeNodesFilePath (#200)
    
    What changes were proposed in this pull request?
    Support using remote fs path to specify the excludeNodesFilePath
    
    Why are the changes needed?
    When two coordinators are serving online, we hope they can read a
    consistent exclude nodes file instead of manually syncing a local file.
    
    Does this PR introduce any user-facing change?
    Yes. It's an incompatible change.
    
    Suppose the default fs in core-site.xml is HDFS and the
    excludeNodesFilePath is specified as "/user/xxxxx" on the coordinator
    server. After applying this patch, the filesystem will be initialized
    as the remote HDFS because the path lacks a scheme.
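    
    As a minimal sketch of why this happens (Hadoop's generic resolution
    rule, not this patch's CoordinatorUtils helper): Path.getFileSystem()
    consults the path's scheme first and falls back to fs.defaultFS when
    the path has none.
    
        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.FileSystem;
        import org.apache.hadoop.fs.Path;
        
        // A scheme-less path such as "/user/xxxxx" resolves against
        // fs.defaultFS from core-site.xml, so with fs.defaultFS pointing
        // at HDFS the coordinator now opens the remote filesystem rather
        // than the local one.
        Configuration conf = new Configuration();
        FileSystem fs = new Path("/user/xxxxx").getFileSystem(conf);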
    
    How was this patch tested?
    Unit tests.
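    
    For illustration only (the HDFS URI below is a placeholder; this
    mirrors the updated unit tests rather than introducing new API), a
    remote exclude nodes file can be configured like this:
    
        import org.apache.hadoop.conf.Configuration;
        
        CoordinatorConf ssc = new CoordinatorConf();
        // Placeholder URI; with an explicit scheme the coordinator no
        // longer depends on fs.defaultFS to pick the right filesystem.
        ssc.setString(CoordinatorConf.COORDINATOR_EXCLUDE_NODES_FILE_PATH,
            "hdfs://namenode:9000/user/rss/exclude_nodes");
        ssc.setLong(CoordinatorConf.COORDINATOR_EXCLUDE_NODES_CHECK_INTERVAL, 2000);
        SimpleClusterManager scm = new SimpleClusterManager(ssc, new Configuration());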
---
 .../rss/coordinator/ClusterManagerFactory.java     | 10 +++-
 .../tencent/rss/coordinator/CoordinatorServer.java |  2 +-
 .../rss/coordinator/SimpleClusterManager.java      | 68 +++++++++++++---------
 .../coordinator/BasicAssignmentStrategyTest.java   |  6 +-
 .../PartitionBalanceAssignmentStrategyTest.java    |  6 +-
 .../rss/coordinator/SimpleClusterManagerTest.java  | 13 +++--
 6 files changed, 63 insertions(+), 42 deletions(-)

diff --git a/coordinator/src/main/java/com/tencent/rss/coordinator/ClusterManagerFactory.java b/coordinator/src/main/java/com/tencent/rss/coordinator/ClusterManagerFactory.java
index 2ec2b12..b2723f9 100644
--- a/coordinator/src/main/java/com/tencent/rss/coordinator/ClusterManagerFactory.java
+++ b/coordinator/src/main/java/com/tencent/rss/coordinator/ClusterManagerFactory.java
@@ -18,15 +18,19 @@
 
 package com.tencent.rss.coordinator;
 
+import org.apache.hadoop.conf.Configuration;
+
 public class ClusterManagerFactory {
 
   CoordinatorConf conf;
+  Configuration hadoopConf;
 
-  public ClusterManagerFactory(CoordinatorConf conf) {
+  public ClusterManagerFactory(CoordinatorConf conf, Configuration hadoopConf) {
     this.conf = conf;
+    this.hadoopConf = hadoopConf;
   }
 
-  public ClusterManager getClusterManager() {
-    return new SimpleClusterManager(conf);
+  public ClusterManager getClusterManager() throws Exception {
+    return new SimpleClusterManager(conf, hadoopConf);
   }
 }
diff --git a/coordinator/src/main/java/com/tencent/rss/coordinator/CoordinatorServer.java b/coordinator/src/main/java/com/tencent/rss/coordinator/CoordinatorServer.java
index 3b79221..2dbe06f 100644
--- a/coordinator/src/main/java/com/tencent/rss/coordinator/CoordinatorServer.java
+++ b/coordinator/src/main/java/com/tencent/rss/coordinator/CoordinatorServer.java
@@ -111,7 +111,7 @@ public class CoordinatorServer {
     registerMetrics();
     this.applicationManager = new ApplicationManager(coordinatorConf);
 
-    ClusterManagerFactory clusterManagerFactory = new ClusterManagerFactory(coordinatorConf);
+    ClusterManagerFactory clusterManagerFactory = new ClusterManagerFactory(coordinatorConf, new Configuration());
     this.clusterManager = clusterManagerFactory.getClusterManager();
     this.clientConfManager = new ClientConfManager(coordinatorConf, new Configuration(), applicationManager);
     AssignmentStrategyFactory assignmentStrategyFactory =
diff --git a/coordinator/src/main/java/com/tencent/rss/coordinator/SimpleClusterManager.java b/coordinator/src/main/java/com/tencent/rss/coordinator/SimpleClusterManager.java
index fcfd1dc..972ea5f 100644
--- a/coordinator/src/main/java/com/tencent/rss/coordinator/SimpleClusterManager.java
+++ b/coordinator/src/main/java/com/tencent/rss/coordinator/SimpleClusterManager.java
@@ -19,9 +19,10 @@
 package com.tencent.rss.coordinator;
 
 import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
+import java.io.DataInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -36,6 +37,10 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,8 +57,9 @@ public class SimpleClusterManager implements ClusterManager {
   private int shuffleNodesMax;
   private ScheduledExecutorService scheduledExecutorService;
   private ScheduledExecutorService checkNodesExecutorService;
+  private FileSystem hadoopFileSystem;
 
-  public SimpleClusterManager(CoordinatorConf conf) {
+  public SimpleClusterManager(CoordinatorConf conf, Configuration hadoopConf) throws IOException {
     this.shuffleNodesMax = conf.getInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX);
     this.heartbeatTimeout = conf.getLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT);
     // the thread for checking if shuffle server report heartbeat in time
@@ -65,6 +71,7 @@ public class SimpleClusterManager implements ClusterManager {
 
     String excludeNodesPath = conf.getString(CoordinatorConf.COORDINATOR_EXCLUDE_NODES_FILE_PATH, "");
     if (!StringUtils.isEmpty(excludeNodesPath)) {
+      this.hadoopFileSystem = CoordinatorUtils.getFileSystemForPath(new Path(excludeNodesPath), hadoopConf);
       long updateNodesInterval = conf.getLong(CoordinatorConf.COORDINATOR_EXCLUDE_NODES_CHECK_INTERVAL);
       checkNodesExecutorService = Executors.newSingleThreadScheduledExecutor(
           new ThreadFactoryBuilder().setDaemon(true).setNameFormat("UpdateExcludeNodes-%d").build());
@@ -100,39 +107,37 @@ public class SimpleClusterManager implements ClusterManager {
 
   private void updateExcludeNodes(String path) {
     try {
-      File excludeNodesFile = new File(path);
-      if (excludeNodesFile.exists()) {
-        // don't parse same file twice
-        if (excludeLastModify.get() != excludeNodesFile.lastModified()) {
-          parseExcludeNodesFile(excludeNodesFile);
+      Path hadoopPath = new Path(path);
+      FileStatus fileStatus = hadoopFileSystem.getFileStatus(hadoopPath);
+      if (fileStatus != null && fileStatus.isFile()) {
+        if (excludeLastModify.get() != fileStatus.getModificationTime()) {
+          parseExcludeNodesFile(hadoopFileSystem.open(hadoopPath));
+          excludeLastModify.set(fileStatus.getModificationTime());
         }
       } else {
         excludeNodes = Sets.newConcurrentHashSet();
       }
       CoordinatorMetrics.gaugeExcludeServerNum.set(excludeNodes.size());
+    } catch (FileNotFoundException fileNotFoundException) {
+      excludeNodes = Sets.newConcurrentHashSet();
     } catch (Exception e) {
-      LOG.warn("Error when update exclude nodes", e);
+      LOG.warn("Error when updating exclude nodes, the exclude nodes file 
path: " + path, e);
     }
   }
 
-  private void parseExcludeNodesFile(File excludeNodesFile) {
-    try {
-      Set<String> nodes = Sets.newConcurrentHashSet();
-      try (BufferedReader br = new BufferedReader(new FileReader(excludeNodesFile))) {
-        String line;
-        while ((line = br.readLine()) != null) {
-          if (!StringUtils.isEmpty(line)) {
-            nodes.add(line);
-          }
+  private void parseExcludeNodesFile(DataInputStream fsDataInputStream) throws IOException {
+    Set<String> nodes = Sets.newConcurrentHashSet();
+    try (BufferedReader br = new BufferedReader(new InputStreamReader(fsDataInputStream))) {
+      String line;
+      while ((line = br.readLine()) != null) {
+        if (!StringUtils.isEmpty(line)) {
+          nodes.add(line);
         }
       }
-      // update exclude nodes and last modify time
-      excludeNodes = nodes;
-      excludeLastModify.set(excludeNodesFile.lastModified());
-      LOG.info("Update exclude nodes and " + excludeNodes.size() + " nodes was 
marked as exclude nodes");
-    } catch (Exception e) {
-      LOG.warn("Error when parse file " + excludeNodesFile.getAbsolutePath(), 
e);
     }
+    // update exclude nodes and last modify time
+    excludeNodes = nodes;
+    LOG.info("Updated exclude nodes and " + excludeNodes.size() + " nodes were 
marked as exclude nodes");
   }
 
   @Override
@@ -186,18 +191,23 @@ public class SimpleClusterManager implements ClusterManager {
     servers.clear();
   }
 
+  @Override
+  public int getShuffleNodesMax() {
+    return shuffleNodesMax;
+  }
+
   @Override
   public void close() throws IOException {
+    if (hadoopFileSystem != null) {
+      hadoopFileSystem.close();
+    }
+
     if (scheduledExecutorService != null) {
       scheduledExecutorService.shutdown();
     }
+
     if (checkNodesExecutorService != null) {
       checkNodesExecutorService.shutdown();
     }
   }
-
-  @Override
-  public int getShuffleNodesMax() {
-    return shuffleNodesMax;
-  }
 }
diff --git a/coordinator/src/test/java/com/tencent/rss/coordinator/BasicAssignmentStrategyTest.java b/coordinator/src/test/java/com/tencent/rss/coordinator/BasicAssignmentStrategyTest.java
index 7a95d76..c316c62 100644
--- a/coordinator/src/test/java/com/tencent/rss/coordinator/BasicAssignmentStrategyTest.java
+++ b/coordinator/src/test/java/com/tencent/rss/coordinator/BasicAssignmentStrategyTest.java
@@ -31,6 +31,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.SortedMap;
+
+import org.apache.hadoop.conf.Configuration;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -43,10 +45,10 @@ public class BasicAssignmentStrategyTest {
   private int shuffleNodesMax = 7;
 
   @BeforeEach
-  public void setUp() {
+  public void setUp() throws Exception {
     CoordinatorConf ssc = new CoordinatorConf();
     ssc.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX, shuffleNodesMax);
-    clusterManager = new SimpleClusterManager(ssc);
+    clusterManager = new SimpleClusterManager(ssc, new Configuration());
     strategy = new BasicAssignmentStrategy(clusterManager);
   }
 
diff --git a/coordinator/src/test/java/com/tencent/rss/coordinator/PartitionBalanceAssignmentStrategyTest.java b/coordinator/src/test/java/com/tencent/rss/coordinator/PartitionBalanceAssignmentStrategyTest.java
index 9ca4146..196f6dd 100644
--- a/coordinator/src/test/java/com/tencent/rss/coordinator/PartitionBalanceAssignmentStrategyTest.java
+++ b/coordinator/src/test/java/com/tencent/rss/coordinator/PartitionBalanceAssignmentStrategyTest.java
@@ -27,6 +27,8 @@ import java.util.concurrent.TimeUnit;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
+
+import org.apache.hadoop.conf.Configuration;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -43,10 +45,10 @@ public class PartitionBalanceAssignmentStrategyTest {
   private Set<String> tags = Sets.newHashSet("test");
 
   @BeforeEach
-  public void setUp() {
+  public void setUp() throws Exception {
     CoordinatorConf ssc = new CoordinatorConf();
     ssc.setInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX, shuffleNodesMax);
-    clusterManager = new SimpleClusterManager(ssc);
+    clusterManager = new SimpleClusterManager(ssc, new Configuration());
     strategy = new PartitionBalanceAssignmentStrategy(clusterManager);
   }
 
diff --git a/coordinator/src/test/java/com/tencent/rss/coordinator/SimpleClusterManagerTest.java b/coordinator/src/test/java/com/tencent/rss/coordinator/SimpleClusterManagerTest.java
index fc90d9e..cfa875d 100644
--- a/coordinator/src/test/java/com/tencent/rss/coordinator/SimpleClusterManagerTest.java
+++ b/coordinator/src/test/java/com/tencent/rss/coordinator/SimpleClusterManagerTest.java
@@ -22,11 +22,14 @@ import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.net.URI;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import com.google.common.collect.Sets;
+
+import org.apache.hadoop.conf.Configuration;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
@@ -53,7 +56,7 @@ public class SimpleClusterManagerTest {
   public void getServerListTest() throws IOException {
     CoordinatorConf ssc = new CoordinatorConf();
     ssc.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 30 * 1000L);
-    SimpleClusterManager clusterManager = new SimpleClusterManager(ssc);
+    SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, new 
Configuration());
     ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
         10, testTags, true);
     ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
@@ -108,7 +111,7 @@ public class SimpleClusterManagerTest {
   public void heartbeatTimeoutTest() throws Exception {
     CoordinatorConf ssc = new CoordinatorConf();
     ssc.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 300L);
-    SimpleClusterManager clusterManager = new SimpleClusterManager(ssc);
+    SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, new 
Configuration());
     Thread t = new Thread(() -> {
       for (int i = 0; i < 3; i++) {
         if (i == 2) {
@@ -152,7 +155,7 @@ public class SimpleClusterManagerTest {
   public void testGetCorrectServerNodesWhenOneNodeRemoved() throws IOException {
     CoordinatorConf ssc = new CoordinatorConf();
     ssc.setLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT, 30 * 1000L);
-    SimpleClusterManager clusterManager = new SimpleClusterManager(ssc);
+    SimpleClusterManager clusterManager = new SimpleClusterManager(ssc, new 
Configuration());
     ServerNode sn1 = new ServerNode("sn1", "ip", 0, 100L, 50L, 20,
             10, testTags, true);
     ServerNode sn2 = new ServerNode("sn2", "ip", 0, 100L, 50L, 21,
@@ -181,13 +184,13 @@ public class SimpleClusterManagerTest {
     String excludeNodesFolder = (new File(ClassLoader.getSystemResource("empty").getFile())).getParent();
     String excludeNodesPath = excludeNodesFolder + "/excludeNodes";
     CoordinatorConf ssc = new CoordinatorConf();
-    ssc.setString(CoordinatorConf.COORDINATOR_EXCLUDE_NODES_FILE_PATH, excludeNodesPath);
+    ssc.setString(CoordinatorConf.COORDINATOR_EXCLUDE_NODES_FILE_PATH, URI.create(excludeNodesPath).toString());
     ssc.setLong(CoordinatorConf.COORDINATOR_EXCLUDE_NODES_CHECK_INTERVAL, 2000);
 
     Set<String> nodes = Sets.newHashSet("node1-1999", "node2-1999");
     writeExcludeHosts(excludeNodesPath, nodes);
 
-    SimpleClusterManager scm = new SimpleClusterManager(ssc);
-    SimpleClusterManager scm = new SimpleClusterManager(ssc);
+    SimpleClusterManager scm = new SimpleClusterManager(ssc, new Configuration());
     scm.add(new ServerNode("node1-1999", "ip", 0, 100L, 50L, 20,
         10, testTags, true));
     scm.add(new ServerNode("node2-1999", "ip", 0, 100L, 50L, 20,
