[ 
https://issues.apache.org/jira/browse/HADOOP-19874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18077601#comment-18077601
 ] 

ASF GitHub Bot commented on HADOOP-19874:
-----------------------------------------

aajisaka commented on code in PR #8461:
URL: https://github.com/apache/hadoop/pull/8461#discussion_r3172627100


##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java:
##########
@@ -209,7 +229,16 @@ public int compress(byte[] b, int off, int len) throws IOException {
     compressedDirectBuf.position(0);
     compressedDirectBuf.limit(directBufferSize);
 
-    EndDirective endOp = shouldEnd ? EndDirective.END : EndDirective.FLUSH;
+    // CONTINUE should be used for non-end case, to support multi-threaded:
+    // 1. CONTINUE + workers ≥ 1: non-blocking. The call copies as much input
+    //      as it can into a job, dispatches to workers, drains whatever output
+    //      is ready, and returns. Multiple jobs can be in flight in parallel.
+    // 2. FLUSH + workers ≥ 1: multi-threaded compression will block to flush
+    //      as much output as possible. The call won't return until every queued
+    //      job has finished and its output has been drained to the dst buffer.
+    // 3. END + workers ≥ 1: same as FLUSH but also closes the frame. Same
+    //      blocking behavior.
+    EndDirective endOp = shouldEnd ? EndDirective.END : EndDirective.CONTINUE;

Review Comment:
   Thanks. I read https://facebook.github.io/zstd/zstd_manual.html. It makes sense to use `CONTINUE` rather than `FLUSH`, given this note on `FLUSH`:
   
   > note : multithreaded compression will block to flush as much output as possible.
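
For readers following the thread, here is a minimal sketch of the directive choice discussed above. It is not the Hadoop implementation: the class `EndDirectiveSketch`, its nested enum, and the helper `blocksWithWorkers` are hypothetical names, and the blocking rule simply restates the zstd manual note for `FLUSH` and `END` when at least one worker is configured.

```java
public class EndDirectiveSketch {
  /** Hypothetical stand-in for the three zstd end directives. */
  enum EndDirective { CONTINUE, FLUSH, END }

  /** Mirrors the patched line: END on the final call, CONTINUE otherwise. */
  static EndDirective choose(boolean shouldEnd) {
    return shouldEnd ? EndDirective.END : EndDirective.CONTINUE;
  }

  /**
   * Restates the manual's note: with workers >= 1, FLUSH and END block until
   * output has been flushed, while CONTINUE returns without waiting.
   */
  static boolean blocksWithWorkers(EndDirective directive, int workers) {
    return workers >= 1 && directive != EndDirective.CONTINUE;
  }

  public static void main(String[] args) {
    int workers = 4; // illustrative value only
    for (boolean shouldEnd : new boolean[] {false, true}) {
      EndDirective d = choose(shouldEnd);
      System.out.println("shouldEnd=" + shouldEnd + " directive=" + d
          + " blocks=" + blocksWithWorkers(d, workers));
    }
  }
}
```

Under these assumptions, only the final call of a stream blocks; every intermediate call can hand input to the workers and return.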



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java:
##########
@@ -74,12 +75,30 @@ public static int getRecommendedBufferSize() {
    * @param bufferSize bufferSize.
    */
   public ZStandardCompressor(int level, int bufferSize) {
-    this(level, bufferSize, bufferSize);
+    this(level,
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_WORKERS_DEFAULT,
+        bufferSize, bufferSize);
+  }
+
+  /**
+   * Creates a new compressor with the supplied compression level and number
+   * of compression worker threads. Compressed data will be generated in
+   * ZStandard format.
+   *
+   * @param level the zstd compression level
+   * @param workers number of zstd compression worker threads (0 disables
+   *                multi-threaded compression)
+   * @param bufferSize the input/output direct buffer size
+   */
+  public ZStandardCompressor(int level, int workers, int bufferSize) {

Review Comment:
   `ZStandardCompressor(int level, int inputBufferSize, int outputBufferSize)` is not public, so it is safe to reuse its `(int, int, int)` signature for the new public constructor.
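
The constructor arrangement under review can be sketched roughly as follows. This is an illustration only: `ZstdCompressorSketch` and `WORKERS_DEFAULT` are hypothetical names, the default of 0 is an assumption standing in for `IO_COMPRESSION_CODEC_ZSTD_WORKERS_DEFAULT`, and the delegation of the new three-argument constructor to a four-argument one is inferred from the hunk rather than shown in it.

```java
public class ZstdCompressorSketch {
  // Assumed default; the real value lives in CommonConfigurationKeys.
  static final int WORKERS_DEFAULT = 0;

  final int level;
  final int workers;
  final int inputBufferSize;
  final int outputBufferSize;

  /** Existing public entry point: now fills in the default worker count. */
  public ZstdCompressorSketch(int level, int bufferSize) {
    this(level, WORKERS_DEFAULT, bufferSize, bufferSize);
  }

  /**
   * New public (int, int, int) constructor. The signature is free because the
   * earlier (level, inputBufferSize, outputBufferSize) form was never public.
   */
  public ZstdCompressorSketch(int level, int workers, int bufferSize) {
    this(level, workers, bufferSize, bufferSize);
  }

  /** Non-public constructor that takes every setting explicitly. */
  ZstdCompressorSketch(int level, int workers,
      int inputBufferSize, int outputBufferSize) {
    this.level = level;
    this.workers = workers;
    this.inputBufferSize = inputBufferSize;
    this.outputBufferSize = outputBufferSize;
  }

  public static void main(String[] args) {
    ZstdCompressorSketch c = new ZstdCompressorSketch(3, 2, 64 * 1024);
    System.out.println("level=" + c.level + " workers=" + c.workers
        + " in=" + c.inputBufferSize + " out=" + c.outputBufferSize);
  }
}
```

The point of the sketch is that a caller writing `new ZstdCompressorSketch(3, 2, 65536)` now gets two workers and a single buffer size, which would only be a silent behavior change if outside code had been able to call the old three-int form.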





> ZStandardCodec supports multi-threaded compression
> --------------------------------------------------
>
>                 Key: HADOOP-19874
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19874
>             Project: Hadoop Common
>          Issue Type: Improvement
>            Reporter: Cheng Pan
>            Priority: Major
>              Labels: pull-request-available
>




--
This message was sent by Atlassian Jira
(v8.20.10#820010)
