[
https://issues.apache.org/jira/browse/HADOOP-19855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18071366#comment-18071366
]
ASF GitHub Bot commented on HADOOP-19855:
-----------------------------------------
Copilot commented on code in PR #8399:
URL: https://github.com/apache/hadoop/pull/8399#discussion_r3038485665
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java:
##########
@@ -251,8 +249,8 @@ protected void finalize() {
}
Review Comment:
`finalize()` currently calls `reset()` rather than `end()`. With the
zstd-jni backend this risks leaking the native decompression context if callers
forget to call `end()`, because `reset()` does not release the underlying
native resources. If `finalize()` is retained, it should ensure the context is
closed (and ideally delegate to `super.finalize()` as appropriate);
alternatively, consider removing `finalize()` and relying on explicit
`end()`/try-with-resources patterns if that matches project conventions.
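
A minimal sketch of the release-on-finalize suggestion, under stated assumptions: `FakeNativeCtx` and `SketchDecompressor` are hypothetical stand-ins rather than Hadoop or zstd-jni classes, and implementing `AutoCloseable` is an assumption made only to show the try-with-resources option. The point it illustrates is that `end()` can be made idempotent, so a finalizer may call it safely after an explicit `end()`.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a native decompression context (not the zstd-jni API). */
class FakeNativeCtx implements AutoCloseable {
  private final AtomicBoolean closed = new AtomicBoolean(false);

  /** Clears per-stream state but keeps the (pretend) native allocation alive. */
  void reset() { }

  /** Releases the (pretend) native allocation; safe to call more than once. */
  @Override
  public void close() { closed.set(true); }

  boolean isClosed() { return closed.get(); }
}

/** Sketch: end() is idempotent, and finalize() releases the context instead of resetting it. */
class SketchDecompressor implements AutoCloseable {
  private final FakeNativeCtx ctx = new FakeNativeCtx();
  private boolean ended;

  FakeNativeCtx ctx() { return ctx; }  // exposed only for illustration

  public synchronized void reset() { ctx.reset(); }

  public synchronized void end() {
    if (!ended) {
      ended = true;
      ctx.close();
    }
  }

  @Override
  public void close() { end(); }

  @Override
  @SuppressWarnings({"deprecation", "removal"})
  protected void finalize() throws Throwable {
    try {
      end();               // release, rather than reset()
    } finally {
      super.finalize();
    }
  }
}
```

With this shape, `try (SketchDecompressor d = new SketchDecompressor()) { ... }` releases the context deterministically, and the finalizer remains only as a safety net.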
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java:
##########
@@ -262,35 +260,24 @@ private int populateUncompressedBuffer(byte[] b, int off,
int len, int n) {
return n;
}
- private native static void initIDs();
- private native static long create();
- private native static void init(long stream);
- private native int inflateBytesDirect(ByteBuffer src, int srcOffset,
- int srcLen, ByteBuffer dst, int dstOffset, int dstLen);
- private native static void free(long strm);
- private native static int getStreamSize();
-
- int inflateDirect(ByteBuffer src, ByteBuffer dst) throws IOException {
- assert
- (this instanceof ZStandardDecompressor.ZStandardDirectDecompressor);
-
- int originalPosition = dst.position();
- int n = inflateBytesDirect(
- src, src.position(), src.limit(), dst, dst.position(),
- dst.limit()
- );
- dst.position(originalPosition + n);
- if (bytesInCompressedBuffer > 0) {
- src.position(compressedDirectBufOff);
+ int inflateDirect(ByteBuffer src, ByteBuffer dst) {
+ assert (this instanceof ZStandardDecompressor.ZStandardDirectDecompressor);
+
+ // zstd-jni: use streaming decompression directly on the provided buffers
+ int origDstPos = dst.position();
+ boolean done = zstdJniCtx.decompressDirectByteBufferStream(dst, src);
+ if (done) {
+ finished = true;
+ remaining = 0;
} else {
- src.position(src.limit());
+ remaining = src.limit() - src.position();
}
Review Comment:
`inflateDirect()` sets `finished=true` and `remaining=0` whenever `done` is
true. If the source buffer contains concatenated frames or trailing bytes
(i.e., `src.position() < src.limit()` after the call), this will misreport
`getRemaining()` and can incorrectly signal end-of-stream while input bytes are
still present. Align this logic with the non-direct `decompress()` path:
compute `remaining` from `src.limit() - src.position()` and only set `finished`
when `done` and `remaining==0` (or otherwise explicitly document/handle
concatenated frames).
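
The suggested bookkeeping can be sketched as follows. `DirectInflateState` is a hypothetical holder for the two fields, and `frameDone` stands in for the boolean returned by the streaming call; neither name comes from the PR. Whether trailing bytes should instead be treated as a new frame is a separate decision that this sketch does not settle.

```java
import java.nio.ByteBuffer;

/** Hypothetical holder for the two fields discussed in the review comment. */
class DirectInflateState {
  boolean finished;
  int remaining;

  /**
   * Updates the state after one streaming call.
   *
   * @param frameDone stand-in for the "done" flag returned by the streaming call
   * @param src       source buffer, with its position already advanced by that call
   */
  void update(boolean frameDone, ByteBuffer src) {
    // Always derive remaining from the buffer, never hard-code 0.
    remaining = src.limit() - src.position();
    // Only signal end-of-stream when the frame ended and no input is left.
    finished = frameDone && remaining == 0;
  }
}
```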
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java:
##########
@@ -210,30 +196,49 @@ public int compress(byte[] b, int off, int len) throws IOException {
return n;
}
- // Re-initialize the output direct buffer
- compressedDirectBuf.rewind();
- compressedDirectBuf.limit(directBufferSize);
+ boolean allConsumed = (uncompressedDirectBufLen - uncompressedDirectBufOff <= 0);
+ // Use END only when finish=true, no more user data, and all direct-buffer
+ // data consumed (mirrors ZSTD_endStream); otherwise FLUSH (mirrors
+ // ZSTD_compressStream + ZSTD_flushStream).
+ boolean shouldEnd = finish && userBufLen == 0 && allConsumed;
+ if (!allConsumed || shouldEnd) {
+ // Re-initialize the output direct buffer
Review Comment:
In `compress()`, when `allConsumed` is true and `shouldEnd` is false, the
method skips calling the zstd streaming API and returns 0. This can break
streaming output draining: zstd may still have pending compressed bytes to
flush even when no more input is available (especially with small output
buffers), and the old native implementation would continue flushing in this
state. Consider always invoking the zstd stream with `FLUSH` whenever there is
no remaining `compressedDirectBuf` output, even if there is currently no input,
and use the returned `done`/state to decide when it’s safe to report
`needsInput()` or produce 0 bytes.
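
One way to picture the suggested control flow is as a small decision function. This is a sketch under assumptions: `Step`, `CompressStepChooser`, and the parameter names are hypothetical, and the enum only mirrors the idea of flush/end directives rather than any real zstd-jni type. The case the comment flags (all input consumed, `finish` not set, no pending output) maps to `FLUSH` instead of returning 0 without calling the stream.

```java
/** Hypothetical step names; they mirror, but are not, the zstd stream directives. */
enum Step { DRAIN_OUTPUT, FLUSH, END }

class CompressStepChooser {
  /**
   * Chooses what compress() should do next.
   *
   * @param pendingOutput       bytes still waiting in the compressed direct buffer
   * @param finish              whether finish() has been called
   * @param userBufLen          bytes of user input not yet copied to the direct buffer
   * @param directBufUnconsumed bytes in the uncompressed direct buffer not yet consumed
   */
  static Step choose(int pendingOutput, boolean finish,
                     int userBufLen, int directBufUnconsumed) {
    if (pendingOutput > 0) {
      return Step.DRAIN_OUTPUT;   // hand out what is already compressed first
    }
    boolean allConsumed = directBufUnconsumed <= 0;
    boolean shouldEnd = finish && userBufLen == 0 && allConsumed;
    // Always call into the stream: END when truly finishing, otherwise FLUSH,
    // so that bytes buffered inside the encoder can still be drained.
    return shouldEnd ? Step.END : Step.FLUSH;
  }
}
```

The caller would then use the stream's own return value after a `FLUSH` to decide whether it is safe to report `needsInput()`.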
> Use zstd-jni in ZStandardCodec
> ------------------------------
>
> Key: HADOOP-19855
> URL: https://issues.apache.org/jira/browse/HADOOP-19855
> Project: Hadoop Common
> Issue Type: Improvement
> Components: compress
> Affects Versions: 3.5.0
> Reporter: Cheng Pan
> Priority: Major
> Labels: pull-request-available
>
> In Hadoop, the zstd codec relies on native libraries, which has several
> disadvantages:
> * It requires native *libhadoop* and *libzstd* to be installed on the system
> {*}LD_LIBRARY_PATH{*}, and they have to be installed separately on each
> cluster node, container image, and local test environment, which adds
> considerable deployment complexity. In some environments, the natives must be
> compiled from source, which is non-trivial. This approach is also
> platform-dependent: a binary built for one platform may not work on another,
> so recompilation is required.
> * It requires extra configuration of *java.library.path* to load the
> natives, which results in higher application deployment and maintenance costs
> for users.
> Projects such as *Spark* and *Parquet* use
> [zstd-jni|https://github.com/luben/zstd-jni], which is a JNI-based
> implementation. It bundles native binaries for Linux, Mac, and IBM in its jar
> file, and it can automatically load them into the JVM from the jar
> without any setup.