This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new af342e6e8 [#2083] improvement: Quickly delete local or HDFS data at
the shuffleId level. (#2084)
af342e6e8 is described below
commit af342e6e8687df509a10f15712a5c4337b4f4894
Author: yl09099 <[email protected]>
AuthorDate: Mon Feb 24 14:14:58 2025 +0800
[#2083] improvement: Quickly delete local or HDFS data at the shuffleId
level. (#2084)
### What changes were proposed in this pull request?
At the shuffleId level, data on the local or HDFS needs to be deleted
synchronously. In some scenarios, the deletion time needs to be shortened. You
can rename folders and delete them asynchronously.
### Why are the changes needed?
Fix: #2083
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
UT.
---
.../uniffle/server/ShuffleServerGrpcService.java | 4 +-
.../uniffle/server/ShuffleServerMetrics.java | 6 ++
.../apache/uniffle/server/ShuffleTaskManager.java | 11 ++-
.../apache/uniffle/server/event/PurgeEvent.java | 14 +++
.../uniffle/server/event/ShufflePurgeEvent.java | 7 +-
.../server/storage/HadoopStorageManager.java | 3 +-
.../server/storage/LocalStorageManager.java | 8 +-
.../storage/factory/ShuffleHandlerFactory.java | 7 +-
.../uniffle/storage/handler/AsynDeletionEvent.java | 84 ++++++++++++++++
.../storage/handler/api/ShuffleDeleteHandler.java | 2 +-
.../handler/impl/AsynDeletionEventManager.java | 107 +++++++++++++++++++++
.../handler/impl/HadoopShuffleDeleteHandler.java | 5 +-
.../handler/impl/LocalFileAsyncDeleteHandler.java | 93 ++++++++++++++++++
.../handler/impl/LocalFileDeleteHandler.java | 4 +-
.../request/CreateShuffleDeleteHandlerRequest.java | 16 +++
15 files changed, 360 insertions(+), 11 deletions(-)
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
index 356a58d84..411aeb4b3 100644
---
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
+++
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java
@@ -272,7 +272,9 @@ public class ShuffleServerGrpcService extends
ShuffleServerImplBase {
taskInfo.refreshLatestStageAttemptNumber(shuffleId,
stageAttemptNumber);
try {
long start = System.currentTimeMillis();
-
shuffleServer.getShuffleTaskManager().removeShuffleDataSync(appId, shuffleId);
+ shuffleServer
+ .getShuffleTaskManager()
+ .removeShuffleDataSyncRenameAndDelete(appId, shuffleId);
LOG.info(
"Deleted the previous stage attempt data due to stage
recomputing for app: {}, "
+ "shuffleId: {}, stageAttemptNumber: {}. It costs {}
ms",
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index b891ecc5e..9b6c548e1 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -170,6 +170,9 @@ public class ShuffleServerMetrics {
public static final String REPORTED_BLOCK_COUNT = "reported_block_count";
public static final String CACHED_BLOCK_COUNT = "cached_block_count";
+ private static final String TOTAL_LOCAL_RENAME_AND_DELETION_FAILED =
+ "total_local_rename_and_deletion_failed";
+
public static Counter.Child counterTotalAppNum;
public static Counter.Child counterTotalAppWithHugePartitionNum;
public static Counter.Child counterTotalPartitionNum;
@@ -245,6 +248,7 @@ public class ShuffleServerMetrics {
public static Gauge.Child gaugeReadLocalDataFileBufferSize;
public static Gauge.Child gaugeReadLocalIndexFileBufferSize;
public static Gauge.Child gaugeReadMemoryDataBufferSize;
+ public static Counter.Child counterLocalRenameAndDeletionFaileTd;
public static Gauge gaugeTotalDataSizeUsage;
public static Gauge gaugeInMemoryDataSizeUsage;
@@ -440,6 +444,8 @@ public class ShuffleServerMetrics {
counterTotalHugePartitionNum =
metricsManager.addLabeledCounter(TOTAL_HUGE_PARTITION_NUM);
counterTotalHugePartitionExceedHardLimitNum =
metricsManager.addLabeledCounter(TOTAL_HUGE_PARTITION_EXCEED_HARD_LIMIT_NUM);
+ counterLocalRenameAndDeletionFaileTd =
+
metricsManager.addLabeledCounter(TOTAL_LOCAL_RENAME_AND_DELETION_FAILED);
gaugeLocalStorageIsWritable =
metricsManager.addGauge(LOCAL_STORAGE_IS_WRITABLE,
LOCAL_DISK_PATH_LABEL);
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 87bf31bac..8586ab905 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -798,6 +798,11 @@ public class ShuffleTaskManager {
* @param shuffleIds
*/
public void removeResourcesByShuffleIds(String appId, List<Integer>
shuffleIds) {
+ removeResourcesByShuffleIds(appId, shuffleIds, false);
+ }
+
+ public void removeResourcesByShuffleIds(
+ String appId, List<Integer> shuffleIds, boolean isRenameAndDelete) {
Lock writeLock = getAppWriteLock(appId);
writeLock.lock();
try {
@@ -830,7 +835,7 @@ public class ShuffleTaskManager {
withTimeoutExecution(
() -> {
storageManager.removeResources(
- new ShufflePurgeEvent(appId, getUserByAppId(appId),
shuffleIds));
+ new ShufflePurgeEvent(appId, getUserByAppId(appId),
shuffleIds, isRenameAndDelete));
return null;
},
storageRemoveOperationTimeoutSec,
@@ -1037,6 +1042,10 @@ public class ShuffleTaskManager {
removeResourcesByShuffleIds(appId, Arrays.asList(shuffleId));
}
+ public void removeShuffleDataSyncRenameAndDelete(String appId, int
shuffleId) {
+ removeResourcesByShuffleIds(appId, Arrays.asList(shuffleId), true);
+ }
+
public ShuffleDataDistributionType getDataDistributionType(String appId) {
return shuffleTaskInfos.get(appId).getDataDistType();
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/event/PurgeEvent.java
b/server/src/main/java/org/apache/uniffle/server/event/PurgeEvent.java
index eb555f7ed..f890d44d4 100644
--- a/server/src/main/java/org/apache/uniffle/server/event/PurgeEvent.java
+++ b/server/src/main/java/org/apache/uniffle/server/event/PurgeEvent.java
@@ -25,11 +25,19 @@ public abstract class PurgeEvent {
private String appId;
private String user;
private List<Integer> shuffleIds;
+ // Whether to enable the deletion mode: Rename files and then delete them
asynchronously.
+ private boolean isRenameAndDelete;
public PurgeEvent(String appId, String user, List<Integer> shuffleIds) {
+ this(appId, user, shuffleIds, false);
+ }
+
+ public PurgeEvent(
+ String appId, String user, List<Integer> shuffleIds, boolean
isRenameAndDelete) {
this.appId = appId;
this.user = user;
this.shuffleIds = shuffleIds;
+ this.isRenameAndDelete = isRenameAndDelete;
}
public String getAppId() {
@@ -44,6 +52,10 @@ public abstract class PurgeEvent {
return shuffleIds;
}
+ public boolean isRenameAndDelete() {
+ return isRenameAndDelete;
+ }
+
@Override
public String toString() {
return this.getClass().getSimpleName()
@@ -56,6 +68,8 @@ public abstract class PurgeEvent {
+ '\''
+ ", shuffleIds="
+ shuffleIds
+ + ", isRenameAndDelete="
+ + isRenameAndDelete
+ '}';
}
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/event/ShufflePurgeEvent.java
b/server/src/main/java/org/apache/uniffle/server/event/ShufflePurgeEvent.java
index cbc39aab8..c7ea277c7 100644
---
a/server/src/main/java/org/apache/uniffle/server/event/ShufflePurgeEvent.java
+++
b/server/src/main/java/org/apache/uniffle/server/event/ShufflePurgeEvent.java
@@ -22,6 +22,11 @@ import java.util.List;
public class ShufflePurgeEvent extends PurgeEvent {
public ShufflePurgeEvent(String appId, String user, List<Integer>
shuffleIds) {
- super(appId, user, shuffleIds);
+ this(appId, user, shuffleIds, false);
+ }
+
+ public ShufflePurgeEvent(
+ String appId, String user, List<Integer> shuffleIds, boolean
isRenameAndDelete) {
+ super(appId, user, shuffleIds, isRenameAndDelete);
}
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
index adbbc594c..628004eed 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java
@@ -114,7 +114,8 @@ public class HadoopStorageManager extends
SingleStorageManager {
new CreateShuffleDeleteHandlerRequest(
StorageType.HDFS.name(),
storage.getConf(),
- purgeForExpired ? shuffleServerId : null));
+ purgeForExpired ? shuffleServerId : null,
+ event.isRenameAndDelete()));
String basicPath =
ShuffleStorageUtils.getFullShuffleDataFolder(storage.getStoragePath(), appId);
diff --git
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
index 598a2a9c6..70cdf5487 100644
---
a/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java
@@ -312,7 +312,7 @@ public class LocalStorageManager extends
SingleStorageManager {
ShuffleHandlerFactory.getInstance()
.createShuffleDeleteHandler(
new CreateShuffleDeleteHandlerRequest(
- StorageType.LOCALFILE.name(), new Configuration()));
+ StorageType.LOCALFILE.name(), new Configuration(),
event.isRenameAndDelete()));
List<String> deletePaths =
localStorages.stream()
@@ -353,7 +353,11 @@ public class LocalStorageManager extends
SingleStorageManager {
})
.collect(Collectors.toList());
- deleteHandler.delete(deletePaths.toArray(new String[deletePaths.size()]),
appId, user);
+ boolean isSuccess =
+ deleteHandler.delete(deletePaths.toArray(new
String[deletePaths.size()]), appId, user);
+ if (!isSuccess && event.isRenameAndDelete()) {
+ ShuffleServerMetrics.counterLocalRenameAndDeletionFaileTd.inc();
+ }
removeAppStorageInfo(event);
}
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
index b68d77a19..fba2d3810 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java
@@ -32,9 +32,11 @@ import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.storage.handler.api.ClientReadHandler;
import org.apache.uniffle.storage.handler.api.ShuffleDeleteHandler;
+import org.apache.uniffle.storage.handler.impl.AsynDeletionEventManager;
import org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler;
import org.apache.uniffle.storage.handler.impl.HadoopClientReadHandler;
import org.apache.uniffle.storage.handler.impl.HadoopShuffleDeleteHandler;
+import org.apache.uniffle.storage.handler.impl.LocalFileAsyncDeleteHandler;
import org.apache.uniffle.storage.handler.impl.LocalFileClientReadHandler;
import org.apache.uniffle.storage.handler.impl.LocalFileDeleteHandler;
import org.apache.uniffle.storage.handler.impl.MemoryClientReadHandler;
@@ -189,7 +191,10 @@ public class ShuffleHandlerFactory {
CreateShuffleDeleteHandlerRequest request) {
if (StorageType.HDFS.name().equals(request.getStorageType())) {
return new HadoopShuffleDeleteHandler(request.getConf(),
request.getShuffleServerId());
- } else if (StorageType.LOCALFILE.name().equals(request.getStorageType())) {
+ } else if (StorageType.LOCALFILE.name().equals(request.getStorageType())
&& request.isAsync()) {
+ return new
LocalFileAsyncDeleteHandler(AsynDeletionEventManager.getInstance());
+ } else if (StorageType.LOCALFILE.name().equals(request.getStorageType())
+ && !request.isAsync()) {
return new LocalFileDeleteHandler();
} else {
throw new UnsupportedOperationException(
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/AsynDeletionEvent.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/AsynDeletionEvent.java
new file mode 100644
index 000000000..b002c23db
--- /dev/null
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/AsynDeletionEvent.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.storage.handler;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+
+public class AsynDeletionEvent {
+ private static final String TEMPORARYSUFFIX = "_tmp";
+ private String appId;
+ private String user;
+ private String shuffleServerId;
+ private Configuration conf;
+ /** Records the mapping between the path to be deleted and the path to be
renamed. */
+ private Map<String, String> needDeletePathAndRenamePath;
+
+ private String storageType;
+
+ public AsynDeletionEvent(
+ String appId,
+ String user,
+ Configuration conf,
+ String shuffleServerId,
+ List<String> needDeletePath,
+ String storageType) {
+ this.appId = appId;
+ this.user = user;
+ this.shuffleServerId = shuffleServerId;
+ this.conf = conf;
+ this.needDeletePathAndRenamePath =
+ needDeletePath.stream()
+ .collect(
+ Collectors.toMap(Function.identity(), s -> StringUtils.join(s,
TEMPORARYSUFFIX)));
+ this.storageType = storageType;
+ }
+
+ public String getAppId() {
+ return appId;
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public Configuration getConf() {
+ return conf;
+ }
+
+ public Map<String, String> getNeedDeletePathAndRenamePath() {
+ return needDeletePathAndRenamePath;
+ }
+
+ public String[] getNeedDeleteRenamePaths() {
+ return
needDeletePathAndRenamePath.values().stream().toArray(String[]::new);
+ }
+
+ public String getShuffleServerId() {
+ return shuffleServerId;
+ }
+
+ public String getStorageType() {
+ return storageType;
+ }
+}
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleDeleteHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleDeleteHandler.java
index c0b32b943..195a26dab 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleDeleteHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleDeleteHandler.java
@@ -24,5 +24,5 @@ public interface ShuffleDeleteHandler {
*
* @param appId ApplicationId for delete
*/
- void delete(String[] storageBasePaths, String appId, String user);
+ boolean delete(String[] storageBasePaths, String appId, String user);
}
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/AsynDeletionEventManager.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/AsynDeletionEventManager.java
new file mode 100644
index 000000000..a4398e841
--- /dev/null
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/AsynDeletionEventManager.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.storage.handler.impl;
+
+import java.util.concurrent.BlockingQueue;
+
+import com.google.common.collect.Queues;
+import org.apache.hadoop.conf.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.storage.factory.ShuffleHandlerFactory;
+import org.apache.uniffle.storage.handler.AsynDeletionEvent;
+import org.apache.uniffle.storage.handler.api.ShuffleDeleteHandler;
+import org.apache.uniffle.storage.request.CreateShuffleDeleteHandlerRequest;
+import org.apache.uniffle.storage.util.StorageType;
+
+/**
+ * To quickly delete the Shuffle Data that has been dropped to the disk, you
need to rename the data
+ * first and then encapsulate the data into an asynchronous deletion event.
This function is used to
+ * manage the actual execution of the asynchronous deletion event.
+ */
+public class AsynDeletionEventManager {
+ private static final Logger LOG =
LoggerFactory.getLogger(AsynDeletionEventManager.class);
+
+ private static AsynDeletionEventManager INSTANCE;
+
+ public static synchronized AsynDeletionEventManager getInstance() {
+ if (INSTANCE == null) {
+ INSTANCE = new AsynDeletionEventManager();
+ }
+ return INSTANCE;
+ }
+
+ protected final BlockingQueue<AsynDeletionEvent>
renameAndAsynDeleteEventQueue =
+ Queues.newLinkedBlockingQueue();
+ protected Thread renameAndAsynDeleteThread;
+
+ public AsynDeletionEventManager() {
+ Runnable renameAndDeletionTask =
+ () -> {
+ while (true) {
+ AsynDeletionEvent asynDeletionEvent = null;
+ try {
+ asynDeletionEvent = renameAndAsynDeleteEventQueue.take();
+ if (asynDeletionEvent
+ .getStorageType()
+ .equalsIgnoreCase(StorageType.LOCALFILE.name())) {
+ ShuffleDeleteHandler deleteHandler =
+ ShuffleHandlerFactory.getInstance()
+ .createShuffleDeleteHandler(
+ new CreateShuffleDeleteHandlerRequest(
+ StorageType.LOCALFILE.name(), new
Configuration()));
+ deleteHandler.delete(
+ asynDeletionEvent.getNeedDeleteRenamePaths(),
+ asynDeletionEvent.getAppId(),
+ asynDeletionEvent.getUser());
+ } else if (asynDeletionEvent
+ .getStorageType()
+ .equalsIgnoreCase(StorageType.HDFS.name())) {
+ ShuffleDeleteHandler deleteHandler =
+ ShuffleHandlerFactory.getInstance()
+ .createShuffleDeleteHandler(
+ new CreateShuffleDeleteHandlerRequest(
+ StorageType.HDFS.name(),
+ asynDeletionEvent.getConf(),
+ asynDeletionEvent.getShuffleServerId()));
+ deleteHandler.delete(
+ asynDeletionEvent.getNeedDeleteRenamePaths(),
+ asynDeletionEvent.getAppId(),
+ asynDeletionEvent.getUser());
+ }
+ } catch (Exception e) {
+ if (asynDeletionEvent != null) {
+ LOG.error(
+ "Delete Paths of {} failed.",
asynDeletionEvent.getNeedDeleteRenamePaths(), e);
+ } else {
+ LOG.error("Failed to delete a directory in
renameAndAsynDeleteThread.", e);
+ }
+ }
+ }
+ };
+ renameAndAsynDeleteThread = new Thread(renameAndDeletionTask);
+ renameAndAsynDeleteThread.setName("renameAndAsynDeleteThread");
+ renameAndAsynDeleteThread.setDaemon(true);
+ renameAndAsynDeleteThread.start();
+ }
+
+ public synchronized boolean handlerAsynDelete(AsynDeletionEvent
asynDeletionEvent) {
+ return renameAndAsynDeleteEventQueue.offer(asynDeletionEvent);
+ }
+}
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleDeleteHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleDeleteHandler.java
index 312f052f9..58854cb70 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleDeleteHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/HadoopShuffleDeleteHandler.java
@@ -44,7 +44,7 @@ public class HadoopShuffleDeleteHandler implements
ShuffleDeleteHandler {
}
@Override
- public void delete(String[] storageBasePaths, String appId, String user) {
+ public boolean delete(String[] storageBasePaths, String appId, String user) {
for (String deletePath : storageBasePaths) {
final Path path = new Path(deletePath);
boolean isSuccess = false;
@@ -64,7 +64,7 @@ public class HadoopShuffleDeleteHandler implements
ShuffleDeleteHandler {
} catch (Exception e) {
if (e instanceof FileNotFoundException) {
LOG.info("[{}] doesn't exist, ignore it.", path);
- return;
+ return false;
}
times++;
LOG.warn(
@@ -96,6 +96,7 @@ public class HadoopShuffleDeleteHandler implements
ShuffleDeleteHandler {
+ " ms");
}
}
+ return true;
}
private void delete(FileSystem fileSystem, Path path, String filePrefix)
throws IOException {
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileAsyncDeleteHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileAsyncDeleteHandler.java
new file mode 100644
index 000000000..cdec039f4
--- /dev/null
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileAsyncDeleteHandler.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.storage.handler.impl;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.storage.handler.AsynDeletionEvent;
+import org.apache.uniffle.storage.handler.api.ShuffleDeleteHandler;
+import org.apache.uniffle.storage.util.StorageType;
+
+public class LocalFileAsyncDeleteHandler implements ShuffleDeleteHandler {
+ private static final Logger LOG =
LoggerFactory.getLogger(LocalFileAsyncDeleteHandler.class);
+ private AsynDeletionEventManager asynDeletionEventManager;
+
+ public LocalFileAsyncDeleteHandler(AsynDeletionEventManager
asynDeletionEventManager) {
+ this.asynDeletionEventManager = asynDeletionEventManager;
+ }
+
+ /** Rename the file and then delete it asynchronously. */
+ @Override
+ public boolean delete(String[] storageBasePaths, String appId, String user) {
+ AsynDeletionEvent asynDeletionEvent =
+ new AsynDeletionEvent(
+ appId, user, null, null, Arrays.asList(storageBasePaths),
StorageType.LOCALFILE.name());
+ for (Map.Entry<String, String> appIdNeedDeletePaths :
+ asynDeletionEvent.getNeedDeletePathAndRenamePath().entrySet()) {
+ String shufflePath = appIdNeedDeletePaths.getKey();
+ String breakdownShufflePath = appIdNeedDeletePaths.getValue();
+ boolean isExists;
+ boolean isSuccess = false;
+ long start = System.currentTimeMillis();
+ try {
+ File baseFolder = new File(shufflePath);
+ isExists = baseFolder.exists();
+ File breakdownBaseFolder = new File(breakdownShufflePath);
+ if (isExists) {
+ isSuccess = baseFolder.renameTo(breakdownBaseFolder);
+ }
+ if (isExists) {
+ if (isSuccess) {
+ LOG.info(
+ "Rename shuffle data for appId[{}] with {} to {} cost {} ms",
+ appId,
+ shufflePath,
+ breakdownShufflePath,
+ (System.currentTimeMillis() - start));
+ } else {
+ LOG.warn(
+ "Can't Rename shuffle data for appId[{}] with {} to {}",
+ appId,
+ shufflePath,
+ breakdownShufflePath);
+ }
+ } else {
+ LOG.info("Rename shuffle data for appId[{}],[{}] is not exists",
appId, shufflePath);
+ }
+ } catch (Exception e) {
+ LOG.error(
+ "Can't Rename shuffle data for appId[{}] with {} to {}",
+ appId,
+ shufflePath,
+ breakdownShufflePath,
+ e);
+ }
+ }
+ if (!asynDeletionEventManager.handlerAsynDelete(asynDeletionEvent)) {
+ LOG.warn(
+ "Remove the case where the renameAndDeletionEventQueue queue is full
and cannot accept elements.");
+ return false;
+ }
+ return true;
+ }
+}
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileDeleteHandler.java
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileDeleteHandler.java
index 277fb18d7..106b01d77 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileDeleteHandler.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/handler/impl/LocalFileDeleteHandler.java
@@ -30,7 +30,7 @@ public class LocalFileDeleteHandler implements
ShuffleDeleteHandler {
private static final Logger LOG =
LoggerFactory.getLogger(LocalFileDeleteHandler.class);
@Override
- public void delete(String[] shuffleDataStoredPath, String appId, String
user) {
+ public boolean delete(String[] shuffleDataStoredPath, String appId, String
user) {
for (String basePath : shuffleDataStoredPath) {
final String shufflePath = basePath;
long start = System.currentTimeMillis();
@@ -47,7 +47,9 @@ public class LocalFileDeleteHandler implements
ShuffleDeleteHandler {
+ " ms");
} catch (Exception e) {
LOG.warn("Can't delete shuffle data for appId[" + appId + "] with " +
shufflePath, e);
+ return false;
}
}
+ return true;
}
}
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleDeleteHandlerRequest.java
b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleDeleteHandlerRequest.java
index b8eda2895..250e1d203 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleDeleteHandlerRequest.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/request/CreateShuffleDeleteHandlerRequest.java
@@ -24,16 +24,28 @@ public class CreateShuffleDeleteHandlerRequest {
private String storageType;
private Configuration conf;
private String shuffleServerId;
+ private boolean isAsync;
public CreateShuffleDeleteHandlerRequest(String storageType, Configuration
conf) {
this(storageType, conf, null);
}
+ public CreateShuffleDeleteHandlerRequest(
+ String storageType, Configuration conf, boolean isAsync) {
+ this(storageType, conf, null, isAsync);
+ }
+
public CreateShuffleDeleteHandlerRequest(
String storageType, Configuration conf, String shuffleServerId) {
+ this(storageType, conf, shuffleServerId, false);
+ }
+
+ public CreateShuffleDeleteHandlerRequest(
+ String storageType, Configuration conf, String shuffleServerId, boolean
isAsync) {
this.storageType = storageType;
this.conf = conf;
this.shuffleServerId = shuffleServerId;
+ this.isAsync = isAsync;
}
public String getStorageType() {
@@ -47,4 +59,8 @@ public class CreateShuffleDeleteHandlerRequest {
public String getShuffleServerId() {
return shuffleServerId;
}
+
+ public boolean isAsync() {
+ return isAsync;
+ }
}