This is an automated email from the ASF dual-hosted git repository.
xianjingfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 95dccbda6 [#2454] improvement(server): Remove lock in
ShuffleTaskManager#registerShuffle (#2456)
95dccbda6 is described below
commit 95dccbda6920ef761a015578e79de54eabd46469
Author: xianjingfeng <[email protected]>
AuthorDate: Wed Apr 30 09:53:24 2025 +0800
[#2454] improvement(server): Remove lock in
ShuffleTaskManager#registerShuffle (#2456)
### What changes were proposed in this pull request?
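Remove the per-app write lock from ShuffleTaskManager#registerShuffle.
While an app's resources are being removed (e.g. after its heartbeat times out), any attempt to register that app again is rejected with INVALID_REQUEST.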
### Why are the changes needed?
To improve performance: registerShuffle no longer acquires the per-app write lock.
Fix: #2454
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
CI
---
.../apache/uniffle/server/ShuffleTaskManager.java | 57 ++++++++++++----------
1 file changed, 30 insertions(+), 27 deletions(-)
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 8586ab905..c460b4c09 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -117,6 +117,7 @@ public class ShuffleTaskManager {
private final ShuffleBufferManager shuffleBufferManager;
private Map<String, ShuffleTaskInfo> shuffleTaskInfos =
JavaUtils.newConcurrentMap();
private Map<Long, PreAllocatedBufferInfo> requireBufferIds =
JavaUtils.newConcurrentMap();
+ private final Map<String, Long> removingApps = JavaUtils.newConcurrentMap();
private Thread clearResourceThread;
private BlockingQueue<PurgeEvent> expiredAppIdQueue =
Queues.newLinkedBlockingQueue();
private final Cache<String, ReentrantReadWriteLock> appLocks;
@@ -320,34 +321,30 @@ public class ShuffleTaskManager {
ShuffleDataDistributionType dataDistType,
int maxConcurrencyPerPartitionToWrite,
Map<String, String> properties) {
- ReentrantReadWriteLock.WriteLock lock = getAppWriteLock(appId);
- lock.lock();
- try {
- refreshAppId(appId);
-
- ShuffleTaskInfo taskInfo = shuffleTaskInfos.get(appId);
- taskInfo.setProperties(properties);
- taskInfo.setUser(user);
- taskInfo.setSpecification(
- ShuffleSpecification.builder()
- .maxConcurrencyPerPartitionToWrite(
- getMaxConcurrencyWriting(maxConcurrencyPerPartitionToWrite,
conf))
- .dataDistributionType(dataDistType)
- .build());
- taskInfo.setShuffleBlockIdManagerIfNeeded(shuffleBlockIdManager);
- taskInfo.refreshLatestStageAttemptNumber(shuffleId, stageAttemptNumber);
- taskInfo.getShuffleBlockIdManager().registerAppId(appId);
- for (PartitionRange partitionRange : partitionRanges) {
- shuffleBufferManager.registerBuffer(
- appId, shuffleId, partitionRange.getStart(),
partitionRange.getEnd());
- }
- if (!remoteStorageInfo.isEmpty()) {
- storageManager.registerRemoteStorage(appId, remoteStorageInfo);
- }
- return StatusCode.SUCCESS;
- } finally {
- lock.unlock();
+ if (isAppRemoving(appId)) {
+ return StatusCode.INVALID_REQUEST;
}
+ refreshAppId(appId);
+ ShuffleTaskInfo taskInfo = shuffleTaskInfos.get(appId);
+ taskInfo.setProperties(properties);
+ taskInfo.setUser(user);
+ taskInfo.setSpecification(
+ ShuffleSpecification.builder()
+ .maxConcurrencyPerPartitionToWrite(
+ getMaxConcurrencyWriting(maxConcurrencyPerPartitionToWrite,
conf))
+ .dataDistributionType(dataDistType)
+ .build());
+ taskInfo.setShuffleBlockIdManagerIfNeeded(shuffleBlockIdManager);
+ taskInfo.refreshLatestStageAttemptNumber(shuffleId, stageAttemptNumber);
+ taskInfo.getShuffleBlockIdManager().registerAppId(appId);
+ for (PartitionRange partitionRange : partitionRanges) {
+ shuffleBufferManager.registerBuffer(
+ appId, shuffleId, partitionRange.getStart(),
partitionRange.getEnd());
+ }
+ if (!remoteStorageInfo.isEmpty()) {
+ storageManager.registerRemoteStorage(appId, remoteStorageInfo);
+ }
+ return StatusCode.SUCCESS;
}
@VisibleForTesting
@@ -791,6 +788,10 @@ public class ShuffleTaskManager {
return System.currentTimeMillis() - shuffleTaskInfo.getCurrentTimes() >
appExpiredWithoutHB;
}
+ public boolean isAppRemoving(String appId) {
+ return removingApps.containsKey(appId);
+ }
+
/**
* Clear up the partial resources of shuffleIds of App.
*
@@ -877,6 +878,7 @@ public class ShuffleTaskManager {
return;
}
final long start = System.currentTimeMillis();
+ removingApps.put(appId, start);
ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.remove(appId);
if (shuffleTaskInfo == null) {
LOG.info("Resource for appId[" + appId + "] had been removed before.");
@@ -939,6 +941,7 @@ public class ShuffleTaskManager {
+ (System.currentTimeMillis() - start)
+ " ms");
} finally {
+ removingApps.remove(appId);
lock.unlock();
}
}