This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 2f347338 [ISSUE-228][FEATURE] Add a period local storage cleaner
thread (#357)
2f347338 is described below
commit 2f3473381d5524aa15696eab3b1623dcb5ddf2b6
Author: sfwang218 <[email protected]>
AuthorDate: Fri Nov 25 21:24:52 2022 +0800
[ISSUE-228][FEATURE] Add a period local storage cleaner thread (#357)
### What changes were proposed in this pull request?
Add a periodic local storage cleaner thread that checks for leaked shuffle data
on disks
### Why are the changes needed?
Because we don't use a strict locking strategy, some shuffle data may not be
deleted normally and can leak on disk. A strict locking strategy would hurt
performance, so instead we add a periodic local storage cleaner thread. The
thread finds which applications have data on disk and checks whether each app
exists in the in-memory data structure ShuffleTaskManager#shuffleTaskInfos. If
it does not, the app's data on disk is deleted. See issue #282.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Added a unit test: ShuffleTaskManagerTest#checkAndClearLeakShuffleDataTest.
Co-authored-by: wangshifa <[email protected]>
---
docs/server_guide.md | 1 +
.../apache/uniffle/server/ShuffleServerConf.java | 6 ++
.../apache/uniffle/server/ShuffleTaskManager.java | 19 +++++
.../uniffle/server/storage/HdfsStorageManager.java | 5 ++
.../server/storage/LocalStorageManager.java | 27 +++++++
.../server/storage/MultiStorageManager.java | 6 ++
.../uniffle/server/storage/StorageManager.java | 5 +-
.../uniffle/server/ShuffleTaskManagerTest.java | 89 ++++++++++++++++++++++
.../uniffle/storage/common/LocalStorage.java | 15 ++++
9 files changed, 172 insertions(+), 1 deletion(-)
diff --git a/docs/server_guide.md b/docs/server_guide.md
index efdf767a..871aa237 100644
--- a/docs/server_guide.md
+++ b/docs/server_guide.md
@@ -83,3 +83,4 @@ This document will introduce how to deploy Uniffle shuffle
servers.
|rss.server.single.buffer.flush.threshold|64M|The threshold of single shuffle
buffer flush|
|rss.server.disk.capacity|-1|Disk capacity that shuffle server can use. If
it's negative, it will use the default disk whole space|
|rss.server.multistorage.fallback.strategy.class|-|The fallback strategy for
`MEMORY_LOCALFILE_HDFS`. Support
`org.apache.uniffle.server.storage.RotateStorageManagerFallbackStrategy`,`org.apache.uniffle.server.storage.LocalStorageManagerFallbackStrategy`
and `org.apache.uniffle.server.storage.HdfsStorageManagerFallbackStrategy`. If
not set, `org.apache.uniffle.server.storage.HdfsStorageManagerFallbackStrategy`
will be used.|
+|rss.server.leak.shuffledata.check.interval|3600000|The interval (ms) between periodic checks for leaked shuffle data|
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index b8bdf7ac..52acc549 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -308,6 +308,12 @@ public class ShuffleServerConf extends RssBaseConf {
.defaultValue(60 * 1000L)
.withDescription("The timeout of the cache which record the mapping
information");
+  /**
+   * Interval, in milliseconds, between two runs of the leaked shuffle data check (default: one
+   * hour). ShuffleTaskManager uses it as both the initial delay and the period of a fixed-rate
+   * schedule, so it must be positive.
+   */
+  public static final ConfigOption<Long> SERVER_LEAK_SHUFFLE_DATA_CHECK_INTERVAL =
+      ConfigOptions.key("rss.server.leak.shuffledata.check.interval")
+          .longType()
+          .defaultValue(60 * 60 * 1000L)
+          .withDescription("the interval of leak shuffle data check");
+
public ShuffleServerConf() {
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index cea1dcb6..6fe1e412 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -70,12 +70,14 @@ public class ShuffleTaskManager {
private final ShuffleFlushManager shuffleFlushManager;
private final ScheduledExecutorService scheduledExecutorService;
private final ScheduledExecutorService expiredAppCleanupExecutorService;
+ private final ScheduledExecutorService leakShuffleDataCheckExecutorService;
private final StorageManager storageManager;
private AtomicLong requireBufferId = new AtomicLong(0);
private ShuffleServerConf conf;
private long appExpiredWithoutHB;
private long preAllocationExpired;
private long commitCheckIntervalMax;
+ private long leakShuffleDataCheckInterval;
// appId -> shuffleId -> blockIds to avoid too many appId
// store taskAttemptId info to filter speculation task
// Roaring64NavigableMap instance will cost much memory,
@@ -102,6 +104,7 @@ public class ShuffleTaskManager {
this.appExpiredWithoutHB =
conf.getLong(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT);
this.commitCheckIntervalMax =
conf.getLong(ShuffleServerConf.SERVER_COMMIT_CHECK_INTERVAL_MAX);
this.preAllocationExpired =
conf.getLong(ShuffleServerConf.SERVER_PRE_ALLOCATION_EXPIRED);
+ this.leakShuffleDataCheckInterval =
conf.getLong(ShuffleServerConf.SERVER_LEAK_SHUFFLE_DATA_CHECK_INTERVAL);
// the thread for checking application status
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(
ThreadUtils.getThreadFactory("checkResource-%d"));
@@ -113,6 +116,11 @@ public class ShuffleTaskManager {
expiredAppCleanupExecutorService.scheduleAtFixedRate(
() -> checkResourceStatus(), appExpiredWithoutHB / 2,
appExpiredWithoutHB / 2, TimeUnit.MILLISECONDS);
+ this.leakShuffleDataCheckExecutorService =
Executors.newSingleThreadScheduledExecutor(
+ ThreadUtils.getThreadFactory("leakShuffleDataChecker"));
+ leakShuffleDataCheckExecutorService.scheduleAtFixedRate(
+ () -> checkLeakShuffleData(), leakShuffleDataCheckInterval,
+ leakShuffleDataCheckInterval, TimeUnit.MILLISECONDS);
// the thread for clear expired resources
clearResourceThread = () -> {
while (true) {
@@ -452,6 +460,17 @@ public class ShuffleTaskManager {
appId, shuffleIds, System.currentTimeMillis() - start);
}
+  /**
+   * Asks the storage manager to delete on-disk shuffle data of applications that are no longer
+   * registered in {@code shuffleTaskInfos}.
+   *
+   * <p>This method is the body of a periodic scheduled task. Every exception is caught and logged
+   * because an exception escaping a fixed-rate task would suppress all later executions.
+   *
+   * <p>NOTE(review): the registered app ids are copied before the disks are scanned. An app
+   * registered after the copy whose data already reached disk could be treated as leaked.
+   */
+  public void checkLeakShuffleData() {
+    LOG.info("Start check leak shuffle data");
+    try {
+      // Pass a copy of the registered app ids so the scan works on a stable view.
+      storageManager.checkAndClearLeakShuffleData(Sets.newHashSet(shuffleTaskInfos.keySet()));
+      LOG.info("Finish check leak shuffle data");
+    } catch (Exception e) {
+      LOG.warn("Error happened in checkLeakShuffleData", e);
+    }
+  }
+
@VisibleForTesting
public void removeResources(String appId) {
LOG.info("Start remove resource for appId[" + appId + "]");
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
index 3087952e..2151e75d 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/HdfsStorageManager.java
@@ -18,6 +18,7 @@
package org.apache.uniffle.server.storage;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -134,6 +135,10 @@ public class HdfsStorageManager extends
SingleStorageManager {
appIdToStorages.putIfAbsent(appId, pathToStorages.get(remoteStorage));
}
+  /**
+   * Leak detection is not implemented for HDFS storage, so this method does nothing.
+   *
+   * @param appIds ids of the currently registered applications (ignored)
+   */
+  @Override
+  public void checkAndClearLeakShuffleData(Collection<String> appIds) {
+    // Intentionally empty: HDFS directories are not scanned for leaked shuffle data.
+  }
+
public HdfsStorage getStorageByAppId(String appId) {
if (!appIdToStorages.containsKey(appId)) {
synchronized (this) {
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index 4ba3fd3b..db0f47e3 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -19,7 +19,9 @@ package org.apache.uniffle.server.storage;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
@@ -62,6 +64,7 @@ import static
org.apache.uniffle.server.ShuffleServerConf.LOCAL_STORAGE_INITIALI
public class LocalStorageManager extends SingleStorageManager {
private static final Logger LOG =
LoggerFactory.getLogger(LocalStorageManager.class);
+ private static final String UNKNOWN_USER_NAME = "unknown";
private final List<LocalStorage> localStorages;
private final List<String> storageBasePaths;
@@ -218,6 +221,30 @@ public class LocalStorageManager extends
SingleStorageManager {
// ignore
}
+  /**
+   * Deletes local shuffle data of applications that are no longer registered.
+   *
+   * <p>App ids are the names of the sub-directories found on every non-corrupted local storage.
+   * Each id not contained in {@code appIds} is treated as leaked. Its folder is then deleted
+   * under every configured base path, including base paths of corrupted storages that were not
+   * scanned.
+   *
+   * @param appIds ids of the applications that are still registered; their data is kept
+   */
+  @Override
+  public void checkAndClearLeakShuffleData(Collection<String> appIds) {
+    Set<String> appIdsOnStorages = new HashSet<>();
+    for (LocalStorage localStorage : localStorages) {
+      if (!localStorage.isCorrupted()) {
+        appIdsOnStorages.addAll(localStorage.getAppIds());
+      }
+    }
+
+    List<String> leakedAppIds = new ArrayList<>();
+    for (String appId : appIdsOnStorages) {
+      if (!appIds.contains(appId)) {
+        leakedAppIds.add(appId);
+      }
+    }
+    if (leakedAppIds.isEmpty()) {
+      return;
+    }
+
+    // The delete request is identical for every app, so build the handler (and its Hadoop
+    // Configuration) once instead of once per leaked app.
+    // NOTE(review): assumes the LOCALFILE delete handler keeps no per-app state; confirm.
+    ShuffleDeleteHandler deleteHandler = ShuffleHandlerFactory.getInstance()
+        .createShuffleDeleteHandler(
+            new CreateShuffleDeleteHandlerRequest(
+                StorageType.LOCALFILE.name(), new Configuration()));
+    for (String appId : leakedAppIds) {
+      String[] deletePaths = new String[storageBasePaths.size()];
+      for (int i = 0; i < storageBasePaths.size(); i++) {
+        deletePaths[i] =
+            ShuffleStorageUtils.getFullShuffleDataFolder(storageBasePaths.get(i), appId);
+      }
+      LOG.info("Deleting leaked shuffle data of appId[{}]", appId);
+      try {
+        deleteHandler.delete(deletePaths, appId, UNKNOWN_USER_NAME);
+      } catch (Exception e) {
+        // Keep cleaning the remaining apps; the caller only logs and waits for the next run.
+        LOG.warn("Failed to delete leaked shuffle data of appId[{}]", appId, e);
+      }
+    }
+  }
+
void repair() {
boolean hasNewCorruptedStorage = false;
for (LocalStorage storage : localStorages) {
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
index 942ae5bb..adb84369 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/MultiStorageManager.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.server.storage;
import java.io.IOException;
import java.lang.reflect.Constructor;
+import java.util.Collection;
import java.util.concurrent.TimeUnit;
import com.google.common.cache.Cache;
@@ -159,6 +160,11 @@ public class MultiStorageManager implements StorageManager
{
return warmStorageManager.canWrite(event) ||
coldStorageManager.canWrite(event);
}
+  /**
+   * Runs leaked shuffle data detection on the warm storage manager only.
+   *
+   * @param appIds ids of the currently registered applications, whose data must be kept
+   */
+  @Override
+  public void checkAndClearLeakShuffleData(Collection<String> appIds) {
+    // NOTE(review): the cold storage manager is not consulted; presumably it is HDFS-backed,
+    // where leak detection is a no-op. Revisit if another cold storage type is added.
+    warmStorageManager.checkAndClearLeakShuffleData(appIds);
+  }
+
public void removeResources(PurgeEvent event) {
LOG.info("Start to remove resource of {}", event);
warmStorageManager.removeResources(event);
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
index a46bea29..2ba7b4c5 100644
--- a/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/storage/StorageManager.java
@@ -17,6 +17,8 @@
package org.apache.uniffle.server.storage;
+import java.util.Collection;
+
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.server.Checker;
import org.apache.uniffle.server.ShuffleDataFlushEvent;
@@ -25,7 +27,6 @@ import org.apache.uniffle.server.event.PurgeEvent;
import org.apache.uniffle.storage.common.Storage;
import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;
-
public interface StorageManager {
Storage selectStorage(ShuffleDataFlushEvent event);
@@ -51,4 +52,6 @@ public interface StorageManager {
boolean canWrite(ShuffleDataFlushEvent event);
// todo: add an interface that check storage isHealthy
+
+ /**
+ * Deletes shuffle data held by this storage for applications that are not in {@code appIds}.
+ * Implementations may do nothing when leak detection is not supported for their storage type.
+ *
+ * @param appIds ids of the applications that are currently registered; their data is kept
+ */
+ void checkAndClearLeakShuffleData(Collection<String> appIds);
}
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index 99b79327..90b67a94 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.uniffle.server;
import java.io.File;
import java.nio.file.Files;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
@@ -35,6 +36,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
import org.apache.uniffle.common.BufferSegment;
@@ -49,8 +51,10 @@ import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.server.buffer.PreAllocatedBufferInfo;
import org.apache.uniffle.server.buffer.ShuffleBuffer;
import org.apache.uniffle.server.buffer.ShuffleBufferManager;
+import org.apache.uniffle.server.storage.LocalStorageManager;
import org.apache.uniffle.server.storage.StorageManager;
import org.apache.uniffle.storage.HdfsTestBase;
+import org.apache.uniffle.storage.common.LocalStorage;
import org.apache.uniffle.storage.handler.impl.HdfsClientReadHandler;
import org.apache.uniffle.storage.util.ShuffleStorageUtils;
import org.apache.uniffle.storage.util.StorageType;
@@ -633,6 +637,91 @@ public class ShuffleTaskManagerTest extends HdfsTestBase {
}
}
+  /**
+   * Verifies that an app directory left on local disk for an unregistered app is removed by
+   * {@link ShuffleTaskManager#checkLeakShuffleData()}.
+   */
+  @Test
+  public void checkAndClearLeakShuffleDataTest(@TempDir File tempDir) throws Exception {
+    final String appId = "clearLocalTest_appId";
+    final int shuffleId = 1;
+
+    ShuffleServerConf conf = new ShuffleServerConf();
+    conf.set(ShuffleServerConf.RPC_SERVER_PORT, 1234);
+    conf.set(ShuffleServerConf.RSS_COORDINATOR_QUORUM, "localhost:9527");
+    conf.set(ShuffleServerConf.JETTY_HTTP_PORT, 12345);
+    conf.set(ShuffleServerConf.JETTY_CORE_POOL_SIZE, 64);
+    conf.set(ShuffleServerConf.SERVER_BUFFER_CAPACITY, 64L);
+    conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_HIGHWATERMARK_PERCENTAGE, 50.0);
+    conf.set(ShuffleServerConf.SERVER_MEMORY_SHUFFLE_LOWWATERMARK_PERCENTAGE, 0.0);
+    conf.set(ShuffleServerConf.SERVER_COMMIT_TIMEOUT, 10000L);
+    conf.set(ShuffleServerConf.SERVER_APP_EXPIRED_WITHOUT_HEARTBEAT, 2000L);
+    conf.set(ShuffleServerConf.HEALTH_CHECK_ENABLE, false);
+    conf.set(ShuffleServerConf.RSS_STORAGE_BASE_PATH, Arrays.asList(tempDir.getAbsolutePath()));
+    conf.setString(ShuffleServerConf.RSS_STORAGE_TYPE, StorageType.LOCALFILE.name());
+    conf.setLong(ShuffleServerConf.DISK_CAPACITY, 1024L * 1024L * 1024L);
+    // make sure not to check leak shuffle data automatically
+    conf.setLong(ShuffleServerConf.SERVER_LEAK_SHUFFLE_DATA_CHECK_INTERVAL, 600 * 1000L);
+
+    ShuffleServer shuffleServer = new ShuffleServer(conf);
+    ShuffleTaskManager shuffleTaskManager = shuffleServer.getShuffleTaskManager();
+    shuffleTaskManager.registerShuffle(
+        appId,
+        shuffleId,
+        Lists.newArrayList(new PartitionRange(0, 1)),
+        RemoteStorageInfo.EMPTY_REMOTE_STORAGE,
+        StringUtils.EMPTY
+    );
+
+    shuffleTaskManager.refreshAppId(appId);
+    assertEquals(1, shuffleTaskManager.getAppIds().size());
+
+    ShufflePartitionedData shuffleData = createPartitionedData(1, 1, 48);
+
+    // make sure shuffle data flush to disk
+    for (int retry = 0; retry < 5; retry++) {
+      Thread.sleep(1000);
+      shuffleTaskManager.cacheShuffleData(appId, shuffleId, false, shuffleData);
+      shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, shuffleData.getBlockList());
+      shuffleTaskManager.refreshAppId(appId);
+      shuffleTaskManager.checkResourceStatus();
+    }
+
+    StorageManager storageManager = shuffleServer.getStorageManager();
+    assertTrue(storageManager instanceof LocalStorageManager);
+    LocalStorageManager localStorageManager = (LocalStorageManager) storageManager;
+    // parse appIds from storage
+    Set<String> appIdsOnDisk = getAppIdsOnDisk(localStorageManager);
+    assertEquals(shuffleTaskManager.getAppIds().size(), appIdsOnDisk.size());
+    assertTrue(appIdsOnDisk.contains(appId));
+
+    // Wait (bounded) for the heartbeat to expire: the app must be unregistered and its data
+    // removed before a freshly created directory can be detected as leaked. Polling avoids the
+    // flakiness and the fixed cost of a hard-coded sleep.
+    long deadline = System.currentTimeMillis() + 30_000L;
+    while ((shuffleTaskManager.getAppIds().size() != 0
+        || getAppIdsOnDisk(localStorageManager).contains(appId))
+        && System.currentTimeMillis() < deadline) {
+      Thread.sleep(200);
+    }
+    assertFalse(getAppIdsOnDisk(localStorageManager).contains(appId));
+
+    // mock leak shuffle data
+    File file = new File(tempDir, appId);
+    assertFalse(file.exists());
+    assertTrue(file.mkdir());
+
+    // execute checkLeakShuffleData
+    shuffleTaskManager.checkLeakShuffleData();
+    assertFalse(file.exists());
+  }
+
+  /**
+   * Collects the app directory names found on every storage of the given manager. Unlike the
+   * production leak check, corrupted storages are not skipped.
+   */
+  private Set<String> getAppIdsOnDisk(LocalStorageManager localStorageManager) {
+    Set<String> result = new HashSet<>();
+    localStorageManager.getStorages().forEach(storage -> result.addAll(storage.getAppIds()));
+    return result;
+  }
+
// copy from ClientUtils
private Long getBlockId(long partitionId, long taskAttemptId, long
atomicInt) {
return (atomicInt << (Constants.PARTITION_ID_MAX_LENGTH +
Constants.TASK_ATTEMPT_ID_MAX_LENGTH))
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
index 19c886ee..4032f3b1 100644
--- a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
+++ b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorage.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.storage.common;
import java.io.File;
import java.io.IOException;
+import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
@@ -316,6 +317,20 @@ public class LocalStorage extends AbstractStorage {
isCorrupted = true;
}
+  /**
+   * Lists the names of the immediate sub-directories of this storage's base path. Each name is
+   * treated as an application id.
+   *
+   * @return the directory names, or an empty set when the base path cannot be listed
+   *     (for example, because it does not exist)
+   */
+  public Set<String> getAppIds() {
+    Set<String> result = new HashSet<>();
+    File[] appDirs = new File(basePath).listFiles(File::isDirectory);
+    if (appDirs == null) {
+      return result;
+    }
+    for (File appDir : appDirs) {
+      result.add(appDir.getName());
+    }
+    return result;
+  }
+
public static class Builder {
private long capacity;
private double lowWaterMarkOfWrite;