This is an automated email from the ASF dual-hosted git repository.

xianjin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new d140e4ec [#678] improvement: Write hdfs files asynchronously when 
`detectStorage` (#680)
d140e4ec is described below

commit d140e4ec532e9c4ff4a4b7df3a5e1789217f8737
Author: jokercurry <[email protected]>
AuthorDate: Mon Mar 6 16:51:56 2023 +0800

    [#678] improvement: Write hdfs files asynchronously when `detectStorage` 
(#680)
    
    ### What changes were proposed in this pull request?
    Write files to HDFS asynchronously to reduce the time spent in `detectStorage`.
    
    ### Why are the changes needed?
    Fix: #678
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Covered by the original (existing) unit tests.
---
 .../storage/AbstractSelectStorageStrategy.java     | 98 +++++++++++++++++++++-
 .../storage/AppBalanceSelectStorageStrategy.java   | 56 -------------
 .../LowestIOSampleCostSelectStorageStrategy.java   | 87 ++++---------------
 3 files changed, 114 insertions(+), 127 deletions(-)

diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AbstractSelectStorageStrategy.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AbstractSelectStorageStrategy.java
index b4f7c699..0524cc76 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AbstractSelectStorageStrategy.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AbstractSelectStorageStrategy.java
@@ -18,16 +18,30 @@
 package org.apache.uniffle.coordinator.strategy.storage;
 
 import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
 import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
+import org.apache.uniffle.coordinator.ApplicationManager;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 import org.apache.uniffle.coordinator.util.CoordinatorUtils;
 
@@ -35,19 +49,26 @@ import org.apache.uniffle.coordinator.util.CoordinatorUtils;
  * This is a simple implementation class, which provides some methods to check 
whether the path is normal
  */
 public abstract class AbstractSelectStorageStrategy implements 
SelectStorageStrategy {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(AbstractSelectStorageStrategy.class);
   /**
    * store remote path -> application count for assignment strategy
    */
   protected final Map<String, RankValue> remoteStoragePathRankValue;
   protected final int fileSize;
   private final String coordinatorId;
+  private final Configuration hdfsConf;
+  private final CoordinatorConf conf;
+  protected List<Map.Entry<String, RankValue>> uris;
 
   public AbstractSelectStorageStrategy(
       Map<String, RankValue> remoteStoragePathRankValue,
       CoordinatorConf conf) {
     this.remoteStoragePathRankValue = remoteStoragePathRankValue;
-    fileSize = 
conf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_FILE_SIZE);
+    this.hdfsConf = new Configuration();
+    this.fileSize = 
conf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_FILE_SIZE);
     this.coordinatorId = conf.getString(CoordinatorUtils.COORDINATOR_ID, 
UUID.randomUUID().toString());
+    this.conf = conf;
   }
 
   public void readAndWriteHdfsStorage(FileSystem fs, Path testPath,
@@ -76,6 +97,81 @@ public abstract class AbstractSelectStorageStrategy 
implements SelectStorageStra
     }
   }
 
+  @Override
+  public void detectStorage() {
+    uris = 
Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet());
+    if (remoteStoragePathRankValue.size() > 1) {
+      CountDownLatch countDownLatch = new CountDownLatch(uris.size());
+      uris.parallelStream().forEach(uri -> {
+        if 
(uri.getKey().startsWith(ApplicationManager.getPathSchema().get(0))) {
+          Path remotePath = new Path(uri.getKey());
+          String rssTest = uri.getKey() + "/rssTest-" + getCoordinatorId()
+              + Thread.currentThread().getName();
+          Path testPath = new Path(rssTest);
+          RankValue rankValue = remoteStoragePathRankValue.get(uri.getKey());
+          rankValue.setHealthy(new AtomicBoolean(true));
+          long startWriteTime = System.currentTimeMillis();
+          try {
+            FileSystem fs = HadoopFilesystemProvider.getFilesystem(remotePath, 
hdfsConf);
+            for (int j = 0; j < readAndWriteTimes(conf); j++) {
+              readAndWriteHdfsStorage(fs, testPath, uri.getKey(), rankValue);
+            }
+          } catch (Exception e) {
+            LOG.error("Storage read and write error, we will not use this 
remote path {}.", uri, e);
+            rankValue.setHealthy(new AtomicBoolean(false));
+          } finally {
+            sortPathByRankValue(uri.getKey(), rssTest, startWriteTime);
+          }
+          countDownLatch.countDown();
+        }
+      });
+      try {
+        countDownLatch.await();
+      } catch (InterruptedException e) {
+        LOG.error("Failed to detectStorage!");
+      }
+    }
+  }
+
+  @VisibleForTesting
+  public void sortPathByRankValue(
+      String path, String testPath, long startWrite) {
+    RankValue rankValue = remoteStoragePathRankValue.get(path);
+    try {
+      FileSystem fs = HadoopFilesystemProvider.getFilesystem(new Path(path), 
hdfsConf);
+      fs.delete(new Path(testPath), true);
+      if (rankValue.getHealthy().get()) {
+        rankValue.setCostTime(new AtomicLong(System.currentTimeMillis() - 
startWrite));
+      }
+    } catch (Exception e) {
+      rankValue.setCostTime(new AtomicLong(Long.MAX_VALUE));
+      LOG.error("Failed to sort, we will not use this remote path {}.", path, 
e);
+    }
+
+    if (this.getComparator() != null) {
+      uris = Lists.newCopyOnWriteArrayList(
+          remoteStoragePathRankValue.entrySet()).stream().filter(
+          
Objects::nonNull).sorted(this.getComparator()).collect(Collectors.toList());
+    } else {
+      uris = Lists.newCopyOnWriteArrayList(
+          remoteStoragePathRankValue.entrySet()).stream().filter(
+          Objects::nonNull).collect(Collectors.toList());
+    }
+    LOG.info("The sorted remote path list is: {}", uris);
+  }
+
+  protected int readAndWriteTimes(CoordinatorConf conf) {
+    return 1;
+  }
+
+  /**
+   * Different strategies will have different sorting methods of remote paths
+   * @return A comparator is to calculate the RankValue
+   */
+  protected Comparator<Map.Entry<String, RankValue>> getComparator() {
+    return null;
+  }
+
   String getCoordinatorId() {
     return coordinatorId;
   }
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategy.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategy.java
index d2932256..1efc4546 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategy.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategy.java
@@ -18,24 +18,13 @@
 package org.apache.uniffle.coordinator.strategy.storage;
 
 import java.util.Comparator;
-import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.RemoteStorageInfo;
-import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
-import org.apache.uniffle.coordinator.ApplicationManager;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 
 /**
@@ -49,8 +38,6 @@ public class AppBalanceSelectStorageStrategy extends 
AbstractSelectStorageStrate
    */
   private final Map<String, RemoteStorageInfo> appIdToRemoteStorageInfo;
   private final Map<String, RemoteStorageInfo> availableRemoteStorageInfo;
-  private final Configuration hdfsConf;
-  private List<Map.Entry<String, RankValue>> uris;
 
   public AppBalanceSelectStorageStrategy(
       Map<String, RankValue> remoteStoragePathRankValue,
@@ -60,49 +47,6 @@ public class AppBalanceSelectStorageStrategy extends 
AbstractSelectStorageStrate
     super(remoteStoragePathRankValue, conf);
     this.appIdToRemoteStorageInfo = appIdToRemoteStorageInfo;
     this.availableRemoteStorageInfo = availableRemoteStorageInfo;
-    this.hdfsConf = new Configuration();
-  }
-
-  @VisibleForTesting
-  public void sortPathByRankValue(String path, String test) {
-    RankValue rankValue = remoteStoragePathRankValue.get(path);
-    try {
-      FileSystem fs = HadoopFilesystemProvider.getFilesystem(new Path(path), 
hdfsConf);
-      fs.delete(new Path(test),true);
-      if (rankValue.getHealthy().get()) {
-        rankValue.setCostTime(new AtomicLong(0));
-      }
-    } catch (Exception e) {
-      rankValue.setCostTime(new AtomicLong(Long.MAX_VALUE));
-      LOG.error("Failed to sort, we will not use this remote path {}.", path, 
e);
-    }
-    uris = 
Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet()).stream()
-        .filter(Objects::nonNull).collect(Collectors.toList());
-  }
-
-  @Override
-  public void detectStorage() {
-    uris = 
Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet());
-    if (remoteStoragePathRankValue.size() > 1) {
-      for (Map.Entry<String, RankValue> uri : uris) {
-        if 
(uri.getKey().startsWith(ApplicationManager.getPathSchema().get(0))) {
-          RankValue rankValue = remoteStoragePathRankValue.get(uri.getKey());
-          rankValue.setHealthy(new AtomicBoolean(true));
-          Path remotePath = new Path(uri.getKey());
-          String rssTest = uri.getKey() + "/rssTest-" + getCoordinatorId();
-          Path testPath = new Path(rssTest);
-          try {
-            FileSystem fs = HadoopFilesystemProvider.getFilesystem(remotePath, 
hdfsConf);
-            readAndWriteHdfsStorage(fs, testPath, uri.getKey(), rankValue);
-          } catch (Exception e) {
-            rankValue.setHealthy(new AtomicBoolean(false));
-            LOG.error("Storage read and write error, we will not use this 
remote path {}.", uri, e);
-          } finally {
-            sortPathByRankValue(uri.getKey(), rssTest);
-          }
-        }
-      }
-    }
   }
 
   /**
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategy.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategy.java
index 7414e4b2..29caaa18 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategy.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategy.java
@@ -17,24 +17,13 @@
 
 package org.apache.uniffle.coordinator.strategy.storage;
 
-import java.util.List;
+import java.util.Comparator;
 import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.RemoteStorageInfo;
-import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
-import org.apache.uniffle.coordinator.ApplicationManager;
 import org.apache.uniffle.coordinator.CoordinatorConf;
 
 /**
@@ -50,9 +39,6 @@ public class LowestIOSampleCostSelectStorageStrategy extends 
AbstractSelectStora
    */
   private final Map<String, RemoteStorageInfo> appIdToRemoteStorageInfo;
   private final Map<String, RemoteStorageInfo> availableRemoteStorageInfo;
-  private final Configuration hdfsConf;
-  private final int readAndWriteTimes;
-  private List<Map.Entry<String, RankValue>> uris;
 
   public LowestIOSampleCostSelectStorageStrategy(
       Map<String, RankValue> remoteStoragePathRankValue,
@@ -62,65 +48,21 @@ public class LowestIOSampleCostSelectStorageStrategy 
extends AbstractSelectStora
     super(remoteStoragePathRankValue, conf);
     this.appIdToRemoteStorageInfo = appIdToRemoteStorageInfo;
     this.availableRemoteStorageInfo = availableRemoteStorageInfo;
-    this.hdfsConf = new Configuration();
-    readAndWriteTimes = 
conf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_ACCESS_TIMES);
-  }
-
-  @VisibleForTesting
-  public void sortPathByRankValue(
-      String path, String testPath, long startWrite) {
-    RankValue rankValue = remoteStoragePathRankValue.get(path);
-    try {
-      FileSystem fs = HadoopFilesystemProvider.getFilesystem(new Path(path), 
hdfsConf);
-      fs.delete(new Path(testPath), true);
-      if (rankValue.getHealthy().get()) {
-        rankValue.setCostTime(new AtomicLong(System.currentTimeMillis() - 
startWrite));
-      }
-    } catch (Exception e) {
-      rankValue.setCostTime(new AtomicLong(Long.MAX_VALUE));
-      LOG.error("Failed to sort, we will not use this remote path {}.", path, 
e);
-    }
-    uris = Lists.newCopyOnWriteArrayList(
-        
remoteStoragePathRankValue.entrySet()).stream().filter(Objects::nonNull).sorted((x,
 y) -> {
-          final long xReadAndWriteTime = x.getValue().getCostTime().get();
-          final long yReadAndWriteTime = y.getValue().getCostTime().get();
-          if (xReadAndWriteTime > yReadAndWriteTime) {
-            return 1;
-          } else if (xReadAndWriteTime < yReadAndWriteTime) {
-            return -1;
-          } else {
-            return Integer.compare(x.getValue().getAppNum().get(), 
y.getValue().getAppNum().get());
-          }
-        }).collect(Collectors.toList());
-    LOG.info("The sorted remote path list is: {}", uris);
   }
 
   @Override
-  public void detectStorage() {
-    uris = 
Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet());
-    if (remoteStoragePathRankValue.size() > 1) {
-      for (Map.Entry<String, RankValue> uri : uris) {
-        if 
(uri.getKey().startsWith(ApplicationManager.getPathSchema().get(0))) {
-          Path remotePath = new Path(uri.getKey());
-          String rssTest = uri.getKey() + "/rssTest-" + getCoordinatorId();
-          Path testPath = new Path(rssTest);
-          RankValue rankValue = remoteStoragePathRankValue.get(uri.getKey());
-          rankValue.setHealthy(new AtomicBoolean(true));
-          long startWriteTime = System.currentTimeMillis();
-          try {
-            FileSystem fs = HadoopFilesystemProvider.getFilesystem(remotePath, 
hdfsConf);
-            for (int j = 0; j < readAndWriteTimes; j++) {
-              readAndWriteHdfsStorage(fs, testPath, uri.getKey(), rankValue);
-            }
-          } catch (Exception e) {
-            LOG.error("Storage read and write error, we will not use this 
remote path {}.", uri, e);
-            rankValue.setHealthy(new AtomicBoolean(false));
-          } finally {
-            sortPathByRankValue(uri.getKey(), rssTest, startWriteTime);
-          }
-        }
+  public Comparator<Map.Entry<String, RankValue>> getComparator() {
+    return (x, y) -> {
+      final long xReadAndWriteTime = x.getValue().getCostTime().get();
+      final long yReadAndWriteTime = y.getValue().getCostTime().get();
+      if (xReadAndWriteTime > yReadAndWriteTime) {
+        return 1;
+      } else if (xReadAndWriteTime < yReadAndWriteTime) {
+        return -1;
+      } else {
+        return Integer.compare(x.getValue().getAppNum().get(), 
y.getValue().getAppNum().get());
       }
-    }
+    };
   }
 
   @Override
@@ -135,4 +77,9 @@ public class LowestIOSampleCostSelectStorageStrategy extends 
AbstractSelectStora
     LOG.warn("No remote storage is available, we will default to the first.");
     return availableRemoteStorageInfo.values().iterator().next();
   }
+
+  @Override
+  public int readAndWriteTimes(CoordinatorConf conf) {
+    return 
conf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_ACCESS_TIMES);
+  }
 }

Reply via email to