This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new a69930dee [#2506] feat(spark3): Introduce option to enable reorder
multi servers for reader (#2507)
a69930dee is described below
commit a69930deea99462380bdd528b8c0e49925dd1978
Author: Junfan Zhang <[email protected]>
AuthorDate: Wed Jun 18 17:19:31 2025 +0800
[#2506] feat(spark3): Introduce option to enable reorder multi servers for
reader (#2507)
### What changes were proposed in this pull request?
Introduce an option that enables reordering of multiple shuffle servers for the reader.
### Why are the changes needed?
If partition splitting is enabled, large partitions will be distributed
across multiple shuffle servers. With the help of Spark AQE (Adaptive Query
Execution), these large partitions will be processed by multiple tasks.
In this case, all split tasks may sequentially read from the same set of
shuffle servers, which can lead to high RPC concurrency pressure on specific
servers.
This PR introduces the ability to randomly reorder the underlying shuffle
servers to achieve better load balancing during reading.
### Does this PR introduce _any_ user-facing change?
Yes. It adds a new client option, which is disabled by default:
`rss.client.read.reorderMultiServersEnable=false`
### How was this patch tested?
No tests are needed.
---
.../src/main/java/org/apache/spark/shuffle/RssSparkConfig.java | 8 ++++++++
.../java/org/apache/spark/shuffle/reader/RssShuffleReader.java | 7 +++++++
2 files changed, 15 insertions(+)
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
index 4a7f653db..79baab59c 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
@@ -39,6 +39,14 @@ import org.apache.uniffle.common.config.RssConf;
public class RssSparkConfig {
+ public static final ConfigOption<Boolean>
RSS_READ_REORDER_MULTI_SERVERS_ENABLED =
+ ConfigOptions.key("rss.client.read.reorderMultiServersEnable")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription(
+ "If multiple replicated or load-balanced shuffle servers are
assigned for one partition, "
+ + "this option can be enabled to perform load-balanced reads
and avoid hot spots.");
+
public static final ConfigOption<Boolean> RSS_RESUBMIT_STAGE_ENABLED =
ConfigOptions.key("rss.stageRetry.enabled")
.booleanType()
diff --git
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
index a1c345e84..2a572c3ec 100644
---
a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
+++
b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/reader/RssShuffleReader.java
@@ -17,6 +17,7 @@
package org.apache.spark.shuffle.reader;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
@@ -64,6 +65,7 @@ import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.storage.handler.impl.ShuffleServerReadCostTracker;
+import static
org.apache.spark.shuffle.RssSparkConfig.RSS_READ_REORDER_MULTI_SERVERS_ENABLED;
import static
org.apache.spark.shuffle.RssSparkConfig.RSS_RESUBMIT_STAGE_WITH_FETCH_FAILURE_ENABLED;
public class RssShuffleReader<K, C> implements ShuffleReader<K, C> {
@@ -256,6 +258,11 @@ public class RssShuffleReader<K, C> implements
ShuffleReader<K, C> {
continue;
}
List<ShuffleServerInfo> shuffleServerInfoList =
partitionToShuffleServers.get(partition);
+ if (shuffleServerInfoList != null
+ && shuffleServerInfoList.size() > 1
+ && rssConf.getBoolean(RSS_READ_REORDER_MULTI_SERVERS_ENABLED)) {
+ Collections.shuffle(shuffleServerInfoList);
+ }
// This mechanism of expectedTaskIdsBitmap filter is to filter out the
most of data.
// especially for AQE skew optimization
boolean expectedTaskIdsBitmapFilterEnable =