This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 849993c2 [ISSUE-484] Fix accidentally remove the storage of appId when 
unregistering partial shuffle in HdfsStorageManager (#485)
849993c2 is described below

commit 849993c26d06c422f38f0bb242af00fb2e341290
Author: Junfan Zhang <[email protected]>
AuthorDate: Tue Jan 17 10:03:30 2023 +0800

    [ISSUE-484] Fix accidentally remove the storage of appId when unregistering 
partial shuffle in HdfsStorageManager (#485)
    
    ### What changes were proposed in this pull request?
    When only some of an app's shuffles are unregistered, the storage mapped to 
that appId is removed as well, so the app's remaining shuffles fail to flush data.
    
    ### Why are the changes needed?
    Fix the accidental removal of an appId's storage when unregistering only 
part of its shuffles in HdfsStorageManager.
    
    ### Does this PR introduce _any_ user-facing change?
    NO
    
    ### How was this patch tested?
    1. UTs
---
 .../test/ShuffleUnregisterWithHdfsTest.java        |  5 ++++
 .../test/ShuffleUnregisterWithLocalfileTest.java   |  5 ++++
 .../uniffle/server/storage/HdfsStorageManager.java |  7 +++---
 .../server/storage/HdfsStorageManagerTest.java     | 27 ++++++++++++++++++++++
 4 files changed, 41 insertions(+), 3 deletions(-)

diff --git 
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithHdfsTest.java
 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithHdfsTest.java
index 2eab10d4..bfa24023 100644
--- 
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithHdfsTest.java
+++ 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithHdfsTest.java
@@ -90,6 +90,11 @@ public class ShuffleUnregisterWithHdfsTest extends 
SparkIntegrationTestBase {
       Thread.sleep(1000);
       assertFalse(fs.exists(new Path(shufflePath)));
       assertTrue(fs.exists(new Path(appPath)));
+
+      // After unregistering partial shuffle, newly shuffle could work
+      map = javaPairRDD.collectAsMap();
+      shufflePath = appPath + "/1";
+      assertTrue(fs.exists(new Path(shufflePath)));
     } else {
       runCounter++;
     }
diff --git 
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithLocalfileTest.java
 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithLocalfileTest.java
index a3dd18f5..6e5e0367 100644
--- 
a/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithLocalfileTest.java
+++ 
b/integration-test/spark-common/src/test/java/org/apache/uniffle/test/ShuffleUnregisterWithLocalfileTest.java
@@ -92,6 +92,11 @@ public class ShuffleUnregisterWithLocalfileTest extends 
SparkIntegrationTestBase
       Thread.sleep(1000);
       assertFalse(new File(shufflePath).exists());
       assertTrue(new File(appPath).exists());
+
+      // After unregistering partial shuffle, newly shuffle could work
+      map = javaPairRDD.collectAsMap();
+      shufflePath = appPath + "/1";
+      assertTrue(new File(shufflePath).exists());
     } else {
       runCounter++;
     }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
index 7c4af1d8..1532ff2b 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
@@ -84,13 +84,12 @@ public class HdfsStorageManager extends 
SingleStorageManager {
   @Override
   public void removeResources(PurgeEvent event) {
     String appId = event.getAppId();
-    String user = event.getUser();
     HdfsStorage storage = getStorageByAppId(appId);
     if (storage != null) {
       if (event instanceof AppPurgeEvent) {
         storage.removeHandlers(appId);
+        appIdToStorages.remove(appId);
       }
-      appIdToStorages.remove(appId);
       ShuffleDeleteHandler deleteHandler = ShuffleHandlerFactory
           .getInstance()
           .createShuffleDeleteHandler(
@@ -107,7 +106,9 @@ public class HdfsStorageManager extends 
SingleStorageManager {
           
deletePaths.add(ShuffleStorageUtils.getFullShuffleDataFolder(basicPath, 
String.valueOf(shuffleId)));
         }
       }
-      deleteHandler.delete(deletePaths.toArray(new String[0]), appId, user);
+      deleteHandler.delete(deletePaths.toArray(new String[0]), appId, 
event.getUser());
+    } else {
+      LOG.warn("Storage gotten is null when removing resources for event: {}", 
event);
     }
   }
 
diff --git 
a/server/src/test/java/org/apache/uniffle/server/storage/HdfsStorageManagerTest.java
 
b/server/src/test/java/org/apache/uniffle/server/storage/HdfsStorageManagerTest.java
index b1b0cd98..1def59d4 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/storage/HdfsStorageManagerTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/storage/HdfsStorageManagerTest.java
@@ -30,6 +30,8 @@ import org.junit.jupiter.api.Test;
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.server.ShuffleServerConf;
 import org.apache.uniffle.server.ShuffleServerMetrics;
+import org.apache.uniffle.server.event.AppPurgeEvent;
+import org.apache.uniffle.server.event.ShufflePurgeEvent;
 import org.apache.uniffle.storage.common.HdfsStorage;
 import org.apache.uniffle.storage.util.StorageType;
 
@@ -49,6 +51,31 @@ public class HdfsStorageManagerTest {
     ShuffleServerMetrics.clear();
   }
 
+  @Test
+  public void testRemoveResources() {
+    ShuffleServerConf conf = new ShuffleServerConf();
+    conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, 
StorageType.MEMORY_LOCALFILE_HDFS.name());
+    HdfsStorageManager hdfsStorageManager = new HdfsStorageManager(conf);
+    final String remoteStoragePath1 = "hdfs://path1";
+    String appId = "testRemoveResources_appId";
+    hdfsStorageManager.registerRemoteStorage(
+        appId,
+        new RemoteStorageInfo(remoteStoragePath1, ImmutableMap.of("k1", "v1", 
"k2", "v2"))
+    );
+    Map<String, HdfsStorage> appStorageMap =  
hdfsStorageManager.getAppIdToStorages();
+
+    // case1
+    assertEquals(1, appStorageMap.size());
+    ShufflePurgeEvent shufflePurgeEvent = new ShufflePurgeEvent(appId, "", 
Arrays.asList(1));
+    hdfsStorageManager.removeResources(shufflePurgeEvent);
+    assertEquals(1, appStorageMap.size());
+
+    // case2
+    AppPurgeEvent appPurgeEvent = new AppPurgeEvent(appId, "");
+    hdfsStorageManager.removeResources(appPurgeEvent);
+    assertEquals(0, appStorageMap.size());
+  }
+
   @Test
   public void testRegisterRemoteStorage() {
     ShuffleServerConf conf = new ShuffleServerConf();

Reply via email to