This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 47e5d17dd [#2362] feat(client): Add support of zstd parallel
compression (#2363)
47e5d17dd is described below
commit 47e5d17dd72bb007c6da14716def1bd56d60ffba
Author: Junfan Zhang <[email protected]>
AuthorDate: Sat Feb 8 15:05:22 2025 +0800
[#2362] feat(client): Add support of zstd parallel compression (#2363)
### What changes were proposed in this pull request?
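Add support for zstd parallel (multi-worker) compression, controlled by the new client option `rss.client.io.compression.zstd.workerNumber`. The default is -1, and worker threads are only configured when the value is greater than 0.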
### Why are the changes needed?
for #2362
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests
---------
Co-authored-by: Junfan Zhang <[email protected]>
---
.../apache/uniffle/common/compression/Codec.java | 3 +--
.../uniffle/common/compression/ZstdCodec.java | 22 +++++++++++++++++++---
.../uniffle/common/config/RssClientConf.java | 7 +++++++
docs/client_guide/client_guide.md | 1 +
4 files changed, 28 insertions(+), 5 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/compression/Codec.java
b/common/src/main/java/org/apache/uniffle/common/compression/Codec.java
index 72c69dc06..717a70caf 100644
--- a/common/src/main/java/org/apache/uniffle/common/compression/Codec.java
+++ b/common/src/main/java/org/apache/uniffle/common/compression/Codec.java
@@ -23,7 +23,6 @@ import java.util.Optional;
import org.apache.uniffle.common.config.RssConf;
import static org.apache.uniffle.common.config.RssClientConf.COMPRESSION_TYPE;
-import static
org.apache.uniffle.common.config.RssClientConf.ZSTD_COMPRESSION_LEVEL;
public abstract class Codec {
@@ -33,7 +32,7 @@ public abstract class Codec {
case NONE:
return Optional.empty();
case ZSTD:
- return
Optional.of(ZstdCodec.getInstance(rssConf.get(ZSTD_COMPRESSION_LEVEL)));
+ return Optional.of(ZstdCodec.getInstance(rssConf));
case SNAPPY:
return Optional.of(SnappyCodec.getInstance());
case NOOP:
diff --git
a/common/src/main/java/org/apache/uniffle/common/compression/ZstdCodec.java
b/common/src/main/java/org/apache/uniffle/common/compression/ZstdCodec.java
index ded47935f..1c15b9e0b 100644
--- a/common/src/main/java/org/apache/uniffle/common/compression/ZstdCodec.java
+++ b/common/src/main/java/org/apache/uniffle/common/compression/ZstdCodec.java
@@ -20,22 +20,29 @@ package org.apache.uniffle.common.compression;
import java.nio.ByteBuffer;
import com.github.luben.zstd.Zstd;
+import com.github.luben.zstd.ZstdCompressCtx;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.exception.RssException;
+import static
org.apache.uniffle.common.config.RssClientConf.ZSTD_COMPRESSION_LEVEL;
+import static
org.apache.uniffle.common.config.RssClientConf.ZSTD_COMPRESSION_WORKER_NUMBER;
+
public class ZstdCodec extends Codec {
private static final Logger LOGGER =
LoggerFactory.getLogger(ZstdCodec.class);
private int compressionLevel;
+ private int workerNumber;
private static class LazyHolder {
static final ZstdCodec INSTANCE = new ZstdCodec();
}
- public static ZstdCodec getInstance(int level) {
- LazyHolder.INSTANCE.compressionLevel = level;
+ public static ZstdCodec getInstance(RssConf conf) {
+ LazyHolder.INSTANCE.compressionLevel = conf.get(ZSTD_COMPRESSION_LEVEL);
+ LazyHolder.INSTANCE.workerNumber =
conf.get(ZSTD_COMPRESSION_WORKER_NUMBER);
return LazyHolder.INSTANCE;
}
@@ -69,7 +76,16 @@ public class ZstdCodec extends Codec {
@Override
public byte[] compress(byte[] src) {
- return Zstd.compress(src, compressionLevel);
+ ZstdCompressCtx ctx = new ZstdCompressCtx();
+ try {
+ ctx.setLevel(compressionLevel);
+ if (workerNumber > 0) {
+ ctx.setWorkers(workerNumber);
+ }
+ return ctx.compress(src);
+ } finally {
+ ctx.close();
+ }
}
@Override
diff --git
a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
index 63b4666d1..c366f1203 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
@@ -52,6 +52,13 @@ public class RssClientConf {
.defaultValue(3)
.withDescription("The zstd compression level, the default level is
3");
+ public static final ConfigOption<Integer> ZSTD_COMPRESSION_WORKER_NUMBER =
+ ConfigOptions.key("rss.client.io.compression.zstd.workerNumber")
+ .intType()
+ .defaultValue(-1)
+ .withDescription(
+ "Set the parallel compression worker number. This will not
enabled by default");
+
public static final ConfigOption<ShuffleDataDistributionType>
DATA_DISTRIBUTION_TYPE =
ConfigOptions.key("rss.client.shuffle.data.distribution.type")
.enumType(ShuffleDataDistributionType.class)
diff --git a/docs/client_guide/client_guide.md
b/docs/client_guide/client_guide.md
index 4092df454..c926deed1 100644
--- a/docs/client_guide/client_guide.md
+++ b/docs/client_guide/client_guide.md
@@ -49,6 +49,7 @@ The important configuration of client is listed as following.
These configuratio
| <client_type>.rss.client.assignment.shuffle.nodes.max | -1
| The number of required assignment shuffle
servers. If it is less than 0 or equals to 0 or greater than the coordinator's
config of "rss.coordinator.shuffle.nodes.max", it will use the size of
"rss.coordinator.shuffle.nodes.max" default
[...]
| <client_type>.rss.client.io.compression.codec | lz4
| The compression codec is used to compress the
shuffle data. Default codec is `lz4`. Other options are `ZSTD` and `SNAPPY`.
[...]
| <client_type>.rss.client.io.compression.zstd.level | 3
| The zstd compression level, the default level is
3
[...]
+| <client_type>.rss.client.io.compression.zstd.workerNumber | -1
| Set the zstd parallel compression worker number. Parallel
compression is not enabled by default
[...]
| <client_type>.rss.client.shuffle.data.distribution.type | NORMAL
| The type of partition shuffle data distribution,
including normal and local_order. The default value is normal. Now this config
is only valid in Spark3.x
[...]
| <client_type>.rss.estimate.task.concurrency.dynamic.factor | 1.0
| Between 0 and 1, used to estimate task
concurrency, when the client is spark, it represents how likely is this part of
the resource between spark.dynamicAllocation.minExecutors and
spark.dynamicAllocation.maxExecutors to be allocated, when the client is mr, it
represents how likely the resources of map and reduce are satisfied. Effective
when <client_type>.rss.estimate.server.assi [...]
| <client_type>.rss.estimate.server.assignment.enabled | false
| Support mr and spark, whether to enable
estimation of the number of ShuffleServers that need to be allocated based on
the number of concurrent tasks.
[...]