This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 96cf2cc2 [ISSUE-475][Improvement] It's unnecessary to use
ConcurrentHashMap for "partitionToBlockIds" in RssShuffleWriter (#480)
96cf2cc2 is described below
commit 96cf2cc292e1707865aa4875fda47f435c970818
Author: jiafu zhang <[email protected]>
AuthorDate: Mon Jan 16 23:04:23 2023 +0800
[ISSUE-475][Improvement] It's unnecessary to use ConcurrentHashMap for
"partitionToBlockIds" in RssShuffleWriter (#480)
### What changes were proposed in this pull request?
Replaced some unnecessary uses of ConcurrentHashMap with HashMap.
### Why are the changes needed?
Improve performance by avoiding the synchronization overhead of concurrent collections where the maps are only accessed from a single thread.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Tested with repartition workload
---
.../java/org/apache/spark/shuffle/writer/RssShuffleWriter.java | 9 ++++-----
.../java/org/apache/spark/shuffle/writer/RssShuffleWriter.java | 9 ++++-----
.../org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java | 8 +++++---
3 files changed, 13 insertions(+), 13 deletions(-)
diff --git
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 68a447d3..34deb77a 100644
---
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -139,7 +139,7 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
this.sendSizeLimit =
sparkConf.getSizeAsBytes(RssSparkConfig.RSS_CLIENT_SEND_SIZE_LIMIT.key(),
RssSparkConfig.RSS_CLIENT_SEND_SIZE_LIMIT.defaultValue().get());
this.bitmapSplitNum =
sparkConf.get(RssSparkConfig.RSS_CLIENT_BITMAP_SPLIT_NUM);
- this.partitionToBlockIds = Maps.newConcurrentMap();
+ this.partitionToBlockIds = Maps.newHashMap();
this.shuffleWriteClient = shuffleWriteClient;
this.shuffleServersForData = rssHandle.getShuffleServersForData();
this.partitionToServers = rssHandle.getPartitionToServers();
@@ -172,8 +172,8 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
}
private void writeImpl(Iterator<Product2<K,V>> records) {
- List<ShuffleBlockInfo> shuffleBlockInfos = null;
- Set<Long> blockIds = Sets.newConcurrentHashSet();
+ List<ShuffleBlockInfo> shuffleBlockInfos;
+ Set<Long> blockIds = Sets.newHashSet();
while (records.hasNext()) {
Product2<K, V> record = records.next();
int partition = getPartition(record._1());
@@ -225,8 +225,7 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
blockIds.add(blockId);
// update [partition, blockIds], it will be sent to shuffle server
int partitionId = sbi.getPartitionId();
- partitionToBlockIds.putIfAbsent(partitionId,
Sets.newConcurrentHashSet());
- partitionToBlockIds.get(partitionId).add(blockId);
+ partitionToBlockIds.computeIfAbsent(partitionId, k ->
Sets.newHashSet()).add(blockId);
});
postBlockEvent(shuffleBlockInfoList);
}
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 4a6f4428..21cb50db 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -140,7 +140,7 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
this.sendSizeLimit =
sparkConf.getSizeAsBytes(RssSparkConfig.RSS_CLIENT_SEND_SIZE_LIMIT.key(),
RssSparkConfig.RSS_CLIENT_SEND_SIZE_LIMIT.defaultValue().get());
this.bitmapSplitNum =
sparkConf.get(RssSparkConfig.RSS_CLIENT_BITMAP_SPLIT_NUM);
- this.partitionToBlockIds = Maps.newConcurrentMap();
+ this.partitionToBlockIds = Maps.newHashMap();
this.shuffleWriteClient = shuffleWriteClient;
this.shuffleServersForData = rssHandle.getShuffleServersForData();
this.partitionLengths = new long[partitioner.numPartitions()];
@@ -166,8 +166,8 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
}
private void writeImpl(Iterator<Product2<K,V>> records) {
- List<ShuffleBlockInfo> shuffleBlockInfos = null;
- Set<Long> blockIds = Sets.newConcurrentHashSet();
+ List<ShuffleBlockInfo> shuffleBlockInfos;
+ Set<Long> blockIds = Sets.newHashSet();
boolean isCombine = shuffleDependency.mapSideCombine();
Function1 createCombiner = null;
if (isCombine) {
@@ -223,8 +223,7 @@ public class RssShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
blockIds.add(blockId);
// update [partition, blockIds], it will be sent to shuffle server
int partitionId = sbi.getPartitionId();
- partitionToBlockIds.putIfAbsent(partitionId,
Sets.newConcurrentHashSet());
- partitionToBlockIds.get(partitionId).add(blockId);
+ partitionToBlockIds.computeIfAbsent(partitionId, k ->
Sets.newHashSet()).add(blockId);
partitionLengths[partitionId] += sbi.getLength();
});
postBlockEvent(shuffleBlockInfoList);
diff --git
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index 06c81361..a784e84c 100644
---
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -259,7 +259,9 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
}
// maintain the count of blocks that have been sent to the server
- Map<Long, AtomicInteger> blockIdsTracker = Maps.newConcurrentMap();
+ // unnecessary to use concurrent hashmap here unless you need to insert or
delete entries in other threads
+ // AtomicInteger is enough to reflect value changes in other threads
+ Map<Long, AtomicInteger> blockIdsTracker = Maps.newHashMap();
primaryServerToBlockIds.values().forEach(
blockList -> blockList.forEach(block -> blockIdsTracker.put(block, new
AtomicInteger(0)))
);
@@ -473,8 +475,8 @@ public class ShuffleWriteClientImpl implements
ShuffleWriteClient {
long taskAttemptId,
Map<Integer, List<Long>> partitionToBlockIds,
int bitmapNum) {
- Map<ShuffleServerInfo, List<Integer>> groupedPartitions =
Maps.newConcurrentMap();
- Map<Integer, Integer> partitionReportTracker = Maps.newConcurrentMap();
+ Map<ShuffleServerInfo, List<Integer>> groupedPartitions =
Maps.newHashMap();
+ Map<Integer, Integer> partitionReportTracker = Maps.newHashMap();
for (Map.Entry<Integer, List<ShuffleServerInfo>> entry :
partitionToServers.entrySet()) {
for (ShuffleServerInfo ssi : entry.getValue()) {
if (!groupedPartitions.containsKey(ssi)) {