This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
     new 805a3727 [Improvement] Should match from pathToStorages when appId does not exist in appIdToStorages (#168)
805a3727 is described below
commit 805a372718c0ffe17b06dea416493f9bda48c351
Author: jokercurry <[email protected]>
AuthorDate: Mon Aug 22 20:11:06 2022 +0800
[Improvement] Should match from pathToStorages when appId does not exist in appIdToStorages (#168)
### What changes were proposed in this pull request?


From the HDFS audit log we can see that the HDFS path of this app was last deleted at 18:00:55, yet the `shuffleServer` log still reported `file could not be found` errors and the file kept being written. We eventually found the cause: once an appId's cache entry had been removed from `appIdToStorages`, a later call to `HdfsStorageManager#removeResources` could no longer resolve the storage, so the storagePath was never deleted.
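
In outline, the fix falls back to probing each registered base path in `pathToStorages` for a `<basePath>/<appId>` directory when the appId is missing from `appIdToStorages`. A minimal standalone sketch of that lookup follows; the class, method name, and parameters are illustrative stand-ins rather than the actual `HdfsStorageManager` members, and plain `/` is used where the real patch joins paths with `Constants.KEY_SPLIT_CHAR`:

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class AppStorageLookupSketch {
  // Probe each registered base path for "<basePath>/<appId>"; return the first
  // candidate that exists as a directory, or null if the app left no data anywhere.
  static Path findAppStoragePath(Iterable<String> basePaths, String appId,
      Configuration hadoopConf) throws IOException {
    for (String basePath : basePaths) {
      Path candidate = new Path(basePath + "/" + appId);
      FileSystem fs = candidate.getFileSystem(hadoopConf);
      if (fs.isDirectory(candidate)) {
        return candidate;
      }
    }
    return null; // callers must still handle the not-found case, as the patch notes
  }
}
```

The actual change (see the diff below) performs this probe inside `getStorageByAppId` under a `synchronized` block and wraps the parent of the matching path in an `HdfsStorage`.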
### Why are the changes needed?
With this change, the remote path is still deleted normally even after the appId's entry has been evicted from the `appIdToStorages` cache.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added a unit test.
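
For reference, the scenario the new test exercises looks roughly like this (a runnable sketch against the local filesystem; the real test below uses the mini HDFS cluster from `HdfsTestBase` and calls `storageManager.removeResources` rather than deleting the directory itself):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RemoveResourcesScenarioSketch {
  public static void main(String[] args) throws Exception {
    // Local filesystem stands in for the mini HDFS cluster used by the real test.
    FileSystem fs = FileSystem.getLocal(new Configuration());
    Path appPath = new Path("/tmp/uniffle-sketch/remoteStorage/appId2");

    // Simulate leftover shuffle data for an app whose appIdToStorages entry was evicted.
    fs.mkdirs(appPath);

    // The real test then calls storageManager.removeResources(appId2, ...), which
    // with this patch resolves the storage via pathToStorages and deletes the
    // directory; here we only mimic the expected outcome.
    fs.delete(appPath, true);

    // Afterwards the path must be gone (and getStorageByAppId must return null).
    System.out.println("exists after removal: " + fs.exists(appPath));
  }
}
```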
---
.../uniffle/server/storage/HdfsStorageManager.java | 31 +++++++++++++++++-----
.../uniffle/server/ShuffleFlushManagerTest.java | 10 +++++++
2 files changed, 35 insertions(+), 6 deletions(-)
diff --git a/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
index 6fa50d17..7c0c0dd3 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
@@ -17,16 +17,22 @@
 package org.apache.uniffle.server.storage;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.common.RemoteStorageInfo;
+import org.apache.uniffle.common.filesystem.HadoopFilesystemProvider;
+import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.server.Checker;
 import org.apache.uniffle.server.ShuffleDataFlushEvent;
 import org.apache.uniffle.server.ShuffleDataReadEvent;
@@ -108,13 +114,26 @@ public class HdfsStorageManager extends SingleStorageManager {
     appIdToStorages.putIfAbsent(appId, pathToStorages.get(remoteStorage));
   }
 
-  private HdfsStorage getStorageByAppId(String appId) {
+  public HdfsStorage getStorageByAppId(String appId) {
     if (!appIdToStorages.containsKey(appId)) {
-      String msg = "Can't find HDFS storage for appId[" + appId + "]";
-      LOG.error(msg);
-      // outside should deal with null situation
-      // todo: it's better to have a fake storage for null situation
-      return null;
+      synchronized (this) {
+        FileSystem fs;
+        try {
+          List<Path> appStoragePath = pathToStorages.keySet().stream().map(
+              basePath -> new Path(basePath + Constants.KEY_SPLIT_CHAR + appId)).collect(Collectors.toList());
+          for (Path path : appStoragePath) {
+            fs = HadoopFilesystemProvider.getFilesystem(path, hadoopConf);
+            if (fs.isDirectory(path)) {
+              return new HdfsStorage(path.getParent().toString(), hadoopConf);
+            }
+          }
+        } catch (Exception e) {
+          LOG.error("Some error happened when fileSystem got the file status.", e);
+        }
+        // outside should deal with null situation
+        // todo: it's better to have a fake storage for null situation
+        return null;
+      }
     }
     return appIdToStorages.get(appId);
   }
diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
index ebdd12ec..ba670a22 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleFlushManagerTest.java
@@ -57,12 +57,14 @@ import org.apache.uniffle.server.storage.StorageManager;
 import org.apache.uniffle.server.storage.StorageManagerFactory;
 import org.apache.uniffle.storage.HdfsTestBase;
 import org.apache.uniffle.storage.common.AbstractStorage;
+import org.apache.uniffle.storage.common.HdfsStorage;
 import org.apache.uniffle.storage.handler.impl.HdfsClientReadHandler;
 import org.apache.uniffle.storage.util.StorageType;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
@@ -246,6 +248,14 @@ public class ShuffleFlushManagerTest extends HdfsTestBase {
     assertEquals(0, manager.getCommittedBlockIds(appId2, 1).getLongCardinality());
     size = storage.getHandlerSize();
     assertEquals(0, size);
+    // fs creates a remote storage path for appId2 before resources are removed,
+    // but the cache entry in appIdToStorages has been removed, so we need to delete this path in hdfs
+    Path path = new Path(remoteStorage.getPath() + "/" + appId2 + "/");
+    assertTrue(fs.mkdirs(path));
+    storageManager.removeResources(appId2, Sets.newHashSet(1), StringUtils.EMPTY);
+    assertFalse(fs.exists(path));
+    HdfsStorage storageByAppId = ((HdfsStorageManager) storageManager).getStorageByAppId(appId2);
+    assertNull(storageByAppId);
   }
 
   @Test