This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 849993c2 [ISSUE-484] Fix accidentally remove the storage of appId when
unregistering partial shuffle in HdfsStorageManager (#485)
849993c2 is described below
commit 849993c26d06c422f38f0bb242af00fb2e341290
Author: Junfan Zhang <[email protected]>
AuthorDate: Tue Jan 17 10:03:30 2023 +0800
[ISSUE-484] Fix accidentally remove the storage of appId when unregistering
partial shuffle in HdfsStorageManager (#485)
### What changes were proposed in this pull request?
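When only some of an app's shuffles are removed, HdfsStorageManager also
removed the app's storage entry (keyed by appId) from `appIdToStorages`. As a
result, the app's remaining shuffles failed to flush data, because their
storage could no longer be looked up by appId.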
### Why are the changes needed?
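Unregistering some of an app's shuffles accidentally removed the app's storage
in HdfsStorageManager, which broke data flushing for the app's other shuffles.
The storage is now removed only when the whole app is purged (AppPurgeEvent).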
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
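1. Unit tests (HdfsStorageManagerTest#testRemoveResources) and extended
integration tests (ShuffleUnregisterWithHdfsTest, ShuffleUnregisterWithLocalfileTest)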
---
.../test/ShuffleUnregisterWithHdfsTest.java | 5 ++++
.../test/ShuffleUnregisterWithLocalfileTest.java | 5 ++++
.../uniffle/server/storage/HdfsStorageManager.java | 7 +++---
.../server/storage/HdfsStorageManagerTest.java | 27 ++++++++++++++++++++++
4 files changed, 41 insertions(+), 3 deletions(-)
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithHdfsTest.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithHdfsTest.java
index 2eab10d4..bfa24023 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithHdfsTest.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithHdfsTest.java
@@ -90,6 +90,11 @@ public class ShuffleUnregisterWithHdfsTest extends
SparkIntegrationTestBase {
Thread.sleep(1000);
assertFalse(fs.exists(new Path(shufflePath)));
assertTrue(fs.exists(new Path(appPath)));
+
+ // After unregistering partial shuffle, newly shuffle could work
+ map = javaPairRDD.collectAsMap();
+ shufflePath = appPath + "/1";
+ assertTrue(fs.exists(new Path(shufflePath)));
} else {
runCounter++;
}
diff --git
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithLocalfileTest.java
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithLocalfileTest.java
index a3dd18f5..6e5e0367 100644
---
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithLocalfileTest.java
+++
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithLocalfileTest.java
@@ -92,6 +92,11 @@ public class ShuffleUnregisterWithLocalfileTest extends
SparkIntegrationTestBase
Thread.sleep(1000);
assertFalse(new File(shufflePath).exists());
assertTrue(new File(appPath).exists());
+
+ // After unregistering partial shuffle, newly shuffle could work
+ map = javaPairRDD.collectAsMap();
+ shufflePath = appPath + "/1";
+ assertTrue(new File(shufflePath).exists());
} else {
runCounter++;
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
index 7c4af1d8..1532ff2b 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
@@ -84,13 +84,12 @@ public class HdfsStorageManager extends
SingleStorageManager {
@Override
public void removeResources(PurgeEvent event) {
String appId = event.getAppId();
- String user = event.getUser();
HdfsStorage storage = getStorageByAppId(appId);
if (storage != null) {
if (event instanceof AppPurgeEvent) {
storage.removeHandlers(appId);
+ appIdToStorages.remove(appId);
}
- appIdToStorages.remove(appId);
ShuffleDeleteHandler deleteHandler = ShuffleHandlerFactory
.getInstance()
.createShuffleDeleteHandler(
@@ -107,7 +106,9 @@ public class HdfsStorageManager extends
SingleStorageManager {
deletePaths.add(ShuffleStorageUtils.getFullShuffleDataFolder(basicPath,
String.valueOf(shuffleId)));
}
}
- deleteHandler.delete(deletePaths.toArray(new String[0]), appId, user);
+ deleteHandler.delete(deletePaths.toArray(new String[0]), appId,
event.getUser());
+ } else {
+ LOG.warn("Storage gotten is null when removing resources for event: {}",
event);
}
}
diff --git
a/server/src/test/java/org/apache/uniffle/server/storage/HdfsStorageManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/storage/HdfsStorageManagerTest.java
index b1b0cd98..1def59d4 100644
---
a/server/src/test/java/org/apache/uniffle/server/storage/HdfsStorageManagerTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/storage/HdfsStorageManagerTest.java
@@ -30,6 +30,8 @@ import org.junit.jupiter.api.Test;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.server.ShuffleServerMetrics;
+import org.apache.uniffle.server.event.AppPurgeEvent;
+import org.apache.uniffle.server.event.ShufflePurgeEvent;
import org.apache.uniffle.storage.common.HdfsStorage;
import org.apache.uniffle.storage.util.StorageType;
@@ -49,6 +51,31 @@ public class HdfsStorageManagerTest {
ShuffleServerMetrics.clear();
}
+ @Test
+ public void testRemoveResources() {
+ ShuffleServerConf conf = new ShuffleServerConf();
+ conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE,
StorageType.MEMORY_LOCALFILE_HDFS.name());
+ HdfsStorageManager hdfsStorageManager = new HdfsStorageManager(conf);
+ final String remoteStoragePath1 = "hdfs://path1";
+ String appId = "testRemoveResources_appId";
+ hdfsStorageManager.registerRemoteStorage(
+ appId,
+ new RemoteStorageInfo(remoteStoragePath1, ImmutableMap.of("k1", "v1",
"k2", "v2"))
+ );
+ Map<String, HdfsStorage> appStorageMap =
hdfsStorageManager.getAppIdToStorages();
+
+ // case1
+ assertEquals(1, appStorageMap.size());
+ ShufflePurgeEvent shufflePurgeEvent = new ShufflePurgeEvent(appId, "",
Arrays.asList(1));
+ hdfsStorageManager.removeResources(shufflePurgeEvent);
+ assertEquals(1, appStorageMap.size());
+
+ // case2
+ AppPurgeEvent appPurgeEvent = new AppPurgeEvent(appId, "");
+ hdfsStorageManager.removeResources(appPurgeEvent);
+ assertEquals(0, appStorageMap.size());
+ }
+
@Test
public void testRegisterRemoteStorage() {
ShuffleServerConf conf = new ShuffleServerConf();