This is an automated email from the ASF dual-hosted git repository. rickyma pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push: new e500dc597 [#1956] fix(server): Prevent file deletion hang in case of disk corruption (#1958) e500dc597 is described below commit e500dc5973e51723504909c2e48fd2e945e71f5e Author: Junfan Zhang <zus...@apache.org> AuthorDate: Fri Jul 26 16:20:19 2024 +0800 [#1956] fix(server): Prevent file deletion hang in case of disk corruption (#1958) ### What changes were proposed in this pull request? Prevent file deletion hang in case of disk corruption. ### Why are the changes needed? Fix: #1956 ### Does this PR introduce _any_ user-facing change? Yes. ### How was this patch tested? Unit tests --- .../apache/uniffle/server/ShuffleServerConf.java | 6 ++ .../apache/uniffle/server/ShuffleTaskManager.java | 73 +++++++++++++++++++--- .../server/storage/LocalStorageManager.java | 4 +- .../uniffle/server/ShuffleTaskManagerTest.java | 55 ++++++++++++++++ 4 files changed, 128 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java index e504cd2e3..92e5a46fa 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java @@ -636,6 +636,12 @@ public class ShuffleServerConf extends RssBaseConf { "A comma-separated block size list, where each value" + " can be suffixed with a memory size unit, such as kb or k, mb or m, etc."); + public static final ConfigOption<Long> STORAGE_REMOVE_RESOURCE_OPERATION_TIMEOUT_SEC = + ConfigOptions.key("rss.server.storage.resourceRemoveOperationTimeoutSec") + .longType() + .defaultValue(10 * 60L) + .withDescription("The storage remove resource operation timeout."); + public ShuffleServerConf() {} public ShuffleServerConf(String fileName) { diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java index b258c8a11..f90ec7c8a 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java @@ -26,13 +26,17 @@ import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.function.Supplier; import com.google.common.annotations.VisibleForTesting; import com.google.common.cache.Cache; @@ -61,6 +65,7 @@ import org.apache.uniffle.common.exception.NoBufferException; import org.apache.uniffle.common.exception.NoBufferForHugePartitionException; import org.apache.uniffle.common.exception.NoRegisterException; import org.apache.uniffle.common.exception.RssException; +import org.apache.uniffle.common.future.CompletableFutureExtension; import org.apache.uniffle.common.rpc.StatusCode; import org.apache.uniffle.common.util.BlockIdLayout; import org.apache.uniffle.common.util.Constants; @@ -92,7 +97,7 @@ public class ShuffleTaskManager { private final ScheduledExecutorService leakShuffleDataCheckExecutorService; private ScheduledExecutorService triggerFlushExecutorService; private final TopNShuffleDataSizeOfAppCalcTask topNShuffleDataSizeOfAppCalcTask; - private final StorageManager storageManager; + private StorageManager storageManager; private AtomicLong requireBufferId = new AtomicLong(0); private ShuffleServerConf conf; private long appExpiredWithoutHB; @@ -113,6 +118,7 @@ public class ShuffleTaskManager { private Thread clearResourceThread; private BlockingQueue<PurgeEvent> expiredAppIdQueue = Queues.newLinkedBlockingQueue(); private final Cache<String, ReentrantReadWriteLock> appLocks; + private final long storageRemoveOperationTimeoutSec; public ShuffleTaskManager( ShuffleServerConf conf, @@ -127,6 +133,8 @@ public class ShuffleTaskManager { this.appExpiredWithoutHB = conf.getLong(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT); this.commitCheckIntervalMax = conf.getLong(ShuffleServerConf.SERVER_COMMIT_CHECK_INTERVAL_MAX); this.preAllocationExpired = conf.getLong(ShuffleServerConf.SERVER_PRE_ALLOCATION_EXPIRED); + this.storageRemoveOperationTimeoutSec = + conf.getLong(ShuffleServerConf.STORAGE_REMOVE_RESOURCE_OPERATION_TIMEOUT_SEC); this.leakShuffleDataCheckInterval = conf.getLong(ShuffleServerConf.SERVER_LEAK_SHUFFLE_DATA_CHECK_INTERVAL); this.triggerFlushInterval = conf.getLong(ShuffleServerConf.SERVER_TRIGGER_FLUSH_CHECK_INTERVAL); @@ -766,8 +774,18 @@ public class ShuffleTaskManager { }); shuffleBufferManager.removeBufferByShuffleId(appId, shuffleIds); shuffleFlushManager.removeResourcesOfShuffleId(appId, shuffleIds); - storageManager.removeResources( - new ShufflePurgeEvent(appId, getUserByAppId(appId), shuffleIds)); + + String operationMsg = + String.format("removing storage data for appId:%s, shuffleIds:%s", appId, shuffleIds); + withTimeoutExecution( + () -> { + storageManager.removeResources( + new ShufflePurgeEvent(appId, getUserByAppId(appId), shuffleIds)); + return null; + }, + storageRemoveOperationTimeoutSec, + operationMsg); + LOG.info( "Finish remove resource for appId[{}], shuffleIds[{}], cost[{}]", appId, @@ -811,12 +829,21 @@ public class ShuffleTaskManager { partitionsToBlockIds.remove(appId); shuffleBufferManager.removeBuffer(appId); shuffleFlushManager.removeResources(appId); - storageManager.removeResources( - new AppPurgeEvent( - appId, - shuffleTaskInfo.getUser(), - new ArrayList<>(shuffleTaskInfo.getShuffleIds()), - checkAppExpired)); + + String operationMsg = String.format("removing storage data for appId:%s", appId); + withTimeoutExecution( + () -> { + storageManager.removeResources( + new AppPurgeEvent( + appId, + shuffleTaskInfo.getUser(), + new ArrayList<>(shuffleTaskInfo.getShuffleIds()), + checkAppExpired)); + return null; + }, + storageRemoveOperationTimeoutSec, + operationMsg); + if (shuffleTaskInfo.hasHugePartition()) { ShuffleServerMetrics.gaugeAppWithHugePartitionNum.dec(); ShuffleServerMetrics.gaugeHugePartitionNum.dec(); @@ -832,6 +859,28 @@ public class ShuffleTaskManager { } } + private void withTimeoutExecution( + Supplier supplier, long timeoutSec, String operationDetailedMsg) { + CompletableFuture<Void> future = + CompletableFuture.supplyAsync(supplier, Executors.newSingleThreadExecutor()); + CompletableFuture extended = + CompletableFutureExtension.orTimeout(future, timeoutSec, TimeUnit.SECONDS); + try { + extended.get(); + } catch (Exception e) { + if (e instanceof ExecutionException) { + if (e.getCause() instanceof TimeoutException) { + LOG.error( + "Errors on finishing operation of [{}] in the {}(sec). This should not happen!", + operationDetailedMsg, + timeoutSec); + return; + } + throw new RssException(e); + } + } + } + public void refreshAppId(String appId) { shuffleTaskInfos .computeIfAbsent( @@ -940,4 +989,10 @@ public class ShuffleTaskManager { public void start() { clearResourceThread.start(); } + + // only for tests + @VisibleForTesting + protected void setStorageManager(StorageManager storageManager) { + this.storageManager = storageManager; + } } diff --git a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java index 159f64f27..10f831ad1 100644 --- a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java +++ b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java @@ -290,7 +290,9 @@ public class LocalStorageManager extends SingleStorageManager { StorageType.LOCALFILE.name(), new Configuration())); List<String> deletePaths = - storageBasePaths.stream() + localStorages.stream() + .filter(x -> !x.isCorrupted()) + .map(x -> x.getBasePath()) .flatMap( path -> { String basicPath = ShuffleStorageUtils.getFullShuffleDataFolder(path, appId); diff --git a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java index 75c49cd4d..c9cbdf49a 100644 --- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java +++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java @@ -40,6 +40,7 @@ import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import org.junit.jupiter.api.io.TempDir; import org.roaringbitmap.longlong.Roaring64NavigableMap; @@ -49,6 +50,7 @@ import org.apache.uniffle.common.RemoteStorageInfo; import org.apache.uniffle.common.ShuffleDataResult; import org.apache.uniffle.common.ShufflePartitionedBlock; import org.apache.uniffle.common.ShufflePartitionedData; +import org.apache.uniffle.common.config.RssBaseConf; import org.apache.uniffle.common.exception.InvalidRequestException; import org.apache.uniffle.common.exception.NoBufferForHugePartitionException; import org.apache.uniffle.common.exception.NoRegisterException; @@ -60,6 +62,7 @@ import org.apache.uniffle.common.util.RssUtils; import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo; import org.apache.uniffle.server.buffer.ShuffleBuffer; import org.apache.uniffle.server.buffer.ShuffleBufferManager; +import org.apache.uniffle.server.event.PurgeEvent; import org.apache.uniffle.server.storage.LocalStorageManager; import org.apache.uniffle.server.storage.StorageManager; import org.apache.uniffle.storage.HadoopTestBase; @@ -68,6 +71,7 @@ import org.apache.uniffle.storage.handler.impl.HadoopClientReadHandler; import org.apache.uniffle.storage.util.ShuffleStorageUtils; import org.apache.uniffle.storage.util.StorageType; +import static org.apache.uniffle.common.StorageType.MEMORY_LOCALFILE; import static org.apache.uniffle.server.ShuffleServerConf.CLIENT_MAX_CONCURRENCY_LIMITATION_OF_ONE_PARTITION; import static org.apache.uniffle.server.ShuffleServerConf.SERVER_MAX_CONCURRENCY_OF_ONE_PARTITION; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -77,6 +81,9 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.spy; public class ShuffleTaskManagerTest extends HadoopTestBase { @@ -1154,6 +1161,54 @@ public class ShuffleTaskManagerTest extends HadoopTestBase { assertEquals(30, ShuffleTaskManager.getMaxConcurrencyWriting(40, conf)); } + @Timeout(10) + @Test + public void testStorageRemoveResourceHang(@TempDir File tmpDir) throws Exception { + String confFile = ClassLoader.getSystemResource("server.conf").getFile(); + ShuffleServerConf conf = new ShuffleServerConf(confFile); + final String storageBasePath = tmpDir.getAbsolutePath() + "rss/testStorageRemoveResourceHang"; + conf.set(RssBaseConf.RSS_STORAGE_TYPE, MEMORY_LOCALFILE); + conf.set(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true); + conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234); + conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527"); + conf.set(ShuffleServerConf.STORAGE_REMOVE_RESOURCE_OPERATION_TIMEOUT_SEC, 2L); + + shuffleServer = new ShuffleServer(conf); + ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager(); + + String appId = "appId1"; + shuffleTaskManager.registerShuffle( + appId, + 1, + Lists.newArrayList(new PartitionRange(0, 1)), + new RemoteStorageInfo(storageBasePath, Maps.newHashMap()), + StringUtils.EMPTY); + shuffleTaskManager.refreshAppId(appId); + assertEquals(1, shuffleTaskManager.getAppIds().size()); + + ShufflePartitionedData partitionedData0 = createPartitionedData(1, 1, 35); + shuffleTaskManager.requireBuffer(35); + shuffleTaskManager.cacheShuffleData(appId, 0, false, partitionedData0); + shuffleTaskManager.updateCachedBlockIds(appId, 0, partitionedData0.getBlockList()); + shuffleTaskManager.refreshAppId(appId); + shuffleTaskManager.checkResourceStatus(); + assertEquals(1, shuffleTaskManager.getAppIds().size()); + + // get the underlying localfile storage manager to simulate hang + LocalStorageManager storageManager = (LocalStorageManager) shuffleServer.getStorageManager(); + storageManager = spy(storageManager); + doAnswer( + x -> { + Thread.sleep(100000); + return null; + }) + .when(storageManager) + .removeResources(any(PurgeEvent.class)); + shuffleTaskManager.setStorageManager(storageManager); + + shuffleTaskManager.removeResources(appId, false); + } + @Test public void testRegisterShuffleAfterAppIsExpired() throws Exception { String confFile = ClassLoader.getSystemResource("server.conf").getFile();