jerqi commented on code in PR #210:
URL: https://github.com/apache/incubator-uniffle/pull/210#discussion_r980668872
##########
coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java:
##########
@@ -42,39 +43,59 @@
public class ApplicationManager {
private static final Logger LOG =
LoggerFactory.getLogger(ApplicationManager.class);
- private long expired;
- private StrategyName storageStrategy;
- private Map<String, Long> appIds = Maps.newConcurrentMap();
- private SelectStorageStrategy selectStorageStrategy;
+ // TODO: Add anomaly detection for other storage
+ public static final List<String> REMOTE_PATH_SCHEMA = Arrays.asList("hdfs");
+ private final long expired;
+ private final StrategyName storageStrategy;
+ private final Map<String, Long> appIds = Maps.newConcurrentMap();
+ private final SelectStorageStrategy selectStorageStrategy;
// store appId -> remote path to make sure all shuffle data of the same
application
// will be written to the same remote storage
- private Map<String, RemoteStorageInfo> appIdToRemoteStorageInfo;
+ private final Map<String, RemoteStorageInfo> appIdToRemoteStorageInfo;
// store remote path -> application count for assignment strategy
- private Map<String, RankValue> remoteStoragePathRankValue;
- private Map<String, String> remoteStorageToHost = Maps.newConcurrentMap();
- private Map<String, RemoteStorageInfo> availableRemoteStorageInfo;
- private ScheduledExecutorService scheduledExecutorService;
+ private final Map<String, RankValue> remoteStoragePathRankValue;
+ private final Map<String, String> remoteStorageToHost =
Maps.newConcurrentMap();
+ private final Map<String, RemoteStorageInfo> availableRemoteStorageInfo;
+ private List<Map.Entry<String, RankValue>> sizeList;
// it's only for test case to check if status check has problem
private boolean hasErrorInStatusCheck = false;
public ApplicationManager(CoordinatorConf conf) {
storageStrategy =
conf.get(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SELECT_STRATEGY);
+ appIdToRemoteStorageInfo = Maps.newConcurrentMap();
+ remoteStoragePathRankValue = Maps.newConcurrentMap();
+ availableRemoteStorageInfo = Maps.newConcurrentMap();
if (StrategyName.IO_SAMPLE == storageStrategy) {
- selectStorageStrategy = new
LowestIOSampleCostSelectStorageStrategy(conf);
+ selectStorageStrategy = new
LowestIOSampleCostSelectStorageStrategy(remoteStoragePathRankValue,
+ appIdToRemoteStorageInfo, availableRemoteStorageInfo, conf);
} else if (StrategyName.APP_BALANCE == storageStrategy) {
- selectStorageStrategy = new AppBalanceSelectStorageStrategy();
+ selectStorageStrategy = new
AppBalanceSelectStorageStrategy(remoteStoragePathRankValue,
+ appIdToRemoteStorageInfo, availableRemoteStorageInfo, conf);
} else {
throw new UnsupportedOperationException("Unsupported selected storage
strategy.");
}
- appIdToRemoteStorageInfo =
selectStorageStrategy.getAppIdToRemoteStorageInfo();
- remoteStoragePathRankValue =
selectStorageStrategy.getRemoteStoragePathRankValue();
- availableRemoteStorageInfo =
selectStorageStrategy.getAvailableRemoteStorageInfo();
expired = conf.getLong(CoordinatorConf.COORDINATOR_APP_EXPIRED);
// the thread for checking application status
- scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
+ ScheduledExecutorService scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor(
ThreadUtils.getThreadFactory("ApplicationManager-%d"));
scheduledExecutorService.scheduleAtFixedRate(
- () -> statusCheck(), expired / 2, expired / 2, TimeUnit.MILLISECONDS);
+ this::statusCheck, expired / 2, expired / 2, TimeUnit.MILLISECONDS);
+ // the thread for checking if the storage is normal
+ ScheduledExecutorService detectStorageScheduler =
Executors.newSingleThreadScheduledExecutor(
+ ThreadUtils.getThreadFactory("readWriteRankScheduler-%d"));
+ // should init later than the refreshRemoteStorage init
+ detectStorageScheduler.scheduleAtFixedRate(this::checkReadAndWrite, 1000,
+
conf.getLong(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_TIME),
TimeUnit.MILLISECONDS);
+ }
+
+ public void checkReadAndWrite() {
Review Comment:
Should we put this logic into the class `SelectStorageStrategy`?
##########
coordinator/src/main/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategy.java:
##########
@@ -39,106 +45,127 @@
public class AppBalanceSelectStorageStrategy implements SelectStorageStrategy {
private static final Logger LOG =
LoggerFactory.getLogger(AppBalanceSelectStorageStrategy.class);
- /**
- * store appId -> remote path to make sure all shuffle data of the same
application
- * will be written to the same remote storage
- */
- private final Map<String, RemoteStorageInfo> appIdToRemoteStorageInfo;
/**
* store remote path -> application count for assignment strategy
*/
- private final Map<String, RankValue> remoteStoragePathCounter;
+ private final Map<String, RankValue> remoteStoragePathRankValue;
+ private final Map<String, RemoteStorageInfo> appIdToRemoteStorageInfo;
private final Map<String, RemoteStorageInfo> availableRemoteStorageInfo;
-
- public AppBalanceSelectStorageStrategy() {
- this.appIdToRemoteStorageInfo = Maps.newConcurrentMap();
- this.remoteStoragePathCounter = Maps.newConcurrentMap();
- this.availableRemoteStorageInfo = Maps.newHashMap();
+ private final Configuration hdfsConf;
+ private final int fileSize;
+ private final int readAndWriteTimes;
+ private boolean remotePathIsHealthy = true;
Review Comment:
What does the variable `remotePathIsHealthy` mean?
This class is a strategy, so I can't understand the meaning of this variable here.
##########
coordinator/src/main/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategy.java:
##########
@@ -39,106 +45,127 @@
public class AppBalanceSelectStorageStrategy implements SelectStorageStrategy {
private static final Logger LOG =
LoggerFactory.getLogger(AppBalanceSelectStorageStrategy.class);
- /**
- * store appId -> remote path to make sure all shuffle data of the same
application
- * will be written to the same remote storage
- */
- private final Map<String, RemoteStorageInfo> appIdToRemoteStorageInfo;
/**
* store remote path -> application count for assignment strategy
*/
- private final Map<String, RankValue> remoteStoragePathCounter;
+ private final Map<String, RankValue> remoteStoragePathRankValue;
+ private final Map<String, RemoteStorageInfo> appIdToRemoteStorageInfo;
private final Map<String, RemoteStorageInfo> availableRemoteStorageInfo;
-
- public AppBalanceSelectStorageStrategy() {
- this.appIdToRemoteStorageInfo = Maps.newConcurrentMap();
- this.remoteStoragePathCounter = Maps.newConcurrentMap();
- this.availableRemoteStorageInfo = Maps.newHashMap();
+ private final Configuration hdfsConf;
+ private final int fileSize;
+ private final int readAndWriteTimes;
+ private boolean remotePathIsHealthy = true;
+
+ public AppBalanceSelectStorageStrategy(
+ Map<String, RankValue> remoteStoragePathRankValue,
+ Map<String, RemoteStorageInfo> appIdToRemoteStorageInfo,
+ Map<String, RemoteStorageInfo> availableRemoteStorageInfo,
+ CoordinatorConf conf) {
+ this.remoteStoragePathRankValue = remoteStoragePathRankValue;
+ this.appIdToRemoteStorageInfo = appIdToRemoteStorageInfo;
+ this.availableRemoteStorageInfo = availableRemoteStorageInfo;
+ this.hdfsConf = new Configuration();
+ fileSize =
conf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_FILE_SIZE);
+ readAndWriteTimes =
conf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_ACCESS_TIMES);
}
- /**
- * the strategy of pick remote storage is according to assignment count
- */
- @Override
- public RemoteStorageInfo pickRemoteStorage(String appId) {
- if (appIdToRemoteStorageInfo.containsKey(appId)) {
- return appIdToRemoteStorageInfo.get(appId);
- }
-
- // create list for sort
- List<Map.Entry<String, RankValue>> sizeList =
-
Lists.newArrayList(remoteStoragePathCounter.entrySet()).stream().filter(Objects::nonNull)
- .sorted(Comparator.comparingInt(entry ->
entry.getValue().getAppNum().get())).collect(Collectors.toList());
-
- for (Map.Entry<String, RankValue> entry : sizeList) {
- String storagePath = entry.getKey();
- if (availableRemoteStorageInfo.containsKey(storagePath)) {
- appIdToRemoteStorageInfo.putIfAbsent(appId,
availableRemoteStorageInfo.get(storagePath));
- incRemoteStorageCounter(storagePath);
- break;
- }
- }
- return appIdToRemoteStorageInfo.get(appId);
- }
-
- @Override
@VisibleForTesting
- public synchronized void incRemoteStorageCounter(String remoteStoragePath) {
- RankValue counter = remoteStoragePathCounter.get(remoteStoragePath);
- if (counter != null) {
- counter.getAppNum().incrementAndGet();
- } else {
- // it may be happened when assignment remote storage
- // and refresh remote storage at the same time
- LOG.warn("Remote storage path lost during assignment: {} doesn't exist,
reset it to 1",
- remoteStoragePath);
- remoteStoragePathCounter.put(remoteStoragePath, new RankValue(1));
+ public List<Map.Entry<String, RankValue>> sortPathByRankValue(
+ String path, String test, boolean isHealthy) {
+ try {
+ FileSystem fs = HadoopFilesystemProvider.getFilesystem(new Path(path),
hdfsConf);
+ fs.delete(new Path(test),true);
+ if (isHealthy) {
+ RankValue rankValue = remoteStoragePathRankValue.get(path);
+ remoteStoragePathRankValue.put(path, new RankValue(0,
rankValue.getAppNum().get()));
+ }
+ } catch (Exception e) {
+ RankValue rankValue = remoteStoragePathRankValue.get(path);
+ remoteStoragePathRankValue.put(path, new RankValue(Long.MAX_VALUE,
rankValue.getAppNum().get()));
+ LOG.error("Failed to sort, we will not use this remote path {}.", path,
e);
}
+ return
Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet()).stream()
+ .filter(Objects::nonNull).collect(Collectors.toList());
}
@Override
- @VisibleForTesting
- public synchronized void decRemoteStorageCounter(String storagePath) {
- if (!StringUtils.isEmpty(storagePath)) {
- RankValue atomic = remoteStoragePathCounter.get(storagePath);
- if (atomic != null) {
- double count = atomic.getAppNum().decrementAndGet();
- if (count < 0) {
- LOG.warn("Unexpected counter for remote storage: {}, which is {},
reset to 0",
- storagePath, count);
- atomic.getAppNum().set(0);
+ public List<Map.Entry<String, RankValue>> detectStorage(String uri) {
+ if (uri.startsWith(ApplicationManager.REMOTE_PATH_SCHEMA.get(0))) {
+ setRemotePathIsHealthy(true);
+ Path remotePath = new Path(uri);
+ String rssTest = uri + "/rssTest";
+ Path testPath = new Path(rssTest);
+ try {
+ FileSystem fs = HadoopFilesystemProvider.getFilesystem(remotePath,
hdfsConf);
+ for (int j = 0; j < readAndWriteTimes; j++) {
+ byte[] data = RandomUtils.nextBytes(fileSize);
+ try (FSDataOutputStream fos = fs.create(testPath)) {
+ fos.write(data);
+ fos.flush();
+ }
+ byte[] readData = new byte[fileSize];
+ int readBytes;
+ try (FSDataInputStream fis = fs.open(testPath)) {
+ int hasReadBytes = 0;
+ do {
+ readBytes = fis.read(readData);
+ if (hasReadBytes < fileSize) {
+ for (int i = 0; i < readBytes; i++) {
+ if (data[hasReadBytes + i] != readData[i]) {
+ RankValue rankValue = remoteStoragePathRankValue.get(uri);
+ remoteStoragePathRankValue.put(uri,
+ new RankValue(Long.MAX_VALUE,
rankValue.getAppNum().get()));
+ throw new RssException("The content of reading and writing
is inconsistent.");
+ }
+ }
+ }
+ hasReadBytes += readBytes;
+ } while (readBytes != -1);
+ }
}
- } else {
- LOG.warn("Can't find counter for remote storage: {}", storagePath);
- remoteStoragePathCounter.putIfAbsent(storagePath, new RankValue(0));
- }
- if (remoteStoragePathCounter.get(storagePath).getAppNum().get() == 0
- && !availableRemoteStorageInfo.containsKey(storagePath)) {
- remoteStoragePathCounter.remove(storagePath);
+ } catch (Exception e) {
+ setRemotePathIsHealthy(false);
+ LOG.error("Storage read and write error, we will not use this remote
path {}.", uri, e);
+ RankValue rankValue = remoteStoragePathRankValue.get(uri);
+ remoteStoragePathRankValue.put(uri, new RankValue(Long.MAX_VALUE,
rankValue.getAppNum().get()));
+ } finally {
+ return sortPathByRankValue(uri, rssTest, remotePathIsHealthy);
}
+ } else {
+ return
Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet());
}
}
+ /**
+ * When choosing the AppBalance strategy, each time you select a path,
+ * you should know the number of the latest apps in different paths
+ */
@Override
- public synchronized void removePathFromCounter(String storagePath) {
- RankValue atomic = remoteStoragePathCounter.get(storagePath);
- if (atomic != null && atomic.getAppNum().get() == 0) {
- remoteStoragePathCounter.remove(storagePath);
+ public synchronized RemoteStorageInfo pickStorage(
+ List<Map.Entry<String, RankValue>> uris, String appId) {
+ boolean isUnhealthy =
+ uris.stream().noneMatch(rv ->
rv.getValue().getReadAndWriteTime().get() != Long.MAX_VALUE);
+ if (!isUnhealthy) {
+ // If there is only one unhealthy path, then filter that path
+ uris = uris.stream().filter(rv ->
rv.getValue().getReadAndWriteTime().get() != Long.MAX_VALUE).sorted(
+ Comparator.comparingInt(entry ->
entry.getValue().getAppNum().get())).collect(Collectors.toList());
+ } else {
+ // If all paths are unhealthy, assign paths according to the number of
apps
Review Comment:
Should we add some logs and metrics when all paths are unhealthy?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]