This is an automated email from the ASF dual-hosted git repository.

xianjingfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 95dccbda6 [#2454] improvement(server): Remove lock in 
ShuffleTaskManager#registerShuffle (#2456)
95dccbda6 is described below

commit 95dccbda6920ef761a015578e79de54eabd46469
Author: xianjingfeng <[email protected]>
AuthorDate: Wed Apr 30 09:53:24 2025 +0800

    [#2454] improvement(server): Remove lock in ShuffleTaskManager#registerShuffle (#2456)
    
    ### What changes were proposed in this pull request?
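    Remove the per-app write lock in ShuffleTaskManager#registerShuffle.
    An app whose resources are being removed (e.g. after a heartbeat timeout)
    is not allowed to register again; registerShuffle returns INVALID_REQUEST
    for it.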
    
    ### Why are the changes needed?
    For better performance: registerShuffle no longer acquires the per-app
    write lock on every call.
    Fix: #2454
    
    ### Does this PR introduce any user-facing change?
    No.
    
    ### How was this patch tested?
    CI
---
 .../apache/uniffle/server/ShuffleTaskManager.java  | 57 ++++++++++++----------
 1 file changed, 30 insertions(+), 27 deletions(-)

diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index 8586ab905..c460b4c09 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -117,6 +117,7 @@ public class ShuffleTaskManager {
   private final ShuffleBufferManager shuffleBufferManager;
   private Map<String, ShuffleTaskInfo> shuffleTaskInfos = 
JavaUtils.newConcurrentMap();
   private Map<Long, PreAllocatedBufferInfo> requireBufferIds = 
JavaUtils.newConcurrentMap();
+  private final Map<String, Long> removingApps = JavaUtils.newConcurrentMap();
   private Thread clearResourceThread;
   private BlockingQueue<PurgeEvent> expiredAppIdQueue = 
Queues.newLinkedBlockingQueue();
   private final Cache<String, ReentrantReadWriteLock> appLocks;
@@ -320,34 +321,30 @@ public class ShuffleTaskManager {
       ShuffleDataDistributionType dataDistType,
       int maxConcurrencyPerPartitionToWrite,
       Map<String, String> properties) {
-    ReentrantReadWriteLock.WriteLock lock = getAppWriteLock(appId);
-    lock.lock();
-    try {
-      refreshAppId(appId);
-
-      ShuffleTaskInfo taskInfo = shuffleTaskInfos.get(appId);
-      taskInfo.setProperties(properties);
-      taskInfo.setUser(user);
-      taskInfo.setSpecification(
-          ShuffleSpecification.builder()
-              .maxConcurrencyPerPartitionToWrite(
-                  getMaxConcurrencyWriting(maxConcurrencyPerPartitionToWrite, 
conf))
-              .dataDistributionType(dataDistType)
-              .build());
-      taskInfo.setShuffleBlockIdManagerIfNeeded(shuffleBlockIdManager);
-      taskInfo.refreshLatestStageAttemptNumber(shuffleId, stageAttemptNumber);
-      taskInfo.getShuffleBlockIdManager().registerAppId(appId);
-      for (PartitionRange partitionRange : partitionRanges) {
-        shuffleBufferManager.registerBuffer(
-            appId, shuffleId, partitionRange.getStart(), 
partitionRange.getEnd());
-      }
-      if (!remoteStorageInfo.isEmpty()) {
-        storageManager.registerRemoteStorage(appId, remoteStorageInfo);
-      }
-      return StatusCode.SUCCESS;
-    } finally {
-      lock.unlock();
+    if (isAppRemoving(appId)) {
+      return StatusCode.INVALID_REQUEST;
     }
+    refreshAppId(appId);
+    ShuffleTaskInfo taskInfo = shuffleTaskInfos.get(appId);
+    taskInfo.setProperties(properties);
+    taskInfo.setUser(user);
+    taskInfo.setSpecification(
+        ShuffleSpecification.builder()
+            .maxConcurrencyPerPartitionToWrite(
+                getMaxConcurrencyWriting(maxConcurrencyPerPartitionToWrite, 
conf))
+            .dataDistributionType(dataDistType)
+            .build());
+    taskInfo.setShuffleBlockIdManagerIfNeeded(shuffleBlockIdManager);
+    taskInfo.refreshLatestStageAttemptNumber(shuffleId, stageAttemptNumber);
+    taskInfo.getShuffleBlockIdManager().registerAppId(appId);
+    for (PartitionRange partitionRange : partitionRanges) {
+      shuffleBufferManager.registerBuffer(
+          appId, shuffleId, partitionRange.getStart(), 
partitionRange.getEnd());
+    }
+    if (!remoteStorageInfo.isEmpty()) {
+      storageManager.registerRemoteStorage(appId, remoteStorageInfo);
+    }
+    return StatusCode.SUCCESS;
   }
 
   @VisibleForTesting
@@ -791,6 +788,10 @@ public class ShuffleTaskManager {
     return System.currentTimeMillis() - shuffleTaskInfo.getCurrentTimes() > 
appExpiredWithoutHB;
   }
 
+  public boolean isAppRemoving(String appId) {
+    return removingApps.containsKey(appId);
+  }
+
   /**
    * Clear up the partial resources of shuffleIds of App.
    *
@@ -877,6 +878,7 @@ public class ShuffleTaskManager {
         return;
       }
       final long start = System.currentTimeMillis();
+      removingApps.put(appId, start);
       ShuffleTaskInfo shuffleTaskInfo = shuffleTaskInfos.remove(appId);
       if (shuffleTaskInfo == null) {
         LOG.info("Resource for appId[" + appId + "] had been removed before.");
@@ -939,6 +941,7 @@ public class ShuffleTaskManager {
               + (System.currentTimeMillis() - start)
               + " ms");
     } finally {
+      removingApps.remove(appId);
       lock.unlock();
     }
   }

Reply via email to