This is an automated email from the ASF dual-hosted git repository.

rickyma pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new e500dc597 [#1956] fix(server): Prevent file deletion hang in case of disk corruption (#1958)
e500dc597 is described below

commit e500dc5973e51723504909c2e48fd2e945e71f5e
Author: Junfan Zhang <zus...@apache.org>
AuthorDate: Fri Jul 26 16:20:19 2024 +0800

    [#1956] fix(server): Prevent file deletion hang in case of disk corruption (#1958)
    
    ### What changes were proposed in this pull request?
    
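    Prevent storage data removal from hanging in case of disk corruption:
    skip corrupted local disks when deleting app data, and bound each
    storage removal with a configurable timeout.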
    
    ### Why are the changes needed?
    
    Fix: #1956
    
    ### Does this PR introduce _any_ user-facing change?
    
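    Yes. A new server config option,
    `rss.server.storage.resourceRemoveOperationTimeoutSec` (default: 600 seconds),
    bounds how long the server waits for storage data removal; on timeout it
    logs an error and stops waiting.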
    
    ### How was this patch tested?
    
    Unit tests (`ShuffleTaskManagerTest#testStorageRemoveResourceHang`).
---
 .../apache/uniffle/server/ShuffleServerConf.java   |  6 ++
 .../apache/uniffle/server/ShuffleTaskManager.java  | 73 +++++++++++++++++++---
 .../server/storage/LocalStorageManager.java        |  4 +-
 .../uniffle/server/ShuffleTaskManagerTest.java     | 55 ++++++++++++++++
 4 files changed, 128 insertions(+), 10 deletions(-)

diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index e504cd2e3..92e5a46fa 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -636,6 +636,12 @@ public class ShuffleServerConf extends RssBaseConf {
               "A comma-separated block size list, where each value"
                   + " can be suffixed with a memory size unit, such as kb or 
k, mb or m, etc.");
 
+  public static final ConfigOption<Long> STORAGE_REMOVE_RESOURCE_OPERATION_TIMEOUT_SEC =
+      ConfigOptions.key("rss.server.storage.resourceRemoveOperationTimeoutSec")
+          .longType()
+          .defaultValue(10 * 60L) // 10 minutes
+          .withDescription("Timeout in seconds for removing storage data of an app/shuffle.");
+
   public ShuffleServerConf() {}
 
   public ShuffleServerConf(String fileName) {
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index b258c8a11..f90ec7c8a 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -26,13 +26,17 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Supplier;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.Cache;
@@ -61,6 +65,7 @@ import org.apache.uniffle.common.exception.NoBufferException;
 import org.apache.uniffle.common.exception.NoBufferForHugePartitionException;
 import org.apache.uniffle.common.exception.NoRegisterException;
 import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.future.CompletableFutureExtension;
 import org.apache.uniffle.common.rpc.StatusCode;
 import org.apache.uniffle.common.util.BlockIdLayout;
 import org.apache.uniffle.common.util.Constants;
@@ -92,7 +97,7 @@ public class ShuffleTaskManager {
   private final ScheduledExecutorService leakShuffleDataCheckExecutorService;
   private ScheduledExecutorService triggerFlushExecutorService;
   private final TopNShuffleDataSizeOfAppCalcTask 
topNShuffleDataSizeOfAppCalcTask;
-  private final StorageManager storageManager;
+  private StorageManager storageManager;
   private AtomicLong requireBufferId = new AtomicLong(0);
   private ShuffleServerConf conf;
   private long appExpiredWithoutHB;
@@ -113,6 +118,7 @@ public class ShuffleTaskManager {
   private Thread clearResourceThread;
   private BlockingQueue<PurgeEvent> expiredAppIdQueue = 
Queues.newLinkedBlockingQueue();
   private final Cache<String, ReentrantReadWriteLock> appLocks;
+  private final long storageRemoveOperationTimeoutSec;
 
   public ShuffleTaskManager(
       ShuffleServerConf conf,
@@ -127,6 +133,8 @@ public class ShuffleTaskManager {
     this.appExpiredWithoutHB = 
conf.getLong(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT);
     this.commitCheckIntervalMax = 
conf.getLong(ShuffleServerConf.SERVER_COMMIT_CHECK_INTERVAL_MAX);
     this.preAllocationExpired = 
conf.getLong(ShuffleServerConf.SERVER_PRE_ALLOCATION_EXPIRED);
+    this.storageRemoveOperationTimeoutSec =
+        
conf.getLong(ShuffleServerConf.STORAGE_REMOVE_RESOURCE_OPERATION_TIMEOUT_SEC);
     this.leakShuffleDataCheckInterval =
         
conf.getLong(ShuffleServerConf.SERVER_LEAK_SHUFFLE_DATA_CHECK_INTERVAL);
     this.triggerFlushInterval = 
conf.getLong(ShuffleServerConf.SERVER_TRIGGER_FLUSH_CHECK_INTERVAL);
@@ -766,8 +774,18 @@ public class ShuffleTaskManager {
               });
       shuffleBufferManager.removeBufferByShuffleId(appId, shuffleIds);
       shuffleFlushManager.removeResourcesOfShuffleId(appId, shuffleIds);
-      storageManager.removeResources(
-          new ShufflePurgeEvent(appId, getUserByAppId(appId), shuffleIds));
+
+      String operationMsg =
+          String.format("removing storage data for appId:%s, shuffleIds:%s", 
appId, shuffleIds);
+      withTimeoutExecution(
+          () -> {
+            storageManager.removeResources(
+                new ShufflePurgeEvent(appId, getUserByAppId(appId), 
shuffleIds));
+            return null;
+          },
+          storageRemoveOperationTimeoutSec,
+          operationMsg);
+
       LOG.info(
           "Finish remove resource for appId[{}], shuffleIds[{}], cost[{}]",
           appId,
@@ -811,12 +829,21 @@ public class ShuffleTaskManager {
       partitionsToBlockIds.remove(appId);
       shuffleBufferManager.removeBuffer(appId);
       shuffleFlushManager.removeResources(appId);
-      storageManager.removeResources(
-          new AppPurgeEvent(
-              appId,
-              shuffleTaskInfo.getUser(),
-              new ArrayList<>(shuffleTaskInfo.getShuffleIds()),
-              checkAppExpired));
+
+      String operationMsg = String.format("removing storage data for 
appId:%s", appId);
+      withTimeoutExecution(
+          () -> {
+            storageManager.removeResources(
+                new AppPurgeEvent(
+                    appId,
+                    shuffleTaskInfo.getUser(),
+                    new ArrayList<>(shuffleTaskInfo.getShuffleIds()),
+                    checkAppExpired));
+            return null;
+          },
+          storageRemoveOperationTimeoutSec,
+          operationMsg);
+
       if (shuffleTaskInfo.hasHugePartition()) {
         ShuffleServerMetrics.gaugeAppWithHugePartitionNum.dec();
         ShuffleServerMetrics.gaugeHugePartitionNum.dec();
@@ -832,6 +859,28 @@ public class ShuffleTaskManager {
     }
   }
 
+  /** Runs supplier on a new thread; on timeout, logs and returns while the task keeps running. */
+  private void withTimeoutExecution(
+      Supplier<Void> supplier, long timeoutSec, String operationDetailedMsg) {
+    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
+    CompletableFuture<Void> future = CompletableFuture.supplyAsync(supplier, executor);
+    executor.shutdown(); // the task still runs; the thread then exits instead of leaking
+    try {
+      CompletableFutureExtension.orTimeout(future, timeoutSec, TimeUnit.SECONDS).get();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RssException(e);
+    } catch (ExecutionException e) {
+      if (!(e.getCause() instanceof TimeoutException)) {
+        throw new RssException(e);
+      }
+      LOG.error(
+          "Errors on finishing operation of [{}] in the {}(sec). This should not happen!",
+          operationDetailedMsg,
+          timeoutSec);
+    }
+  }
+
   public void refreshAppId(String appId) {
     shuffleTaskInfos
         .computeIfAbsent(
@@ -940,4 +989,10 @@ public class ShuffleTaskManager {
   public void start() {
     clearResourceThread.start();
   }
+
+  // Only for tests: replaces the StorageManager used by this ShuffleTaskManager (e.g. a spy).
+  @VisibleForTesting
+  protected void setStorageManager(StorageManager storageManager) {
+    this.storageManager = storageManager;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
 
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index 159f64f27..10f831ad1 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -290,7 +290,9 @@ public class LocalStorageManager extends 
SingleStorageManager {
                     StorageType.LOCALFILE.name(), new Configuration()));
 
     List<String> deletePaths =
-        storageBasePaths.stream()
+        localStorages.stream()
+            .filter(x -> !x.isCorrupted())
+            .map(x -> x.getBasePath())
             .flatMap(
                 path -> {
                   String basicPath = 
ShuffleStorageUtils.getFullShuffleDataFolder(path, appId);
diff --git 
a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java 
b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index 75c49cd4d..c9cbdf49a 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -40,6 +40,7 @@ import org.awaitility.Awaitility;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.io.TempDir;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
 
@@ -49,6 +50,7 @@ import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.ShufflePartitionedBlock;
 import org.apache.uniffle.common.ShufflePartitionedData;
+import org.apache.uniffle.common.config.RssBaseConf;
 import org.apache.uniffle.common.exception.InvalidRequestException;
 import org.apache.uniffle.common.exception.NoBufferForHugePartitionException;
 import org.apache.uniffle.common.exception.NoRegisterException;
@@ -60,6 +62,7 @@ import org.apache.uniffle.common.util.RssUtils;
 import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
 import org.apache.uniffle.server.buffer.ShuffleBuffer;
 import org.apache.uniffle.server.buffer.ShuffleBufferManager;
+import org.apache.uniffle.server.event.PurgeEvent;
 import org.apache.uniffle.server.storage.LocalStorageManager;
 import org.apache.uniffle.server.storage.StorageManager;
 import org.apache.uniffle.storage.HadoopTestBase;
@@ -68,6 +71,7 @@ import 
org.apache.uniffle.storage.handler.impl.HadoopClientReadHandler;
 import org.apache.uniffle.storage.util.ShuffleStorageUtils;
 import org.apache.uniffle.storage.util.StorageType;
 
+import static org.apache.uniffle.common.StorageType.MEMORY_LOCALFILE;
 import static 
org.apache.uniffle.server.ShuffleServerConf.CLIENT_MAX_CONCURRENCY_LIMITATION_OF_ONE_PARTITION;
 import static 
org.apache.uniffle.server.ShuffleServerConf.SERVER_MAX_CONCURRENCY_OF_ONE_PARTITION;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -77,6 +81,9 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.spy;
 
 public class ShuffleTaskManagerTest extends HadoopTestBase {
 
@@ -1154,6 +1161,54 @@ public class ShuffleTaskManagerTest extends 
HadoopTestBase {
     assertEquals(30, ShuffleTaskManager.getMaxConcurrencyWriting(40, conf));
   }
 
+  @Timeout(10)
+  @Test
+  public void testStorageRemoveResourceHang(@TempDir File tmpDir) throws Exception {
+    String confFile = ClassLoader.getSystemResource("server.conf").getFile();
+    ShuffleServerConf conf = new ShuffleServerConf(confFile);
+    final String storageBasePath = tmpDir.getAbsolutePath() + "/rss/testStorageRemoveResourceHang";
+    conf.set(RssBaseConf.RSS_STORAGE_TYPE, MEMORY_LOCALFILE);
+    conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
+    conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
+    conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
+    conf.set(ShuffleServerConf.STORAGE_REMOVE_RESOURCE_OPERATION_TIMEOUT_SEC, 2L);
+
+    shuffleServer = new ShuffleServer(conf);
+    ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
+
+    String appId = "appId1";
+    shuffleTaskManager.registerShuffle(
+        appId,
+        1,
+        Lists.newArrayList(new PartitionRange(0, 1)),
+        new RemoteStorageInfo(storageBasePath, Maps.newHashMap()),
+        StringUtils.EMPTY);
+    shuffleTaskManager.refreshAppId(appId);
+    assertEquals(1, shuffleTaskManager.getAppIds().size());
+
+    ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 35);
+    shuffleTaskManager.requireBuffer(35);
+    shuffleTaskManager.cacheShuffleData(appId, 0, false, partitionedData0);
+    shuffleTaskManager.updateCachedBlockIds(appId, 0, partitionedData0.getBlockList());
+    shuffleTaskManager.refreshAppId(appId);
+    shuffleTaskManager.checkResourceStatus();
+    assertEquals(1, shuffleTaskManager.getAppIds().size());
+
+    // Spy the storage manager so removeResources blocks far longer than the 2s timeout
+    LocalStorageManager storageManager = (LocalStorageManager) shuffleServer.getStorageManager();
+    storageManager = spy(storageManager);
+    doAnswer(
+            x -> {
+              Thread.sleep(100000);
+              return null;
+            })
+        .when(storageManager)
+        .removeResources(any(PurgeEvent.class));
+    shuffleTaskManager.setStorageManager(storageManager);
+
+    shuffleTaskManager.removeResources(appId, false); // must return before @Timeout(10) fires
+  }
+
   @Test
   public void testRegisterShuffleAfterAppIsExpired() throws Exception {
     String confFile = ClassLoader.getSystemResource("server.conf").getFile();

Reply via email to