This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new f732fc227 [#493][FOLLOWUP] improvement: replace putIfAbsent to avoid
performance loss (#2444)
f732fc227 is described below
commit f732fc227ca726be80f169cd02b077e5257dbb5f
Author: xianjingfeng <[email protected]>
AuthorDate: Wed Apr 16 14:55:41 2025 +0800
[#493][FOLLOWUP] improvement: replace putIfAbsent to avoid performance loss
(#2444)
### What changes were proposed in this pull request?
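Replace `putIfAbsent` with `computeIfAbsent` so that the default value is only constructed when the key is actually absent, avoiding the performance loss of eagerly creating an object on every call.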
### Why are the changes needed?
Fix: #493
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI
---
.../apache/spark/shuffle/RssShuffleManager.java | 8 ++++---
.../apache/spark/shuffle/RssShuffleManager.java | 8 ++++---
.../common/shuffle/impl/RssShuffleManager.java | 2 +-
.../orderedgrouped/RssShuffleScheduler.java | 2 +-
.../uniffle/coordinator/ApplicationManager.java | 2 +-
.../org/apache/uniffle/server/merge/Shuffle.java | 20 ++++++++++++++--
.../uniffle/server/merge/ShuffleMergeManager.java | 27 +++++++++++-----------
.../uniffle/storage/common/LocalStorageMeta.java | 21 +++++------------
8 files changed, 51 insertions(+), 39 deletions(-)
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index 8e6b8dfca..31521a4b2 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -115,7 +115,8 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
if (dependency.partitioner().numPartitions() == 0) {
shuffleIdToPartitionNum.putIfAbsent(shuffleId, 0);
- shuffleIdToNumMapTasks.putIfAbsent(shuffleId,
dependency.rdd().partitions().length);
+ shuffleIdToNumMapTasks.computeIfAbsent(
+ shuffleId, key -> dependency.rdd().partitions().length);
LOG.info(
"RegisterShuffle with ShuffleId["
+ shuffleId
@@ -158,8 +159,9 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
startHeartbeat();
- shuffleIdToPartitionNum.putIfAbsent(shuffleId,
dependency.partitioner().numPartitions());
- shuffleIdToNumMapTasks.putIfAbsent(shuffleId,
dependency.rdd().partitions().length);
+ shuffleIdToPartitionNum.computeIfAbsent(
+ shuffleId, key -> dependency.partitioner().numPartitions());
+ shuffleIdToNumMapTasks.computeIfAbsent(shuffleId, key ->
dependency.rdd().partitions().length);
if (shuffleManagerRpcServiceEnabled &&
rssStageRetryForWriteFailureEnabled) {
ShuffleHandleInfo handleInfo =
new MutableShuffleHandleInfo(shuffleId, partitionToServers,
remoteStorage);
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
index bc7553e13..12f2fada5 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java
@@ -135,7 +135,8 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
if (dependency.partitioner().numPartitions() == 0) {
shuffleIdToPartitionNum.putIfAbsent(shuffleId, 0);
- shuffleIdToNumMapTasks.putIfAbsent(shuffleId,
dependency.rdd().partitions().length);
+ shuffleIdToNumMapTasks.computeIfAbsent(
+ shuffleId, key -> dependency.rdd().partitions().length);
LOG.info(
"RegisterShuffle with ShuffleId["
+ shuffleId
@@ -175,8 +176,9 @@ public class RssShuffleManager extends
RssShuffleManagerBase {
rssStageResubmitManager.getServerIdBlackList(),
0);
startHeartbeat();
- shuffleIdToPartitionNum.putIfAbsent(shuffleId,
dependency.partitioner().numPartitions());
- shuffleIdToNumMapTasks.putIfAbsent(shuffleId,
dependency.rdd().partitions().length);
+ shuffleIdToPartitionNum.computeIfAbsent(
+ shuffleId, key -> dependency.partitioner().numPartitions());
+ shuffleIdToNumMapTasks.computeIfAbsent(shuffleId, key ->
dependency.rdd().partitions().length);
if (shuffleManagerRpcServiceEnabled &&
rssStageRetryForWriteFailureEnabled) {
ShuffleHandleInfo shuffleHandleInfo =
new MutableShuffleHandleInfo(
diff --git
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssShuffleManager.java
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssShuffleManager.java
index dfdc6f840..3b23e1852 100644
---
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssShuffleManager.java
+++
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/RssShuffleManager.java
@@ -870,7 +870,7 @@ public class RssShuffleManager extends ShuffleManager {
pendingPartition.add(p);
}
allRssPartition.add(p);
- partitionToInput.putIfAbsent(p, new ArrayList<>());
+ partitionToInput.computeIfAbsent(p, key -> new ArrayList<>());
partitionToInput.get(p).add(srcAttemptIdentifier);
LOG.info("Add partition:{}, after add, now partition:{}", p,
allRssPartition);
}
diff --git
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
index 5f5c2c7af..987dc41fa 100644
---
a/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
+++
b/client-tez/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/RssShuffleScheduler.java
@@ -1299,7 +1299,7 @@ class RssShuffleScheduler extends ShuffleScheduler {
partitionIdToSuccessMapTaskAttempts.get(partitionId).add(srcAttempt);
String pathComponent = srcAttempt.getPathComponent();
TezTaskAttemptID tezTaskAttemptId =
IdUtils.convertTezTaskAttemptID(pathComponent);
- partitionIdToSuccessTezTasks.putIfAbsent(partitionId, new HashSet<>());
+ partitionIdToSuccessTezTasks.computeIfAbsent(partitionId, key -> new
HashSet<>());
partitionIdToSuccessTezTasks.get(partitionId).add(tezTaskAttemptId.getTaskID());
uniqueHosts.add(new HostPort(inputHostName, port));
diff --git
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
index afbcffd83..b1b92d8de 100644
---
a/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
+++
b/coordinator/src/main/java/org/apache/uniffle/coordinator/ApplicationManager.java
@@ -276,7 +276,7 @@ public class ApplicationManager implements Closeable {
}
} else {
LOG.warn("Can't find counter for remote storage: {}", storagePath);
- remoteStoragePathRankValue.putIfAbsent(storagePath, new RankValue(0));
+ remoteStoragePathRankValue.computeIfAbsent(storagePath, key -> new
RankValue(0));
}
if (remoteStoragePathRankValue.get(storagePath).getAppNum().get() == 0
&& !availableRemoteStorageInfo.containsKey(storagePath)) {
diff --git a/server/src/main/java/org/apache/uniffle/server/merge/Shuffle.java
b/server/src/main/java/org/apache/uniffle/server/merge/Shuffle.java
index 9f0099702..c7dff62a0 100644
--- a/server/src/main/java/org/apache/uniffle/server/merge/Shuffle.java
+++ b/server/src/main/java/org/apache/uniffle/server/merge/Shuffle.java
@@ -20,6 +20,7 @@ package org.apache.uniffle.server.merge;
import java.io.IOException;
import java.util.Comparator;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
import com.google.common.annotations.VisibleForTesting;
import org.roaringbitmap.longlong.Roaring64NavigableMap;
@@ -69,8 +70,23 @@ public class Shuffle<K, V> {
public void startSortMerge(int partitionId, Roaring64NavigableMap
expectedBlockIdMap)
throws IOException {
- this.partitions.putIfAbsent(partitionId, new Partition<K, V>(this,
partitionId));
- this.partitions.get(partitionId).startSortMerge(expectedBlockIdMap);
+ AtomicReference<IOException> exception = new AtomicReference<>();
+ Partition<K, V> partition =
+ this.partitions.computeIfAbsent(
+ partitionId,
+ key -> {
+ try {
+ return new Partition<K, V>(this, partitionId);
+ } catch (IOException e) {
+ exception.set(e);
+ }
+ return null;
+ });
+ if (exception.get() != null) {
+ throw exception.get();
+ }
+ assert partition != null;
+ partition.startSortMerge(expectedBlockIdMap);
}
void cleanup() {
diff --git
a/server/src/main/java/org/apache/uniffle/server/merge/ShuffleMergeManager.java
b/server/src/main/java/org/apache/uniffle/server/merge/ShuffleMergeManager.java
index 027b63d9d..4de51df3b 100644
---
a/server/src/main/java/org/apache/uniffle/server/merge/ShuffleMergeManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/merge/ShuffleMergeManager.java
@@ -163,22 +163,23 @@ public class ShuffleMergeManager {
} else {
comparator = defaultComparator;
}
- this.shuffles.putIfAbsent(appId, JavaUtils.newConcurrentMap());
+ this.shuffles.computeIfAbsent(appId, key ->
JavaUtils.newConcurrentMap());
this.shuffles
.get(appId)
- .putIfAbsent(
+ .computeIfAbsent(
shuffleId,
- new Shuffle(
- serverConf,
- eventHandler,
- shuffleServer,
- appId,
- shuffleId,
- kClass,
- vClass,
- comparator,
- mergeContext.getMergedBlockSize(),
- classLoader));
+ key ->
+ new Shuffle(
+ serverConf,
+ eventHandler,
+ shuffleServer,
+ appId,
+ shuffleId,
+ kClass,
+ vClass,
+ comparator,
+ mergeContext.getMergedBlockSize(),
+ classLoader));
} catch (ClassNotFoundException
| InstantiationException
| IllegalAccessException
diff --git
a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java
b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java
index 0cb80d4c1..8b5bea2ef 100644
---
a/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java
+++
b/storage/src/main/java/org/apache/uniffle/storage/common/LocalStorageMeta.java
@@ -93,22 +93,13 @@ public class LocalStorageMeta {
this.size.set(diskSize);
}
- /**
- * If the method is implemented as below:
- *
- * <p>if (shuffleMetaMap.contains(shuffleId)) { // `Time A` return
shuffleMetaMap.get(shuffleId) }
- * else { shuffleMetaMap.putIfAbsent(shuffleId, newMeta) return newMeta }
- *
- * <p>Because if shuffleMetaMap remove shuffleId at `Time A` in another
thread,
- * shuffleMetaMap.get(shuffleId) will return null. We need to guarantee that
this method is thread
- * safe, and won't return null.
- */
public void createMetadataIfNotExist(String shuffleKey) {
- ShuffleMeta meta = new ShuffleMeta();
- ShuffleMeta oldMeta = shuffleMetaMap.putIfAbsent(shuffleKey, meta);
- if (oldMeta == null) {
- LOG.info("Create metadata of shuffle {}.", shuffleKey);
- }
+ shuffleMetaMap.computeIfAbsent(
+ shuffleKey,
+ key -> {
+ LOG.info("Create metadata of shuffle {}.", key);
+ return new ShuffleMeta();
+ });
}
private ShuffleMeta getShuffleMeta(String shuffleKey) {