[
https://issues.apache.org/jira/browse/HADOOP-19874?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18077601#comment-18077601
]
ASF GitHub Bot commented on HADOOP-19874:
-----------------------------------------
aajisaka commented on code in PR #8461:
URL: https://github.com/apache/hadoop/pull/8461#discussion_r3172627100
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java:
##########
@@ -209,7 +229,16 @@ public int compress(byte[] b, int off, int len) throws IOException {
compressedDirectBuf.position(0);
compressedDirectBuf.limit(directBufferSize);
- EndDirective endOp = shouldEnd ? EndDirective.END : EndDirective.FLUSH;
+ // CONTINUE should be used for non-end case, to support multi-threaded:
+ // 1. CONTINUE + workers ≥ 1: non-blocking. The call copies as much input
+ // as it can into a job, dispatches to workers, drains whatever output
+ // is ready, and returns. Multiple jobs can be in flight in parallel.
+ // 2. FLUSH + workers ≥ 1: multi-threaded compression will block to flush
+ // as much output as possible. The call won't return until every queued
+ // job has finished and its output has been drained to the dst buffer.
+ // 3. END + workers ≥ 1: same as FLUSH but also closes the frame. Same
+ // blocking behavior.
+ EndDirective endOp = shouldEnd ? EndDirective.END : EndDirective.CONTINUE;
Review Comment:
Thanks. I read https://facebook.github.io/zstd/zstd_manual.html, and it makes
sense to use `CONTINUE` rather than `FLUSH`, given this note in the manual:
> note : multithreaded compression will block to flush as much output as possible.
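The directive choice discussed above can be sketched in isolation. This is a hypothetical, self-contained model, not Hadoop's `ZStandardCompressor` code: the nested `EndDirective` enum only mirrors the native `ZSTD_e_continue`, `ZSTD_e_flush` and `ZSTD_e_end` values from zstd.h (0, 1, 2), and `callComplete` is an illustrative helper encoding the manual's rule that, for FLUSH and END, a return value of 0 from `ZSTD_compressStream2()` means the internal buffers are fully flushed.

```java
/**
 * Hypothetical sketch, not Hadoop's actual class: mirrors the native
 * ZSTD_EndDirective values and the directive choice made in the PR.
 */
public class EndDirectiveSketch {

    /** Same values as ZSTD_e_continue, ZSTD_e_flush, ZSTD_e_end in zstd.h. */
    enum EndDirective {
        CONTINUE(0), FLUSH(1), END(2);

        final int nativeValue;

        EndDirective(int nativeValue) {
            this.nativeValue = nativeValue;
        }
    }

    /** Non-final calls use CONTINUE so multi-threaded workers are not forced to block. */
    static EndDirective chooseDirective(boolean shouldEnd) {
        return shouldEnd ? EndDirective.END : EndDirective.CONTINUE;
    }

    /**
     * Hypothetical helper: whether the caller may stop calling
     * ZSTD_compressStream2() for this directive. "remaining" stands for the
     * function's return value (minimum bytes still held in internal buffers).
     */
    static boolean callComplete(EndDirective op, boolean inputConsumed, long remaining) {
        switch (op) {
            case CONTINUE:
                // Only input consumption matters; compressed output may still be buffered.
                return inputConsumed;
            case FLUSH:
            case END:
                // Keep calling until every buffered byte is flushed (END also closes the frame).
                return inputConsumed && remaining == 0;
            default:
                throw new IllegalArgumentException(String.valueOf(op));
        }
    }

    public static void main(String[] args) {
        EndDirective midStream = chooseDirective(false);
        EndDirective last = chooseDirective(true);
        System.out.println(midStream + " " + midStream.nativeValue); // prints "CONTINUE 0"
        System.out.println(last + " " + last.nativeValue);           // prints "END 2"
        System.out.println(callComplete(EndDirective.CONTINUE, true, 42)); // prints "true"
        System.out.println(callComplete(EndDirective.END, true, 42));      // prints "false"
    }
}
```

The point of the model is the asymmetry: with CONTINUE a call can return while output is still buffered inside the workers, whereas FLUSH and END are only finished once that buffer has been drained.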
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java:
##########
@@ -74,12 +75,30 @@ public static int getRecommendedBufferSize() {
* @param bufferSize bufferSize.
*/
public ZStandardCompressor(int level, int bufferSize) {
- this(level, bufferSize, bufferSize);
+ this(level,
+ CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_WORKERS_DEFAULT,
+ bufferSize, bufferSize);
+ }
+
+ /**
+ * Creates a new compressor with the supplied compression level and number
+ * of compression worker threads. Compressed data will be generated in
+ * ZStandard format.
+ *
+ * @param level the zstd compression level
+ * @param workers number of zstd compression worker threads (0 disables
+ * multi-threaded compression)
+ * @param bufferSize the input/output direct buffer size
+ */
+ public ZStandardCompressor(int level, int workers, int bufferSize) {
Review Comment:
`ZStandardCompressor(int level, int inputBufferSize, int outputBufferSize)` is
not public, so it is safe to reuse this `(int, int, int)` signature.
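The constructor change can be sketched as follows. `CompressorCtorSketch` and `DEFAULT_WORKERS` are hypothetical stand-ins (the real default comes from `CommonConfigurationKeys.IO_COMPRESSION_CODEC_ZSTD_WORKERS_DEFAULT`, whose value is not shown here), and the negative-worker check is an assumption added for illustration. Java does not allow two constructors with the same `(int, int, int)` parameter list, so the old three-int constructor has to change shape; because it was not public, reusing that signature for `(level, workers, bufferSize)` cannot silently change the meaning of any external call.

```java
/**
 * Hypothetical sketch (not Hadoop's code) of the constructor chain discussed
 * in the review: a public (level, workers, bufferSize) constructor and a
 * non-public four-argument constructor that does the real work.
 */
public class CompressorCtorSketch {

    // Assumption: stand-in for IO_COMPRESSION_CODEC_ZSTD_WORKERS_DEFAULT;
    // the real default is defined in CommonConfigurationKeys.
    static final int DEFAULT_WORKERS = 0;

    final int level;
    final int workers;
    final int inputBufferSize;
    final int outputBufferSize;

    /** Existing public constructor: keeps its behavior by passing the default worker count. */
    public CompressorCtorSketch(int level, int bufferSize) {
        this(level, DEFAULT_WORKERS, bufferSize, bufferSize);
    }

    /** New public constructor; 0 workers disables multi-threaded compression. */
    public CompressorCtorSketch(int level, int workers, int bufferSize) {
        this(level, workers, bufferSize, bufferSize);
    }

    /**
     * Non-public constructor that takes over from the old
     * (level, inputBufferSize, outputBufferSize) one. Since that one was not
     * public, no external caller can have its second argument reinterpreted
     * as a worker count.
     */
    CompressorCtorSketch(int level, int workers, int inputBufferSize, int outputBufferSize) {
        // Assumption for illustration: reject negative worker counts up front.
        if (workers < 0) {
            throw new IllegalArgumentException("workers must be >= 0: " + workers);
        }
        this.level = level;
        this.workers = workers;
        this.inputBufferSize = inputBufferSize;
        this.outputBufferSize = outputBufferSize;
    }

    public static void main(String[] args) {
        CompressorCtorSketch single = new CompressorCtorSketch(3, 64 * 1024);
        CompressorCtorSketch multi = new CompressorCtorSketch(3, 4, 64 * 1024);
        System.out.println(single.workers + " " + multi.workers); // prints "0 4"
    }
}
```

With this layout the existing two-argument constructor keeps its previous behavior by delegating with the default worker count, and only callers that opt in through the new three-argument constructor get worker threads.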
> ZStandardCodec supports multi-threaded compression
> --------------------------------------------------
>
> Key: HADOOP-19874
> URL: https://issues.apache.org/jira/browse/HADOOP-19874
> Project: Hadoop Common
> Issue Type: Improvement
> Reporter: Cheng Pan
> Priority: Major
> Labels: pull-request-available
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)