This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new f57eabe13 [#2545] feat: Align with spark executor cores on overlapping
compression (#2548)
f57eabe13 is described below
commit f57eabe132747c2b88a3186869e3091eaf840632
Author: Junfan Zhang <[email protected]>
AuthorDate: Wed Jul 23 17:29:33 2025 +0800
[#2545] feat: Align with spark executor cores on overlapping compression
(#2548)
### What changes were proposed in this pull request?
Align the number of threads in the overlapping compression thread pool with
the Spark executor vcores, using a configurable per-vcore ratio.
### Why are the changes needed?
Compression is a compute-intensive workload, so it is better to respect the
executor's vcore allocation when sizing the thread pool.
### Does this PR introduce _any_ user-facing change?
Yes.
`rss.client.write.overlappingCompressionThreadsPerVcore = -1`
### How was this patch tested?
Existing unit tests.
---
.../main/java/org/apache/spark/shuffle/RssSparkConfig.java | 6 +++---
.../shuffle/writer/OverlappingCompressionDataPusher.java | 8 ++------
.../apache/uniffle/shuffle/manager/RssShuffleManagerBase.java | 11 +++++++----
.../shuffle/writer/OverlappingCompressionDataPusherTest.java | 8 ++++----
4 files changed, 16 insertions(+), 17 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
index 4bffd227d..8af38703b 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
@@ -45,12 +45,12 @@ public class RssSparkConfig {
.defaultValue(false)
.withDescription("Whether to overlapping compress shuffle blocks.");
- public static final ConfigOption<Integer>
RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS =
- ConfigOptions.key("rss.client.write.overlappingCompressionThreads")
+ public static final ConfigOption<Integer>
RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS_PER_VCORE =
+
ConfigOptions.key("rss.client.write.overlappingCompressionThreadsPerVcore")
.intType()
.defaultValue(-1)
.withDescription(
- "The number of threads to overlapping compress shuffle blocks.
If <= 0, this will be disabled.");
+ "Specifies the ratio between the number of overlapping
compression threads and the number of Spark executor vcores. It's disabled by
default.");
public static final ConfigOption<Boolean>
RSS_READ_REORDER_MULTI_SERVERS_ENABLED =
ConfigOptions.key("rss.client.read.reorderMultiServersEnable")
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusher.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusher.java
index c6e939421..a8c593f2f 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusher.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusher.java
@@ -29,7 +29,6 @@ import org.slf4j.LoggerFactory;
import org.apache.uniffle.client.api.ShuffleWriteClient;
import org.apache.uniffle.client.impl.FailedBlockSendTracker;
-import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.ThreadUtils;
@@ -48,7 +47,7 @@ public class OverlappingCompressionDataPusher extends
DataPusher {
Set<String> failedTaskIds,
int threadPoolSize,
int threadKeepAliveTime,
- RssConf rssConf) {
+ int compressionThreads) {
super(
shuffleWriteClient,
taskToSuccessBlockIds,
@@ -56,13 +55,10 @@ public class OverlappingCompressionDataPusher extends
DataPusher {
failedTaskIds,
threadPoolSize,
threadKeepAliveTime);
-
- int compressionThreads =
-
rssConf.getInteger(RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS);
if (compressionThreads <= 0) {
throw new RssException(
"Invalid rss configuration of "
- + RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS.key()
+ +
RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS_PER_VCORE.key()
+ ": "
+ compressionThreads);
}
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index dee4c2ef7..11bc1aa4c 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -106,6 +106,7 @@ import org.apache.uniffle.common.util.ThreadUtils;
import org.apache.uniffle.shuffle.BlockIdManager;
import org.apache.uniffle.shuffle.ShuffleIdMappingManager;
+import static org.apache.spark.launcher.SparkLauncher.EXECUTOR_CORES;
import static
org.apache.spark.shuffle.RssSparkConfig.RSS_BLOCK_ID_SELF_MANAGEMENT_ENABLED;
import static
org.apache.spark.shuffle.RssSparkConfig.RSS_PARTITION_REASSIGN_MAX_REASSIGNMENT_SERVER_NUM;
import static
org.apache.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED;
@@ -342,9 +343,11 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
boolean overlappingCompressionEnabled =
rssConf.get(RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_ENABLED);
- int overlappingCompressionThreads =
- rssConf.get(RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS);
- if (overlappingCompressionEnabled && overlappingCompressionThreads > 0) {
+ int overlappingCompressionThreadsPerVcore =
+
rssConf.get(RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS_PER_VCORE);
+ if (overlappingCompressionEnabled && overlappingCompressionThreadsPerVcore
> 0) {
+ int compressionThreads =
+ overlappingCompressionThreadsPerVcore *
sparkConf.getInt(EXECUTOR_CORES, 1);
this.dataPusher =
new OverlappingCompressionDataPusher(
shuffleWriteClient,
@@ -353,7 +356,7 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
failedTaskIds,
poolSize,
keepAliveTime,
- rssConf);
+ compressionThreads);
} else {
this.dataPusher =
new DataPusher(
diff --git
a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusherTest.java
b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusherTest.java
index ced0fdfa7..ffa22771c 100644
---
a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusherTest.java
+++
b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusherTest.java
@@ -26,7 +26,6 @@ import java.util.Set;
import java.util.concurrent.CompletableFuture;
import com.google.common.collect.Maps;
-import org.apache.spark.shuffle.RssSparkConfig;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -37,6 +36,7 @@ import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
import org.apache.uniffle.common.util.JavaUtils;
+import static
org.apache.spark.shuffle.RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS_PER_VCORE;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class OverlappingCompressionDataPusherTest {
@@ -51,6 +51,7 @@ public class OverlappingCompressionDataPusherTest {
Set<String> failedTaskIds = new HashSet<>();
RssConf rssConf = new RssConf();
+ int threads =
rssConf.get(RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS_PER_VCORE);
// case1: Illegal thread number of compression
Assertions.assertThrows(
@@ -63,11 +64,10 @@ public class OverlappingCompressionDataPusherTest {
failedTaskIds,
1,
2,
- rssConf);
+ threads);
});
// case2: Propagated into the underlying data pusher
- rssConf.set(RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS, 1);
DataPusher pusher =
new OverlappingCompressionDataPusher(
shuffleWriteClient,
@@ -76,7 +76,7 @@ public class OverlappingCompressionDataPusherTest {
failedTaskIds,
1,
2,
- rssConf);
+ 1);
pusher.setRssAppId("testSend");
String taskId = "taskId1";