This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 47e5d17dd [#2362] feat(client): Add support of zstd parallel compression (#2363)
47e5d17dd is described below

commit 47e5d17dd72bb007c6da14716def1bd56d60ffba
Author: Junfan Zhang <[email protected]>
AuthorDate: Sat Feb 8 15:05:22 2025 +0800

    [#2362] feat(client): Add support of zstd parallel compression (#2363)
    
    ### What changes were proposed in this pull request?
    
    Add support for zstd parallel compression
    
    ### Why are the changes needed?
    
    for #2362
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests
    
    ---------
    
    Co-authored-by: Junfan Zhang <[email protected]>
---
 .../apache/uniffle/common/compression/Codec.java   |  3 +--
 .../uniffle/common/compression/ZstdCodec.java      | 22 +++++++++++++++++++---
 .../uniffle/common/config/RssClientConf.java       |  7 +++++++
 docs/client_guide/client_guide.md                  |  1 +
 4 files changed, 28 insertions(+), 5 deletions(-)

diff --git 
a/common/src/main/java/org/apache/uniffle/common/compression/Codec.java 
b/common/src/main/java/org/apache/uniffle/common/compression/Codec.java
index 72c69dc06..717a70caf 100644
--- a/common/src/main/java/org/apache/uniffle/common/compression/Codec.java
+++ b/common/src/main/java/org/apache/uniffle/common/compression/Codec.java
@@ -23,7 +23,6 @@ import java.util.Optional;
 import org.apache.uniffle.common.config.RssConf;
 
 import static org.apache.uniffle.common.config.RssClientConf.COMPRESSION_TYPE;
-import static 
org.apache.uniffle.common.config.RssClientConf.ZSTD_COMPRESSION_LEVEL;
 
 public abstract class Codec {
 
@@ -33,7 +32,7 @@ public abstract class Codec {
       case NONE:
         return Optional.empty();
       case ZSTD:
-        return 
Optional.of(ZstdCodec.getInstance(rssConf.get(ZSTD_COMPRESSION_LEVEL)));
+        return Optional.of(ZstdCodec.getInstance(rssConf));
       case SNAPPY:
         return Optional.of(SnappyCodec.getInstance());
       case NOOP:
diff --git 
a/common/src/main/java/org/apache/uniffle/common/compression/ZstdCodec.java 
b/common/src/main/java/org/apache/uniffle/common/compression/ZstdCodec.java
index ded47935f..1c15b9e0b 100644
--- a/common/src/main/java/org/apache/uniffle/common/compression/ZstdCodec.java
+++ b/common/src/main/java/org/apache/uniffle/common/compression/ZstdCodec.java
@@ -20,22 +20,29 @@ package org.apache.uniffle.common.compression;
 import java.nio.ByteBuffer;
 
 import com.github.luben.zstd.Zstd;
+import com.github.luben.zstd.ZstdCompressCtx;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.uniffle.common.config.RssConf;
 import org.apache.uniffle.common.exception.RssException;
 
+import static 
org.apache.uniffle.common.config.RssClientConf.ZSTD_COMPRESSION_LEVEL;
+import static 
org.apache.uniffle.common.config.RssClientConf.ZSTD_COMPRESSION_WORKER_NUMBER;
+
 public class ZstdCodec extends Codec {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ZstdCodec.class);
 
   private int compressionLevel;
+  private int workerNumber;
 
   private static class LazyHolder {
     static final ZstdCodec INSTANCE = new ZstdCodec();
   }
 
-  public static ZstdCodec getInstance(int level) {
-    LazyHolder.INSTANCE.compressionLevel = level;
+  public static ZstdCodec getInstance(RssConf conf) {
+    LazyHolder.INSTANCE.compressionLevel = conf.get(ZSTD_COMPRESSION_LEVEL);
+    LazyHolder.INSTANCE.workerNumber = 
conf.get(ZSTD_COMPRESSION_WORKER_NUMBER);
     return LazyHolder.INSTANCE;
   }
 
@@ -69,7 +76,16 @@ public class ZstdCodec extends Codec {
 
   @Override
   public byte[] compress(byte[] src) {
-    return Zstd.compress(src, compressionLevel);
+    ZstdCompressCtx ctx = new ZstdCompressCtx();
+    try {
+      ctx.setLevel(compressionLevel);
+      if (workerNumber > 0) {
+        ctx.setWorkers(workerNumber);
+      }
+      return ctx.compress(src);
+    } finally {
+      ctx.close();
+    }
   }
 
   @Override
diff --git 
a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java 
b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
index 63b4666d1..c366f1203 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
@@ -52,6 +52,13 @@ public class RssClientConf {
           .defaultValue(3)
           .withDescription("The zstd compression level, the default level is 
3");
 
+  public static final ConfigOption<Integer> ZSTD_COMPRESSION_WORKER_NUMBER =
+      ConfigOptions.key("rss.client.io.compression.zstd.workerNumber")
+          .intType()
+          .defaultValue(-1)
+          .withDescription(
+              "Set the parallel compression worker number. This will not 
enabled by default");
+
   public static final ConfigOption<ShuffleDataDistributionType> 
DATA_DISTRIBUTION_TYPE =
       ConfigOptions.key("rss.client.shuffle.data.distribution.type")
           .enumType(ShuffleDataDistributionType.class)
diff --git a/docs/client_guide/client_guide.md 
b/docs/client_guide/client_guide.md
index 4092df454..c926deed1 100644
--- a/docs/client_guide/client_guide.md
+++ b/docs/client_guide/client_guide.md
@@ -49,6 +49,7 @@ The important configuration of client is listed as following. 
These configuratio
 | <client_type>.rss.client.assignment.shuffle.nodes.max           | -1         
                            | The number of required assignment shuffle 
servers. If it is less than 0 or equals to 0 or greater than the coordinator's 
config of "rss.coordinator.shuffle.nodes.max", it will use the size of 
"rss.coordinator.shuffle.nodes.max" default                                     
                                                                                
                                [...]
 | <client_type>.rss.client.io.compression.codec                   | lz4        
                            | The compression codec is used to compress the 
shuffle data. Default codec is `lz4`. Other options are`ZSTD` and `SNAPPY`.     
                                                                                
                                                                                
                                                                                
                  [...]
 | <client_type>.rss.client.io.compression.zstd.level              | 3          
                            | The zstd compression level, the default level is 
3                                                                               
                                                                                
                                                                                
                                                                                
               [...]
+| <client_type>.rss.client.io.compression.zstd.workerNumber       | -1         
                            | Set the zstd parallel compression worker number. Parallel 
compression is disabled by default; it is only enabled when this value is greater than 0 
                                                                                
                                                                                
                                                                                
              [...]
 | <client_type>.rss.client.shuffle.data.distribution.type         | NORMAL     
                            | The type of partition shuffle data distribution, 
including normal and local_order. The default value is normal. Now this config 
is only valid in Spark3.x                                                       
                                                                                
                                                                                
                [...]
 | <client_type>.rss.estimate.task.concurrency.dynamic.factor      | 1.0        
                            | Between 0 and 1, used to estimate task 
concurrency, when the client is spark, it represents how likely is this part of 
the resource between spark.dynamicAllocation.minExecutors and 
spark.dynamicAllocation.maxExecutors to be allocated, when the client is mr, it 
represents how likely the resources of map and reduce are satisfied. Effective 
when <client_type>.rss.estimate.server.assi [...]
 | <client_type>.rss.estimate.server.assignment.enabled            | false      
                            | Support mr and spark, whether to enable 
estimation of the number of ShuffleServers that need to be allocated based on 
the number of concurrent tasks.                                                 
                                                                                
                                                                                
                          [...]

Reply via email to