This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new f732fc227 [#493][FOLLOWUP] improvement: replace putIfAbsent to avoid performance loss (#2444)
f732fc227 is described below

commit f732fc227ca726be80f169cd02b077e5257dbb5f
Author: xianjingfeng <[email protected]>
AuthorDate: Wed Apr 16 14:55:41 2025 +0800

    [#493][FOLLOWUP] improvement: replace putIfAbsent to avoid performance loss (#2444)
    
    ### What changes were proposed in this pull request?
    
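    Replace `map.putIfAbsent(key, new Value(...))` with
    `map.computeIfAbsent(key, k -> new Value(...))` so that the default value is
    constructed only when the key is actually absent, instead of being eagerly
    built (and then discarded) on every call.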
    
    ### Why are the changes needed?
    
    Fix: #493
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    CI
---
 .../apache/spark/shuffle/RssShuffleManager.java    |  8 ++++---
 .../apache/spark/shuffle/RssShuffleManager.java    |  8 ++++---
 .../common/shuffle/impl/RssShuffleManager.java     |  2 +-
 .../orderedgrouped/RssShuffleScheduler.java        |  2 +-
 .../uniffle/coordinator/ApplicationManager.java    |  2 +-
 .../org/apache/uniffle/server/merge/Shuffle.java   | 20 ++++++++++++++--
 .../uniffle/server/merge/ShuffleMergeManager.java  | 27 +++++++++++-----------
 .../uniffle/storage/common/LocalStorageMeta.java   | 21 +++++------------
 8 files changed, 51 insertions(+), 39 deletions(-)

diff --git 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 8e6b8dfca..31521a4b2 100644
--- 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -115,7 +115,8 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
 
     if (dependency.partitioner().numPartitions() == 0) {
       shuffleIdToPartitionNum.putIfAbsent(shuffleId, 0);
-      shuffleIdToNumMapTasks.putIfAbsent(shuffleId, 
dependency.rdd().partitions().length);
+      shuffleIdToNumMapTasks.computeIfAbsent(
+          shuffleId, key -> dependency.rdd().partitions().length);
       LOG.info(
           "RegisterShuffle with ShuffleId["
               + shuffleId
@@ -158,8 +159,9 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
 
     startHeartbeat();
 
-    shuffleIdToPartitionNum.putIfAbsent(shuffleId, 
dependency.partitioner().numPartitions());
-    shuffleIdToNumMapTasks.putIfAbsent(shuffleId, 
dependency.rdd().partitions().length);
+    shuffleIdToPartitionNum.computeIfAbsent(
+        shuffleId, key -> dependency.partitioner().numPartitions());
+    shuffleIdToNumMapTasks.computeIfAbsent(shuffleId, key -> 
dependency.rdd().partitions().length);
     if (shuffleManagerRpcServiceEnabled && 
rssStageRetryForWriteFailureEnabled) {
       ShuffleHandleInfo handleInfo =
           new MutableShuffleHandleInfo(shuffleId, partitionToServers, 
remoteStorage);
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index bc7553e13..12f2fada5 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -135,7 +135,8 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
 
     if (dependency.partitioner().numPartitions() == 0) {
       shuffleIdToPartitionNum.putIfAbsent(shuffleId, 0);
-      shuffleIdToNumMapTasks.putIfAbsent(shuffleId, 
dependency.rdd().partitions().length);
+      shuffleIdToNumMapTasks.computeIfAbsent(
+          shuffleId, key -> dependency.rdd().partitions().length);
       LOG.info(
           "RegisterShuffle with ShuffleId["
               + shuffleId
@@ -175,8 +176,9 @@ public class RssShuffleManager extends 
RssShuffleManagerBase {
             rssStageResubmitManager.getServerIdBlackList(),
             0);
     startHeartbeat();
-    shuffleIdToPartitionNum.putIfAbsent(shuffleId, 
dependency.partitioner().numPartitions());
-    shuffleIdToNumMapTasks.putIfAbsent(shuffleId, 
dependency.rdd().partitions().length);
+    shuffleIdToPartitionNum.computeIfAbsent(
+        shuffleId, key -> dependency.partitioner().numPartitions());
+    shuffleIdToNumMapTasks.computeIfAbsent(shuffleId, key -> 
dependency.rdd().partitions().length);
     if (shuffleManagerRpcServiceEnabled && 
rssStageRetryForWriteFailureEnabled) {
       ShuffleHandleInfo shuffleHandleInfo =
           new MutableShuffleHandleInfo(
diff --git 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssShuffleManager.java
 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssShuffleManager.java
index dfdc6f840..3b23e1852 100644
--- 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssShuffleManager.java
+++ 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssShuffleManager.java
@@ -870,7 +870,7 @@ public class RssShuffleManager extends ShuffleManager {
           pendingPartition.add(p);
         }
         allRssPartition.add(p);
-        partitionToInput.putIfAbsent(p, new ArrayList<>());
+        partitionToInput.computeIfAbsent(p, key -> new ArrayList<>());
         partitionToInput.get(p).add(srcAttemptIdentifier);
         LOG.info("Add partition:{}, after add, now partition:{}", p, 
allRssPartition);
       }
diff --git 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
index 5f5c2c7af..987dc41fa 100644
--- 
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
+++ 
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
@@ -1299,7 +1299,7 @@ class RssShuffleScheduler extends ShuffleScheduler {
     partitionIdToSuccessMapTaskAttempts.get(partitionId).add(srcAttempt);
     String pathComponent = srcAttempt.getPathComponent();
     TezTaskAttemptID tezTaskAttemptId = 
IdUtils.convertTezTaskAttemptID(pathComponent);
-    partitionIdToSuccessTezTasks.putIfAbsent(partitionId, new HashSet<>());
+    partitionIdToSuccessTezTasks.computeIfAbsent(partitionId, key -> new 
HashSet<>());
     
partitionIdToSuccessTezTasks.get(partitionId).add(tezTaskAttemptId.getTaskID());
 
     uniqueHosts.add(new HostPort(inputHostName, port));
diff --git 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
index afbcffd83..b1b92d8de 100644
--- 
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
+++ 
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
@@ -276,7 +276,7 @@ public class ApplicationManager implements Closeable {
         }
       } else {
         LOG.warn("Can't find counter for remote storage: {}", storagePath);
-        remoteStoragePathRankValue.putIfAbsent(storagePath, new RankValue(0));
+        remoteStoragePathRankValue.computeIfAbsent(storagePath, key -> new 
RankValue(0));
       }
       if (remoteStoragePathRankValue.get(storagePath).getAppNum().get() == 0
           && !availableRemoteStorageInfo.containsKey(storagePath)) {
diff --git a/server/src/main/java/org/apache/uniffle/server/merge/Shuffle.java 
b/server/src/main/java/org/apache/uniffle/server/merge/Shuffle.java
index 9f0099702..c7dff62a0 100644
--- a/server/src/main/java/org/apache/uniffle/server/merge/Shuffle.java
+++ b/server/src/main/java/org/apache/uniffle/server/merge/Shuffle.java
@@ -20,6 +20,7 @@ package org.apache.uniffle.server.merge;
 import java.io.IOException;
 import java.util.Comparator;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.roaringbitmap.longlong.Roaring64NavigableMap;
@@ -69,8 +70,23 @@ public class Shuffle<K, V> {
 
   public void startSortMerge(int partitionId, Roaring64NavigableMap 
expectedBlockIdMap)
       throws IOException {
-    this.partitions.putIfAbsent(partitionId, new Partition<K, V>(this, 
partitionId));
-    this.partitions.get(partitionId).startSortMerge(expectedBlockIdMap);
+    AtomicReference<IOException> exception = new AtomicReference<>();
+    Partition<K, V> partition =
+        this.partitions.computeIfAbsent(
+            partitionId,
+            key -> {
+              try {
+                return new Partition<K, V>(this, partitionId);
+              } catch (IOException e) {
+                exception.set(e);
+              }
+              return null;
+            });
+    if (exception.get() != null) {
+      throw exception.get();
+    }
+    assert partition != null;
+    partition.startSortMerge(expectedBlockIdMap);
   }
 
   void cleanup() {
diff --git 
a/server/src/main/java/org/apache/uniffle/server/merge/ShuffleMergeManager.java 
b/server/src/main/java/org/apache/uniffle/server/merge/ShuffleMergeManager.java
index 027b63d9d..4de51df3b 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/merge/ShuffleMergeManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/merge/ShuffleMergeManager.java
@@ -163,22 +163,23 @@ public class ShuffleMergeManager {
       } else {
         comparator = defaultComparator;
       }
-      this.shuffles.putIfAbsent(appId, JavaUtils.newConcurrentMap());
+      this.shuffles.computeIfAbsent(appId, key -> 
JavaUtils.newConcurrentMap());
       this.shuffles
           .get(appId)
-          .putIfAbsent(
+          .computeIfAbsent(
               shuffleId,
-              new Shuffle(
-                  serverConf,
-                  eventHandler,
-                  shuffleServer,
-                  appId,
-                  shuffleId,
-                  kClass,
-                  vClass,
-                  comparator,
-                  mergeContext.getMergedBlockSize(),
-                  classLoader));
+              key ->
+                  new Shuffle(
+                      serverConf,
+                      eventHandler,
+                      shuffleServer,
+                      appId,
+                      shuffleId,
+                      kClass,
+                      vClass,
+                      comparator,
+                      mergeContext.getMergedBlockSize(),
+                      classLoader));
     } catch (ClassNotFoundException
         | InstantiationException
         | IllegalAccessException
diff --git 
a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java 
b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java
index 0cb80d4c1..8b5bea2ef 100644
--- 
a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java
+++ 
b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java
@@ -93,22 +93,13 @@ public class LocalStorageMeta {
     this.size.set(diskSize);
   }
 
-  /**
-   * If the method is implemented as below:
-   *
-   * <p>if (shuffleMetaMap.contains(shuffleId)) { // `Time A` return 
shuffleMetaMap.get(shuffleId) }
-   * else { shuffleMetaMap.putIfAbsent(shuffleId, newMeta) return newMeta }
-   *
-   * <p>Because if shuffleMetaMap remove shuffleId at `Time A` in another 
thread,
-   * shuffleMetaMap.get(shuffleId) will return null. We need to guarantee that 
this method is thread
-   * safe, and won't return null.
-   */
   public void createMetadataIfNotExist(String shuffleKey) {
-    ShuffleMeta meta = new ShuffleMeta();
-    ShuffleMeta oldMeta = shuffleMetaMap.putIfAbsent(shuffleKey, meta);
-    if (oldMeta == null) {
-      LOG.info("Create metadata of shuffle {}.", shuffleKey);
-    }
+    shuffleMetaMap.computeIfAbsent(
+        shuffleKey,
+        key -> {
+          LOG.info("Create metadata of shuffle {}.", key);
+          return new ShuffleMeta();
+        });
   }
 
   private ShuffleMeta getShuffleMeta(String shuffleKey) {

Reply via email to