This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 96cf2cc2 [ISSUE-475][Improvement] It's unnecessary to use 
ConcurrentHashMap for "partitionToBlockIds" in RssShuffleWriter (#480)
96cf2cc2 is described below

commit 96cf2cc292e1707865aa4875fda47f435c970818
Author: jiafu zhang <[email protected]>
AuthorDate: Mon Jan 16 23:04:23 2023 +0800

    [ISSUE-475][Improvement] It's unnecessary to use ConcurrentHashMap for 
"partitionToBlockIds" in RssShuffleWriter (#480)
    
    ### What changes were proposed in this pull request?
    Replaced some unnecessary ConcurrentHashMap usages with HashMap.
    
    ### Why are the changes needed?
    Improve performance by avoiding the overhead of concurrent collections where they are not needed.
    
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Tested with repartition workload
---
 .../java/org/apache/spark/shuffle/writer/RssShuffleWriter.java   | 9 ++++-----
 .../java/org/apache/spark/shuffle/writer/RssShuffleWriter.java   | 9 ++++-----
 .../org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java   | 8 +++++---
 3 files changed, 13 insertions(+), 13 deletions(-)

diff --git 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 68a447d3..34deb77a 100644
--- 
a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ 
b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -139,7 +139,7 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
     this.sendSizeLimit = 
sparkConf.getSizeAsBytes(RssSparkConfig.RSS_CLIENT_SEND_SIZE_LIMIT.key(),
         RssSparkConfig.RSS_CLIENT_SEND_SIZE_LIMIT.defaultValue().get());
     this.bitmapSplitNum = 
sparkConf.get(RssSparkConfig.RSS_CLIENT_BITMAP_SPLIT_NUM);
-    this.partitionToBlockIds = Maps.newConcurrentMap();
+    this.partitionToBlockIds = Maps.newHashMap();
     this.shuffleWriteClient = shuffleWriteClient;
     this.shuffleServersForData = rssHandle.getShuffleServersForData();
     this.partitionToServers = rssHandle.getPartitionToServers();
@@ -172,8 +172,8 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
   }
 
   private void writeImpl(Iterator<Product2<K,V>> records) {
-    List<ShuffleBlockInfo> shuffleBlockInfos = null;
-    Set<Long> blockIds = Sets.newConcurrentHashSet();
+    List<ShuffleBlockInfo> shuffleBlockInfos;
+    Set<Long> blockIds = Sets.newHashSet();
     while (records.hasNext()) {
       Product2<K, V> record = records.next();
       int partition = getPartition(record._1());
@@ -225,8 +225,7 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
         blockIds.add(blockId);
         // update [partition, blockIds], it will be sent to shuffle server
         int partitionId = sbi.getPartitionId();
-        partitionToBlockIds.putIfAbsent(partitionId, 
Sets.newConcurrentHashSet());
-        partitionToBlockIds.get(partitionId).add(blockId);
+        partitionToBlockIds.computeIfAbsent(partitionId, k -> 
Sets.newHashSet()).add(blockId);
       });
       postBlockEvent(shuffleBlockInfoList);
     }
diff --git 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
index 4a6f4428..21cb50db 100644
--- 
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
+++ 
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/writer/RssShuffleWriter.java
@@ -140,7 +140,7 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
     this.sendSizeLimit = 
sparkConf.getSizeAsBytes(RssSparkConfig.RSS_CLIENT_SEND_SIZE_LIMIT.key(),
         RssSparkConfig.RSS_CLIENT_SEND_SIZE_LIMIT.defaultValue().get());
     this.bitmapSplitNum = 
sparkConf.get(RssSparkConfig.RSS_CLIENT_BITMAP_SPLIT_NUM);
-    this.partitionToBlockIds = Maps.newConcurrentMap();
+    this.partitionToBlockIds = Maps.newHashMap();
     this.shuffleWriteClient = shuffleWriteClient;
     this.shuffleServersForData = rssHandle.getShuffleServersForData();
     this.partitionLengths = new long[partitioner.numPartitions()];
@@ -166,8 +166,8 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
   }
 
   private void writeImpl(Iterator<Product2<K,V>> records) {
-    List<ShuffleBlockInfo> shuffleBlockInfos = null;
-    Set<Long> blockIds = Sets.newConcurrentHashSet();
+    List<ShuffleBlockInfo> shuffleBlockInfos;
+    Set<Long> blockIds = Sets.newHashSet();
     boolean isCombine = shuffleDependency.mapSideCombine();
     Function1 createCombiner = null;
     if (isCombine) {
@@ -223,8 +223,7 @@ public class RssShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
         blockIds.add(blockId);
         // update [partition, blockIds], it will be sent to shuffle server
         int partitionId = sbi.getPartitionId();
-        partitionToBlockIds.putIfAbsent(partitionId, 
Sets.newConcurrentHashSet());
-        partitionToBlockIds.get(partitionId).add(blockId);
+        partitionToBlockIds.computeIfAbsent(partitionId, k -> 
Sets.newHashSet()).add(blockId);
         partitionLengths[partitionId] += sbi.getLength();
       });
       postBlockEvent(shuffleBlockInfoList);
diff --git 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
index 06c81361..a784e84c 100644
--- 
a/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
+++ 
b/client/src/main/java/org/apache/uniffle/client/impl/ShuffleWriteClientImpl.java
@@ -259,7 +259,9 @@ public class ShuffleWriteClientImpl implements 
ShuffleWriteClient {
     }
 
     // maintain the count of blocks that have been sent to the server
-    Map<Long, AtomicInteger> blockIdsTracker = Maps.newConcurrentMap();
+    // unnecessary to use concurrent hashmap here unless you need to insert or 
delete entries in other threads
+    // AtomicInteger is enough to reflect value changes in other threads
+    Map<Long, AtomicInteger> blockIdsTracker = Maps.newHashMap();
     primaryServerToBlockIds.values().forEach(
         blockList -> blockList.forEach(block -> blockIdsTracker.put(block, new 
AtomicInteger(0)))
     );
@@ -473,8 +475,8 @@ public class ShuffleWriteClientImpl implements 
ShuffleWriteClient {
       long taskAttemptId,
       Map<Integer, List<Long>> partitionToBlockIds,
       int bitmapNum) {
-    Map<ShuffleServerInfo, List<Integer>> groupedPartitions = 
Maps.newConcurrentMap();
-    Map<Integer, Integer> partitionReportTracker = Maps.newConcurrentMap();
+    Map<ShuffleServerInfo, List<Integer>> groupedPartitions = 
Maps.newHashMap();
+    Map<Integer, Integer> partitionReportTracker = Maps.newHashMap();
     for (Map.Entry<Integer, List<ShuffleServerInfo>> entry : 
partitionToServers.entrySet()) {
       for (ShuffleServerInfo ssi : entry.getValue()) {
         if (!groupedPartitions.containsKey(ssi)) {

Reply via email to