This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new f1323ab95 [#2536] feat(spark): Controllable concurrency for
overlapping compression (#2541)
f1323ab95 is described below
commit f1323ab95034ef53e75e77283f110e096f7d6b6d
Author: Junfan Zhang <[email protected]>
AuthorDate: Thu Jul 10 11:05:47 2025 +0800
[#2536] feat(spark): Controllable concurrency for overlapping compression
(#2541)
### What changes were proposed in this pull request?
Introduce an extra option to control the concurrency of overlapping
compression.
### Why are the changes needed?
For #2536.
Decoupling compression concurrency from the transfer pool is essential for
resource isolation and performance stability, especially under multi-tenant or
CPU-constrained environments.
### Does this PR introduce _any_ user-facing change?
Yes. Adds the new option `rss.client.write.overlappingCompressionThreads`
(default `-1`; a value <= 0 disables the dedicated compression thread pool).
Documentation for the overlapping-compression config options will be added
in a separate PR.
### How was this patch tested?
Unit tests.
---
.../org/apache/spark/shuffle/RssSparkConfig.java | 7 ++
.../writer/OverlappingCompressionDataPusher.java | 89 ++++++++++++++++++++
.../shuffle/manager/RssShuffleManagerBase.java | 35 ++++++--
.../OverlappingCompressionDataPusherTest.java | 94 ++++++++++++++++++++++
4 files changed, 217 insertions(+), 8 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
index 66b964ac9..4bffd227d 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java
@@ -45,6 +45,13 @@ public class RssSparkConfig {
.defaultValue(false)
.withDescription("Whether to overlapping compress shuffle blocks.");
+ /**
+ * Number of threads in the dedicated pool that {@code OverlappingCompressionDataPusher} uses
+ * to run the compression step of each send. It only takes effect when
+ * {@link #RSS_WRITE_OVERLAPPING_COMPRESSION_ENABLED} is true. With the default (-1) or any
+ * value <= 0, the shuffle manager falls back to the plain {@code DataPusher}.
+ * {@code OverlappingCompressionDataPusher} itself rejects values <= 0.
+ */
+ public static final ConfigOption<Integer>
RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS =
+ ConfigOptions.key("rss.client.write.overlappingCompressionThreads")
+ .intType()
+ .defaultValue(-1)
+ .withDescription(
+ "The number of threads to overlapping compress shuffle blocks.
If <= 0, this will be disabled.");
+
public static final ConfigOption<Boolean>
RSS_READ_REORDER_MULTI_SERVERS_ENABLED =
ConfigOptions.key("rss.client.read.reorderMultiServersEnable")
.booleanType()
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusher.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusher.java
new file mode 100644
index 000000000..c6e939421
--- /dev/null
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusher.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.writer;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.spark.shuffle.RssSparkConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.client.api.ShuffleWriteClient;
+import org.apache.uniffle.client.impl.FailedBlockSendTracker;
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.ThreadUtils;
+
+/**
+ * A {@link DataPusher} that first runs each event's data-preparation step on a dedicated,
+ * fixed-size compression thread pool and then delegates to {@link DataPusher#send}.
+ *
+ * <p>Calling {@code getData()} on every block of the event is intended to trigger the
+ * (overlapping) compression. This way, compression concurrency is sized independently of the
+ * send pool. The shuffle manager only creates this pusher when overlapping compression is
+ * enabled and {@code rss.client.write.overlappingCompressionThreads} is greater than 0.
+ */
+public class OverlappingCompressionDataPusher extends DataPusher {
+ // NOTE(review): LOG is declared but not used anywhere in this class.
+ private static final Logger LOG =
LoggerFactory.getLogger(OverlappingCompressionDataPusher.class);
+
+ // Pool that executes the compression step of send(); its size comes from
+ // RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS.
+ // NOTE(review): nothing in this class ever shuts this pool down. If DataPusher exposes a
+ // close()/shutdown hook, it should be overridden here to shut this pool down as well — confirm.
+ private final ExecutorService compressionThreadPool;
+
+ /**
+ * Creates the pusher together with its dedicated compression pool.
+ *
+ * @param shuffleWriteClient forwarded to {@link DataPusher}
+ * @param taskToSuccessBlockIds forwarded to {@link DataPusher}
+ * @param taskToFailedBlockSendTracker forwarded to {@link DataPusher}
+ * @param failedTaskIds forwarded to {@link DataPusher}
+ * @param threadPoolSize forwarded to {@link DataPusher} (the send pool size)
+ * @param threadKeepAliveTime forwarded to {@link DataPusher}
+ * @param rssConf source of {@code RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS}
+ * @throws RssException if the configured compression thread count is <= 0
+ */
+ public OverlappingCompressionDataPusher(
+ ShuffleWriteClient shuffleWriteClient,
+ Map<String, Set<Long>> taskToSuccessBlockIds,
+ Map<String, FailedBlockSendTracker> taskToFailedBlockSendTracker,
+ Set<String> failedTaskIds,
+ int threadPoolSize,
+ int threadKeepAliveTime,
+ RssConf rssConf) {
+ super(
+ shuffleWriteClient,
+ taskToSuccessBlockIds,
+ taskToFailedBlockSendTracker,
+ failedTaskIds,
+ threadPoolSize,
+ threadKeepAliveTime);
+
+ int compressionThreads =
+
rssConf.getInteger(RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS);
+ // Fail fast: this pusher cannot operate without at least one compression thread.
+ if (compressionThreads <= 0) {
+ throw new RssException(
+ "Invalid rss configuration of "
+ + RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS.key()
+ + ": "
+ + compressionThreads);
+ }
+ this.compressionThreadPool =
+ Executors.newFixedThreadPool(
+ compressionThreads,
ThreadUtils.getThreadFactory("compression-thread"));
+ }
+
+ /**
+ * Runs the compression step for {@code event} on the compression pool, then hands the same
+ * event to {@link DataPusher#send}.
+ *
+ * @param event the block event to compress and send
+ * @return the parent's {@code send} future, chained after the compression step
+ */
+ @Override
+ public CompletableFuture<Long> send(AddBlockEvent event) {
+ // NOTE(review): if getData() throws, the returned future completes exceptionally without
+ // ever reaching super.send(). Any failure bookkeeping done there is then skipped — confirm
+ // that this is intended.
+ // Step 1: on the compression pool, call getData() on every block of the event so that the
+ // work it triggers (e.g. compression) runs here before the event is sent.
+ return CompletableFuture.supplyAsync(
+ () -> {
+ event.getShuffleDataInfoList().forEach(shuffleDataInfo ->
shuffleDataInfo.getData());
+ return event;
+ },
+ compressionThreadPool)
+ .thenCompose(
+ processedEvent -> {
+ // Step 2: forward to the parent class's send method
+ return super.send(processedEvent);
+ });
+ }
+}
diff --git
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
index e9085cfa2..dee4c2ef7 100644
---
a/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
+++
b/client-spark/common/src/main/java/org/apache/uniffle/shuffle/manager/RssShuffleManagerBase.java
@@ -66,6 +66,7 @@ import
org.apache.spark.shuffle.handle.SimpleShuffleHandleInfo;
import org.apache.spark.shuffle.handle.StageAttemptShuffleHandleInfo;
import org.apache.spark.shuffle.writer.AddBlockEvent;
import org.apache.spark.shuffle.writer.DataPusher;
+import org.apache.spark.shuffle.writer.OverlappingCompressionDataPusher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -338,14 +339,32 @@ public abstract class RssShuffleManagerBase implements
RssShuffleManagerInterfac
LOG.info("Rss data pusher is starting...");
int poolSize =
sparkConf.get(RssSparkConfig.RSS_CLIENT_SEND_THREAD_POOL_SIZE);
int keepAliveTime =
sparkConf.get(RssSparkConfig.RSS_CLIENT_SEND_THREAD_POOL_KEEPALIVE);
- this.dataPusher =
- new DataPusher(
- shuffleWriteClient,
- taskToSuccessBlockIds,
- taskToFailedBlockSendTracker,
- failedTaskIds,
- poolSize,
- keepAliveTime);
+
+ boolean overlappingCompressionEnabled =
+ rssConf.get(RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_ENABLED);
+ int overlappingCompressionThreads =
+ rssConf.get(RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS);
+ if (overlappingCompressionEnabled && overlappingCompressionThreads > 0) {
+ this.dataPusher =
+ new OverlappingCompressionDataPusher(
+ shuffleWriteClient,
+ taskToSuccessBlockIds,
+ taskToFailedBlockSendTracker,
+ failedTaskIds,
+ poolSize,
+ keepAliveTime,
+ rssConf);
+ } else {
+ this.dataPusher =
+ new DataPusher(
+ shuffleWriteClient,
+ taskToSuccessBlockIds,
+ taskToFailedBlockSendTracker,
+ failedTaskIds,
+ poolSize,
+ keepAliveTime);
+ }
+
this.partitionReassignMaxServerNum =
rssConf.get(RSS_PARTITION_REASSIGN_MAX_REASSIGNMENT_SERVER_NUM);
this.shuffleHandleInfoManager = new ShuffleHandleInfoManager();
diff --git
a/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusherTest.java
b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusherTest.java
new file mode 100644
index 000000000..ced0fdfa7
--- /dev/null
+++
b/client-spark/common/src/test/java/org/apache/spark/shuffle/writer/OverlappingCompressionDataPusherTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.writer;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+import com.google.common.collect.Maps;
+import org.apache.spark.shuffle.RssSparkConfig;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import org.apache.uniffle.client.impl.FailedBlockSendTracker;
+import org.apache.uniffle.common.ShuffleBlockInfo;
+import org.apache.uniffle.common.ShuffleServerInfo;
+import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.common.util.JavaUtils;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Unit tests for {@link OverlappingCompressionDataPusher}. They cover constructor validation of
+ * the compression thread count and delegation of {@code send} to {@link DataPusher}.
+ */
+public class OverlappingCompressionDataPusherTest {
+
+ @Test
+ public void testSend() {
+ DataPusherTest.FakedShuffleWriteClient shuffleWriteClient =
+ new DataPusherTest.FakedShuffleWriteClient();
+
+ Map<String, Set<Long>> taskToSuccessBlockIds = Maps.newConcurrentMap();
+ Map<String, FailedBlockSendTracker> taskToFailedBlockSendTracker =
JavaUtils.newConcurrentMap();
+ Set<String> failedTaskIds = new HashSet<>();
+
+ RssConf rssConf = new RssConf();
+
+ // case1: with the compression-thread option left at its default (-1), construction must
+ // fail with RssException.
+ Assertions.assertThrows(
+ RssException.class,
+ () -> {
+ new OverlappingCompressionDataPusher(
+ shuffleWriteClient,
+ taskToSuccessBlockIds,
+ taskToFailedBlockSendTracker,
+ failedTaskIds,
+ 1,
+ 2,
+ rssConf);
+ });
+
+ // case2: with one compression thread configured, construction succeeds and sends are
+ // propagated into the underlying data pusher.
+ rssConf.set(RssSparkConfig.RSS_WRITE_OVERLAPPING_COMPRESSION_THREADS, 1);
+ DataPusher pusher =
+ new OverlappingCompressionDataPusher(
+ shuffleWriteClient,
+ taskToSuccessBlockIds,
+ taskToFailedBlockSendTracker,
+ failedTaskIds,
+ 1,
+ 2,
+ rssConf);
+ // NOTE(review): this pusher (and its compression pool) is never closed by the test.
+ pusher.setRssAppId("testSend");
+
+ String taskId = "taskId1";
+ List<ShuffleServerInfo> server1 =
+ Collections.singletonList(new ShuffleServerInfo("0", "localhost",
1234));
+ ShuffleBlockInfo staleBlock1 =
+ new ShuffleBlockInfo(
+ 1, 1, 3, 1, 1, new byte[1], server1, 1, 100, 1, integer ->
Collections.emptyList());
+
+ // case3: send a single block through the overlapping-compression path and expect the
+ // returned future to complete with 0.
+ // NOTE(review): the block is named staleBlock1, and the expected 0 depends on
+ // DataPusherTest.FakedShuffleWriteClient, which is not visible here — confirm the intent.
+ // Also, assertEquals below passes (actual, expected); JUnit's order is (expected, actual).
+ AddBlockEvent event = new AddBlockEvent(taskId,
Arrays.asList(staleBlock1));
+ CompletableFuture<Long> f1 = pusher.send(event);
+ assertEquals(f1.join(), 0);
+ }
+}