smallzhongfeng commented on code in PR #210:
URL: https://github.com/apache/incubator-uniffle/pull/210#discussion_r980837192
##########
coordinator/src/main/java/org/apache/uniffle/coordinator/AppBalanceSelectStorageStrategy.java:
##########
@@ -39,106 +45,127 @@
public class AppBalanceSelectStorageStrategy implements SelectStorageStrategy {
private static final Logger LOG =
LoggerFactory.getLogger(AppBalanceSelectStorageStrategy.class);
- /**
- * store appId -> remote path to make sure all shuffle data of the same application
- * will be written to the same remote storage
- */
- private final Map<String, RemoteStorageInfo> appIdToRemoteStorageInfo;
/**
* store remote path -> application count for assignment strategy
*/
- private final Map<String, RankValue> remoteStoragePathCounter;
+ private final Map<String, RankValue> remoteStoragePathRankValue;
+ private final Map<String, RemoteStorageInfo> appIdToRemoteStorageInfo;
private final Map<String, RemoteStorageInfo> availableRemoteStorageInfo;
-
- public AppBalanceSelectStorageStrategy() {
- this.appIdToRemoteStorageInfo = Maps.newConcurrentMap();
- this.remoteStoragePathCounter = Maps.newConcurrentMap();
- this.availableRemoteStorageInfo = Maps.newHashMap();
+ private final Configuration hdfsConf;
+ private final int fileSize;
+ private final int readAndWriteTimes;
+ private boolean remotePathIsHealthy = true;
+
+ public AppBalanceSelectStorageStrategy(
+ Map<String, RankValue> remoteStoragePathRankValue,
+ Map<String, RemoteStorageInfo> appIdToRemoteStorageInfo,
+ Map<String, RemoteStorageInfo> availableRemoteStorageInfo,
+ CoordinatorConf conf) {
+ this.remoteStoragePathRankValue = remoteStoragePathRankValue;
+ this.appIdToRemoteStorageInfo = appIdToRemoteStorageInfo;
+ this.availableRemoteStorageInfo = availableRemoteStorageInfo;
+ this.hdfsConf = new Configuration();
+ fileSize =
conf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_FILE_SIZE);
+ readAndWriteTimes =
conf.getInteger(CoordinatorConf.COORDINATOR_REMOTE_STORAGE_SCHEDULE_ACCESS_TIMES);
}
- /**
- * the strategy of pick remote storage is according to assignment count
- */
- @Override
- public RemoteStorageInfo pickRemoteStorage(String appId) {
- if (appIdToRemoteStorageInfo.containsKey(appId)) {
- return appIdToRemoteStorageInfo.get(appId);
- }
-
- // create list for sort
- List<Map.Entry<String, RankValue>> sizeList =
-
Lists.newArrayList(remoteStoragePathCounter.entrySet()).stream().filter(Objects::nonNull)
- .sorted(Comparator.comparingInt(entry ->
entry.getValue().getAppNum().get())).collect(Collectors.toList());
-
- for (Map.Entry<String, RankValue> entry : sizeList) {
- String storagePath = entry.getKey();
- if (availableRemoteStorageInfo.containsKey(storagePath)) {
- appIdToRemoteStorageInfo.putIfAbsent(appId,
availableRemoteStorageInfo.get(storagePath));
- incRemoteStorageCounter(storagePath);
- break;
- }
- }
- return appIdToRemoteStorageInfo.get(appId);
- }
-
- @Override
@VisibleForTesting
- public synchronized void incRemoteStorageCounter(String remoteStoragePath) {
- RankValue counter = remoteStoragePathCounter.get(remoteStoragePath);
- if (counter != null) {
- counter.getAppNum().incrementAndGet();
- } else {
- // it may be happened when assignment remote storage
- // and refresh remote storage at the same time
- LOG.warn("Remote storage path lost during assignment: {} doesn't exist, reset it to 1",
- remoteStoragePath);
- remoteStoragePathCounter.put(remoteStoragePath, new RankValue(1));
+ public List<Map.Entry<String, RankValue>> sortPathByRankValue(
+ String path, String test, boolean isHealthy) {
+ try {
+ FileSystem fs = HadoopFilesystemProvider.getFilesystem(new Path(path),
hdfsConf);
+ fs.delete(new Path(test),true);
+ if (isHealthy) {
+ RankValue rankValue = remoteStoragePathRankValue.get(path);
+ remoteStoragePathRankValue.put(path, new RankValue(0,
rankValue.getAppNum().get()));
+ }
+ } catch (Exception e) {
+ RankValue rankValue = remoteStoragePathRankValue.get(path);
+ remoteStoragePathRankValue.put(path, new RankValue(Long.MAX_VALUE,
rankValue.getAppNum().get()));
+ LOG.error("Failed to sort, we will not use this remote path {}.", path,
e);
}
+ return
Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet()).stream()
+ .filter(Objects::nonNull).collect(Collectors.toList());
}
@Override
- @VisibleForTesting
- public synchronized void decRemoteStorageCounter(String storagePath) {
- if (!StringUtils.isEmpty(storagePath)) {
- RankValue atomic = remoteStoragePathCounter.get(storagePath);
- if (atomic != null) {
- double count = atomic.getAppNum().decrementAndGet();
- if (count < 0) {
- LOG.warn("Unexpected counter for remote storage: {}, which is {}, reset to 0",
- storagePath, count);
- atomic.getAppNum().set(0);
+ public List<Map.Entry<String, RankValue>> detectStorage(String uri) {
+ if (uri.startsWith(ApplicationManager.REMOTE_PATH_SCHEMA.get(0))) {
+ setRemotePathIsHealthy(true);
+ Path remotePath = new Path(uri);
+ String rssTest = uri + "/rssTest";
+ Path testPath = new Path(rssTest);
+ try {
+ FileSystem fs = HadoopFilesystemProvider.getFilesystem(remotePath,
hdfsConf);
+ for (int j = 0; j < readAndWriteTimes; j++) {
+ byte[] data = RandomUtils.nextBytes(fileSize);
+ try (FSDataOutputStream fos = fs.create(testPath)) {
+ fos.write(data);
+ fos.flush();
+ }
+ byte[] readData = new byte[fileSize];
+ int readBytes;
+ try (FSDataInputStream fis = fs.open(testPath)) {
+ int hasReadBytes = 0;
+ do {
+ readBytes = fis.read(readData);
+ if (hasReadBytes < fileSize) {
+ for (int i = 0; i < readBytes; i++) {
+ if (data[hasReadBytes + i] != readData[i]) {
+ RankValue rankValue = remoteStoragePathRankValue.get(uri);
+ remoteStoragePathRankValue.put(uri,
+ new RankValue(Long.MAX_VALUE,
rankValue.getAppNum().get()));
+ throw new RssException("The content of reading and writing is inconsistent.");
+ }
+ }
+ }
+ hasReadBytes += readBytes;
+ } while (readBytes != -1);
+ }
}
- } else {
- LOG.warn("Can't find counter for remote storage: {}", storagePath);
- remoteStoragePathCounter.putIfAbsent(storagePath, new RankValue(0));
- }
- if (remoteStoragePathCounter.get(storagePath).getAppNum().get() == 0
- && !availableRemoteStorageInfo.containsKey(storagePath)) {
- remoteStoragePathCounter.remove(storagePath);
+ } catch (Exception e) {
+ setRemotePathIsHealthy(false);
+ LOG.error("Storage read and write error, we will not use this remote path {}.", uri, e);
+ RankValue rankValue = remoteStoragePathRankValue.get(uri);
+ remoteStoragePathRankValue.put(uri, new RankValue(Long.MAX_VALUE,
rankValue.getAppNum().get()));
+ } finally {
+ return sortPathByRankValue(uri, rssTest, remotePathIsHealthy);
}
+ } else {
+ return
Lists.newCopyOnWriteArrayList(remoteStoragePathRankValue.entrySet());
}
}
+ /**
+ * When choosing the AppBalance strategy, each time you select a path,
+ * you should know the number of the latest apps in different paths
+ */
@Override
- public synchronized void removePathFromCounter(String storagePath) {
- RankValue atomic = remoteStoragePathCounter.get(storagePath);
- if (atomic != null && atomic.getAppNum().get() == 0) {
- remoteStoragePathCounter.remove(storagePath);
+ public synchronized RemoteStorageInfo pickStorage(
+ List<Map.Entry<String, RankValue>> uris, String appId) {
+ boolean isUnhealthy =
+ uris.stream().noneMatch(rv ->
rv.getValue().getReadAndWriteTime().get() != Long.MAX_VALUE);
+ if (!isUnhealthy) {
+ // If there is only one unhealthy path, then filter that path
+ uris = uris.stream().filter(rv ->
rv.getValue().getReadAndWriteTime().get() != Long.MAX_VALUE).sorted(
+ Comparator.comparingInt(entry ->
entry.getValue().getAppNum().get())).collect(Collectors.toList());
+ } else {
+ // If all paths are unhealthy, assign paths according to the number of apps
Review Comment:
OK, I will add.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]