This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new f57eabe13 [#2545] feat: Align with spark executor cores on overlapping 
compression (#2548)
f57eabe13 is described below

commit f57eabe132747c2b88a3186869e3091eaf840632
Author: Junfan Zhang <[email protected]>
AuthorDate: Wed Jul 23 17:29:33 2025 +0800

    [#2545] feat: Align with spark executor cores on overlapping compression 
(#2548)
    
    ### What changes were proposed in this pull request?
    
    Make the number of threads in the overlapping compression thread pool 
proportional to the number of Spark executor vcores, via a configurable 
per-vcore ratio.
    
    ### Why are the changes needed?
    
    Compression is a compute-intensive workload, so the thread count should 
respect the executor's vcore allocation.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes.
    `rss.client.write.overlappingCompressionThreadsPerVcore = -1`
    
    ### How was this patch tested?
    
    existing unit tests.
---
 .../main/java/org/apache/spark/shuffle/RssSparkConfig.java    |  6 +++---
 .../shuffle/writer/OverlappingCompressionDataPusher.java      |  8 ++------
 .../apache/uniffle/shuffle/manager/RssShuffleManagerBase.java | 11 +++++++----
 .../shuffle/writer/OverlappingCompressionDataPusherTest.java  |  8 ++++----
 4 files changed, 16 insertions(+), 17 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
index 4bffd227d..8af38703b 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
@@ -45,12 +45,12 @@ public class RssSparkConfig {
           .defaultValue(false)
           .withDescription("Whether to overlapping compress shuffle blocks.");
 
-  public static final ConfigOption<Integer> 
RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS =
-      ConfigOptions.key("rss.client.write.overlappingCompressionThreads")
+  public static final ConfigOption<Integer> 
RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS_PER_VCORE =
+      
ConfigOptions.key("rss.client.write.overlappingCompressionThreadsPerVcore")
           .intType()
           .defaultValue(-1)
           .withDescription(
-              "The number of threads to overlapping compress shuffle blocks. 
If <= 0, this will be disabled.");
+              "Specifies the ratio between the number of overlapping 
compression threads and the number of Spark executor vcores. It's disabled by 
default.");
 
   public static final ConfigOption<Boolean> 
RSS_READ_REORDER_MULTI_SERVERS_ENABLED =
       ConfigOptions.key("rss.client.read.reorderMultiServersEnable")
diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusher.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusher.java
index c6e939421..a8c593f2f 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusher.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusher.java
@@ -29,7 +29,6 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.uniffle.client.api.ShuffleWriteClient;
 import org.apache.uniffle.client.impl.FailedBlockSendTracker;
-import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.ThreadUtils;
 
@@ -48,7 +47,7 @@ public class OverlappingCompressionDataPusher extends 
DataPusher {
       Set<String> failedTaskIds,
       int threadPoolSize,
       int threadKeepAliveTime,
-      RssConf rssConf) {
+      int compressionThreads) {
     super(
         shuffleWriteClient,
         taskToSuccessBlockIds,
@@ -56,13 +55,10 @@ public class OverlappingCompressionDataPusher extends 
DataPusher {
         failedTaskIds,
         threadPoolSize,
         threadKeepAliveTime);
-
-    int compressionThreads =
-        
rssConf.getInteger(RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS);
     if (compressionThreads <= 0) {
       throw new RssException(
           "Invalid rss configuration of "
-              + RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS.key()
+              + 
RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS_PER_VCORE.key()
               + ": "
               + compressionThreads);
     }
diff --git 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index dee4c2ef7..11bc1aa4c 100644
--- 
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++ 
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -106,6 +106,7 @@ import org.apache.uniffle.common.util.ThreadUtils;
 import org.apache.uniffle.shuffle.BlockIdManager;
 import org.apache.uniffle.shuffle.ShuffleIdMappingManager;
 
+import static org.apache.spark.launcher.SparkLauncher.EXECUTOR_CORES;
 import static 
org.apache.spark.shuffle.RssSparkConfig.RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED;
 import static 
org.apache.spark.shuffle.RssSparkConfig.RSS_PARTITION_REASSIGN_MAX_REASSIGNMENT_SERVER_NUM;
 import static 
org.apache.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED;
@@ -342,9 +343,11 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
 
     boolean overlappingCompressionEnabled =
         rssConf.get(RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_ENABLED);
-    int overlappingCompressionThreads =
-        rssConf.get(RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS);
-    if (overlappingCompressionEnabled && overlappingCompressionThreads > 0) {
+    int overlappingCompressionThreadsPerVcore =
+        
rssConf.get(RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS_PER_VCORE);
+    if (overlappingCompressionEnabled && overlappingCompressionThreadsPerVcore 
> 0) {
+      int compressionThreads =
+          overlappingCompressionThreadsPerVcore * 
sparkConf.getInt(EXECUTOR_CORES, 1);
       this.dataPusher =
           new OverlappingCompressionDataPusher(
               shuffleWriteClient,
@@ -353,7 +356,7 @@ public abstract class RssShuffleManagerBase implements 
RssShuffleManagerInterfac
               failedTaskIds,
               poolSize,
               keepAliveTime,
-              rssConf);
+              compressionThreads);
     } else {
       this.dataPusher =
           new DataPusher(
diff --git 
a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusherTest.java
 
b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusherTest.java
index ced0fdfa7..ffa22771c 100644
--- 
a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusherTest.java
+++ 
b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusherTest.java
@@ -26,7 +26,6 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
 import com.google.common.collect.Maps;
-import org.apache.spark.shuffle.RssSparkConfig;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -37,6 +36,7 @@ import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.exception.RssException;
 import org.apache.uniffle.common.util.JavaUtils;
 
+import static 
org.apache.spark.shuffle.RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS_PER_VCORE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class OverlappingCompressionDataPusherTest {
@@ -51,6 +51,7 @@ public class OverlappingCompressionDataPusherTest {
     Set<String> failedTaskIds = new HashSet<>();
 
     RssConf rssConf = new RssConf();
+    int threads = 
rssConf.get(RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS_PER_VCORE);
 
     // case1: Illegal thread number of compression
     Assertions.assertThrows(
@@ -63,11 +64,10 @@ public class OverlappingCompressionDataPusherTest {
               failedTaskIds,
               1,
               2,
-              rssConf);
+              threads);
         });
 
     // case2: Propagated into the underlying data pusher
-    rssConf.set(RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS, 1);
     DataPusher pusher =
         new OverlappingCompressionDataPusher(
             shuffleWriteClient,
@@ -76,7 +76,7 @@ public class OverlappingCompressionDataPusherTest {
             failedTaskIds,
             1,
             2,
-            rssConf);
+            1);
     pusher.setRssAppId("testSend");
 
     String taskId = "taskId1";

Reply via email to