This is an automated email from the ASF dual-hosted git repository.
xianjin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new d140e4ec [#678] improvement: Write hdfs files asynchronously when
`detectStorage` (#680)
d140e4ec is described below
commit d140e4ec532e9c4ff4a4b7df3a5e1789217f8737
Author: jokercurry <[email protected]>
AuthorDate: Mon Mar 6 16:51:56 2023 +0800
[#678] improvement: Write hdfs files asynchronously when `detectStorage`
(#680)
### What changes were proposed in this pull request?
Write files to HDFS asynchronously to reduce the time spent in `detectStorage`
### Why are the changes needed?
Fix: #678
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Original UTs (unit tests).
---
.../storage/AbstractSelectStorageStrategy.java | 98 +++++++++++++++++++++-
.../storage/AppBalanceSelectStorageStrategy.java | 56 -------------
.../LowestIOSampleCostSelectStorageStrategy.java | 87 ++++---------------
3 files changed, 114 insertions(+), 127 deletions(-)
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AbstractSelectStorageStrategy.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AbstractSelectStorageStrategy.java
index b4f7c699..0524cc76 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AbstractSelectStorageStrategy.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AbstractSelectStorageStrategy.java
@@ -18,16 +18,30 @@
package org.apache.uniffle.coordinator.strategy.storage;
import java.io.IOException;
+import java.util.Comparator;
+import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
+import org.apache.uniffle.coordinator.ApplicationManager;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.coordinator.util.CoordinatorUtils;
@@ -35,19 +49,26 @@ import org.apache.uniffle.coordinator.util.CoordinatorUtils;
* This is a simple implementation class, which provides some methods to check
whether the path is normal
*/
public abstract class AbstractSelectStorageStrategy implements
SelectStorageStrategy {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(AbstractSelectStorageStrategy.class);
/**
* store remote path -> application count for assignment strategy
*/
protected final Map<String, RankValue> remoteStoragePathRankValue;
protected final int fileSize;
private final String coordinatorId;
+ private final Configuration hdfsConf;
+ private final CoordinatorConf conf;
+ protected List<Map.Entry<String, RankValue>> uris;
public AbstractSelectStorageStrategy(
Map<String, RankValue> remoteStoragePathRankValue,
CoordinatorConf conf) {
this.remoteStoragePathRankValue = remoteStoragePathRankValue;
- fileSize =
conf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_FILE_SIZE);
+ this.hdfsConf = new Configuration();
+ this.fileSize =
conf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_FILE_SIZE);
this.coordinatorId = conf.getString(CoordinatorUtils.COORDINATOR_ID,
UUID.randomUUID().toString());
+ this.conf = conf;
}
public void readAndWriteHdfsStorage(FileSystem fs, Path testPath,
@@ -76,6 +97,81 @@ public abstract class AbstractSelectStorageStrategy
implements SelectStorageStra
}
}
+ @Override
+ public void detectStorage() {
+ uris =
Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet());
+ if (remoteStoragePathRankValue.size() > 1) {
+ CountDownLatch countDownLatch = new CountDownLatch(uris.size());
+ uris.parallelStream().forEach(uri -> {
+ if
(uri.getKey().startsWith(ApplicationManager.getPathSchema().get(0))) {
+ Path remotePath = new Path(uri.getKey());
+ String rssTest = uri.getKey() + "/rssTest-" + getCoordinatorId()
+ + Thread.currentThread().getName();
+ Path testPath = new Path(rssTest);
+ RankValue rankValue = remoteStoragePathRankValue.get(uri.getKey());
+ rankValue.setHealthy(new AtomicBoolean(true));
+ long startWriteTime = System.currentTimeMillis();
+ try {
+ FileSystem fs = HadoopFilesystemProvider.getFilesystem(remotePath,
hdfsConf);
+ for (int j = 0; j < readAndWriteTimes(conf); j++) {
+ readAndWriteHdfsStorage(fs, testPath, uri.getKey(), rankValue);
+ }
+ } catch (Exception e) {
+ LOG.error("Storage read and write error, we will not use this
remote path {}.", uri, e);
+ rankValue.setHealthy(new AtomicBoolean(false));
+ } finally {
+ sortPathByRankValue(uri.getKey(), rssTest, startWriteTime);
+ }
+ countDownLatch.countDown();
+ }
+ });
+ try {
+ countDownLatch.await();
+ } catch (InterruptedException e) {
+ LOG.error("Failed to detectStorage!");
+ }
+ }
+ }
+
+ @VisibleForTesting
+ public void sortPathByRankValue(
+ String path, String testPath, long startWrite) {
+ RankValue rankValue = remoteStoragePathRankValue.get(path);
+ try {
+ FileSystem fs = HadoopFilesystemProvider.getFilesystem(new Path(path),
hdfsConf);
+ fs.delete(new Path(testPath), true);
+ if (rankValue.getHealthy().get()) {
+ rankValue.setCostTime(new AtomicLong(System.currentTimeMillis() -
startWrite));
+ }
+ } catch (Exception e) {
+ rankValue.setCostTime(new AtomicLong(Long.MAX_VALUE));
+ LOG.error("Failed to sort, we will not use this remote path {}.", path,
e);
+ }
+
+ if (this.getComparator() != null) {
+ uris = Lists.newCopyOnWriteArrayList(
+ remoteStoragePathRankValue.entrySet()).stream().filter(
+
Objects::nonNull).sorted(this.getComparator()).collect(Collectors.toList());
+ } else {
+ uris = Lists.newCopyOnWriteArrayList(
+ remoteStoragePathRankValue.entrySet()).stream().filter(
+ Objects::nonNull).collect(Collectors.toList());
+ }
+ LOG.info("The sorted remote path list is: {}", uris);
+ }
+
+ protected int readAndWriteTimes(CoordinatorConf conf) {
+ return 1;
+ }
+
+ /**
+ * Different strategies will have different sorting methods of remote paths
+ * @return A comparator is to calculate the RankValue
+ */
+ protected Comparator<Map.Entry<String, RankValue>> getComparator() {
+ return null;
+ }
+
String getCoordinatorId() {
return coordinatorId;
}
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategy.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategy.java
index d2932256..1efc4546 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategy.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/AppBalanceSelectStorageStrategy.java
@@ -18,24 +18,13 @@
package org.apache.uniffle.coordinator.strategy.storage;
import java.util.Comparator;
-import java.util.List;
import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.RemoteStorageInfo;
-import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
-import org.apache.uniffle.coordinator.ApplicationManager;
import org.apache.uniffle.coordinator.CoordinatorConf;
/**
@@ -49,8 +38,6 @@ public class AppBalanceSelectStorageStrategy extends
AbstractSelectStorageStrate
*/
private final Map<String, RemoteStorageInfo> appIdToRemoteStorageInfo;
private final Map<String, RemoteStorageInfo> availableRemoteStorageInfo;
- private final Configuration hdfsConf;
- private List<Map.Entry<String, RankValue>> uris;
public AppBalanceSelectStorageStrategy(
Map<String, RankValue> remoteStoragePathRankValue,
@@ -60,49 +47,6 @@ public class AppBalanceSelectStorageStrategy extends
AbstractSelectStorageStrate
super(remoteStoragePathRankValue, conf);
this.appIdToRemoteStorageInfo = appIdToRemoteStorageInfo;
this.availableRemoteStorageInfo = availableRemoteStorageInfo;
- this.hdfsConf = new Configuration();
- }
-
- @VisibleForTesting
- public void sortPathByRankValue(String path, String test) {
- RankValue rankValue = remoteStoragePathRankValue.get(path);
- try {
- FileSystem fs = HadoopFilesystemProvider.getFilesystem(new Path(path),
hdfsConf);
- fs.delete(new Path(test),true);
- if (rankValue.getHealthy().get()) {
- rankValue.setCostTime(new AtomicLong(0));
- }
- } catch (Exception e) {
- rankValue.setCostTime(new AtomicLong(Long.MAX_VALUE));
- LOG.error("Failed to sort, we will not use this remote path {}.", path,
e);
- }
- uris =
Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet()).stream()
- .filter(Objects::nonNull).collect(Collectors.toList());
- }
-
- @Override
- public void detectStorage() {
- uris =
Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet());
- if (remoteStoragePathRankValue.size() > 1) {
- for (Map.Entry<String, RankValue> uri : uris) {
- if
(uri.getKey().startsWith(ApplicationManager.getPathSchema().get(0))) {
- RankValue rankValue = remoteStoragePathRankValue.get(uri.getKey());
- rankValue.setHealthy(new AtomicBoolean(true));
- Path remotePath = new Path(uri.getKey());
- String rssTest = uri.getKey() + "/rssTest-" + getCoordinatorId();
- Path testPath = new Path(rssTest);
- try {
- FileSystem fs = HadoopFilesystemProvider.getFilesystem(remotePath,
hdfsConf);
- readAndWriteHdfsStorage(fs, testPath, uri.getKey(), rankValue);
- } catch (Exception e) {
- rankValue.setHealthy(new AtomicBoolean(false));
- LOG.error("Storage read and write error, we will not use this
remote path {}.", uri, e);
- } finally {
- sortPathByRankValue(uri.getKey(), rssTest);
- }
- }
- }
- }
}
/**
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategy.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategy.java
index 7414e4b2..29caaa18 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategy.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/strategy/storage/LowestIOSampleCostSelectStorageStrategy.java
@@ -17,24 +17,13 @@
package org.apache.uniffle.coordinator.strategy.storage;
-import java.util.List;
+import java.util.Comparator;
import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Lists;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.uniffle.common.RemoteStorageInfo;
-import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
-import org.apache.uniffle.coordinator.ApplicationManager;
import org.apache.uniffle.coordinator.CoordinatorConf;
/**
@@ -50,9 +39,6 @@ public class LowestIOSampleCostSelectStorageStrategy extends
AbstractSelectStora
*/
private final Map<String, RemoteStorageInfo> appIdToRemoteStorageInfo;
private final Map<String, RemoteStorageInfo> availableRemoteStorageInfo;
- private final Configuration hdfsConf;
- private final int readAndWriteTimes;
- private List<Map.Entry<String, RankValue>> uris;
public LowestIOSampleCostSelectStorageStrategy(
Map<String, RankValue> remoteStoragePathRankValue,
@@ -62,65 +48,21 @@ public class LowestIOSampleCostSelectStorageStrategy
extends AbstractSelectStora
super(remoteStoragePathRankValue, conf);
this.appIdToRemoteStorageInfo = appIdToRemoteStorageInfo;
this.availableRemoteStorageInfo = availableRemoteStorageInfo;
- this.hdfsConf = new Configuration();
- readAndWriteTimes =
conf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_ACCESS_TIMES);
- }
-
- @VisibleForTesting
- public void sortPathByRankValue(
- String path, String testPath, long startWrite) {
- RankValue rankValue = remoteStoragePathRankValue.get(path);
- try {
- FileSystem fs = HadoopFilesystemProvider.getFilesystem(new Path(path),
hdfsConf);
- fs.delete(new Path(testPath), true);
- if (rankValue.getHealthy().get()) {
- rankValue.setCostTime(new AtomicLong(System.currentTimeMillis() -
startWrite));
- }
- } catch (Exception e) {
- rankValue.setCostTime(new AtomicLong(Long.MAX_VALUE));
- LOG.error("Failed to sort, we will not use this remote path {}.", path,
e);
- }
- uris = Lists.newCopyOnWriteArrayList(
-
remoteStoragePathRankValue.entrySet()).stream().filter(Objects::nonNull).sorted((x,
y) -> {
- final long xReadAndWriteTime = x.getValue().getCostTime().get();
- final long yReadAndWriteTime = y.getValue().getCostTime().get();
- if (xReadAndWriteTime > yReadAndWriteTime) {
- return 1;
- } else if (xReadAndWriteTime < yReadAndWriteTime) {
- return -1;
- } else {
- return Integer.compare(x.getValue().getAppNum().get(),
y.getValue().getAppNum().get());
- }
- }).collect(Collectors.toList());
- LOG.info("The sorted remote path list is: {}", uris);
}
@Override
- public void detectStorage() {
- uris =
Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet());
- if (remoteStoragePathRankValue.size() > 1) {
- for (Map.Entry<String, RankValue> uri : uris) {
- if
(uri.getKey().startsWith(ApplicationManager.getPathSchema().get(0))) {
- Path remotePath = new Path(uri.getKey());
- String rssTest = uri.getKey() + "/rssTest-" + getCoordinatorId();
- Path testPath = new Path(rssTest);
- RankValue rankValue = remoteStoragePathRankValue.get(uri.getKey());
- rankValue.setHealthy(new AtomicBoolean(true));
- long startWriteTime = System.currentTimeMillis();
- try {
- FileSystem fs = HadoopFilesystemProvider.getFilesystem(remotePath,
hdfsConf);
- for (int j = 0; j < readAndWriteTimes; j++) {
- readAndWriteHdfsStorage(fs, testPath, uri.getKey(), rankValue);
- }
- } catch (Exception e) {
- LOG.error("Storage read and write error, we will not use this
remote path {}.", uri, e);
- rankValue.setHealthy(new AtomicBoolean(false));
- } finally {
- sortPathByRankValue(uri.getKey(), rssTest, startWriteTime);
- }
- }
+ public Comparator<Map.Entry<String, RankValue>> getComparator() {
+ return (x, y) -> {
+ final long xReadAndWriteTime = x.getValue().getCostTime().get();
+ final long yReadAndWriteTime = y.getValue().getCostTime().get();
+ if (xReadAndWriteTime > yReadAndWriteTime) {
+ return 1;
+ } else if (xReadAndWriteTime < yReadAndWriteTime) {
+ return -1;
+ } else {
+ return Integer.compare(x.getValue().getAppNum().get(),
y.getValue().getAppNum().get());
}
- }
+ };
}
@Override
@@ -135,4 +77,9 @@ public class LowestIOSampleCostSelectStorageStrategy extends
AbstractSelectStora
LOG.warn("No remote storage is available, we will default to the first.");
return availableRemoteStorageInfo.values().iterator().next();
}
+
+ @Override
+ public int readAndWriteTimes(CoordinatorConf conf) {
+ return
conf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_ACCESS_TIMES);
+ }
}