Copilot commented on code in PR #8399:
URL: https://github.com/apache/hadoop/pull/8399#discussion_r3038485665
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java:
##########
@@ -251,8 +249,8 @@ protected void finalize() {
}
Review Comment:
`finalize()` currently calls `reset()` rather than `end()`. With the
zstd-jni backend this risks leaking the native decompression context if callers
forget to call `end()`, because `reset()` does not release the underlying
native resources. If `finalize()` is retained, it should ensure the context is
closed (and ideally delegate to `super.finalize()` as appropriate);
alternatively, consider removing `finalize()` and relying on explicit
`end()`/try-with-resources patterns if that matches project conventions.
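
To illustrate the distinction drawn above, here is a minimal sketch, not the PR's code. `FakeNativeContext` and `SketchDecompressor` are hypothetical stand-ins; no zstd-jni API is used or implied. It only shows the intended contract: `reset()` keeps the context alive for reuse, while `end()` releases it and is safe to call more than once.

```java
// Hypothetical stand-in for a native decompression context (NOT the zstd-jni API).
final class FakeNativeContext implements AutoCloseable {
  private boolean open = true;

  boolean isOpen() {
    return open;
  }

  @Override
  public void close() {
    open = false; // a real context would free native memory here
  }
}

// Hypothetical decompressor showing reset() vs. end() semantics.
final class SketchDecompressor implements AutoCloseable {
  private FakeNativeContext ctx = new FakeNativeContext();
  private boolean finished;

  // reset() clears per-stream state but keeps the context for reuse.
  void reset() {
    finished = false;
  }

  // end() releases the context; calling it twice is harmless.
  void end() {
    if (ctx != null) {
      ctx.close();
      ctx = null;
    }
  }

  boolean isReleased() {
    return ctx == null;
  }

  // Allows try-with-resources instead of relying on finalize().
  @Override
  public void close() {
    end();
  }

  public static void main(String[] args) {
    try (SketchDecompressor d = new SketchDecompressor()) {
      d.reset();
      System.out.println("released after reset: " + d.isReleased());
    }
  }
}
```

With this shape, callers that use try-with-resources never depend on finalization, which is the alternative the comment suggests if it matches project conventions.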
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java:
##########
@@ -262,35 +260,24 @@ private int populateUncompressedBuffer(byte[] b, int off, int len, int n) {
return n;
}
- private native static void initIDs();
- private native static long create();
- private native static void init(long stream);
- private native int inflateBytesDirect(ByteBuffer src, int srcOffset,
- int srcLen, ByteBuffer dst, int dstOffset, int dstLen);
- private native static void free(long strm);
- private native static int getStreamSize();
-
- int inflateDirect(ByteBuffer src, ByteBuffer dst) throws IOException {
- assert
- (this instanceof ZStandardDecompressor.ZStandardDirectDecompressor);
-
- int originalPosition = dst.position();
- int n = inflateBytesDirect(
- src, src.position(), src.limit(), dst, dst.position(),
- dst.limit()
- );
- dst.position(originalPosition + n);
- if (bytesInCompressedBuffer > 0) {
- src.position(compressedDirectBufOff);
+ int inflateDirect(ByteBuffer src, ByteBuffer dst) {
+ assert (this instanceof ZStandardDecompressor.ZStandardDirectDecompressor);
+
+ // zstd-jni: use streaming decompression directly on the provided buffers
+ int origDstPos = dst.position();
+ boolean done = zstdJniCtx.decompressDirectByteBufferStream(dst, src);
+ if (done) {
+ finished = true;
+ remaining = 0;
} else {
- src.position(src.limit());
+ remaining = src.limit() - src.position();
}
Review Comment:
`inflateDirect()` sets `finished=true` and `remaining=0` whenever `done` is
true. If the source buffer contains concatenated frames or trailing bytes
(i.e., `src.position() < src.limit()` after the call), this will misreport
`getRemaining()` and can incorrectly signal end-of-stream while input bytes are
still present. Align this logic with the non-direct `decompress()` path:
compute `remaining` from `src.limit() - src.position()` and only set `finished`
when `done` and `remaining==0` (or otherwise explicitly document/handle
concatenated frames).
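
The suggested rule can be sketched as follows. This is an illustration under assumptions, not the PR's implementation: `DirectInflateState` and its `update` method are hypothetical names, and `frameDone` stands for whatever the streaming call reports. `remaining` is always derived from the source buffer, and `finished` is set only when the frame is done and no input bytes are left.

```java
import java.nio.ByteBuffer;

// Hypothetical holder for the two fields discussed in the comment.
final class DirectInflateState {
  boolean finished;
  int remaining;

  // frameDone: assumed to be the "frame complete" result of the streaming call.
  void update(ByteBuffer src, boolean frameDone) {
    // Always report what is actually left in the source buffer.
    remaining = src.limit() - src.position();
    // Signal end-of-stream only if nothing follows the completed frame.
    finished = frameDone && remaining == 0;
  }

  public static void main(String[] args) {
    ByteBuffer src = ByteBuffer.allocate(10);
    src.position(6); // pretend the first frame ended after 6 bytes
    DirectInflateState state = new DirectInflateState();
    state.update(src, true);
    System.out.println("finished=" + state.finished + " remaining=" + state.remaining);
  }
}
```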
##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java:
##########
@@ -210,30 +196,49 @@ public int compress(byte[] b, int off, int len) throws IOException {
return n;
}
- // Re-initialize the output direct buffer
- compressedDirectBuf.rewind();
- compressedDirectBuf.limit(directBufferSize);
+ boolean allConsumed = (uncompressedDirectBufLen - uncompressedDirectBufOff <= 0);
+ // Use END only when finish=true, no more user data, and all direct-buffer
+ // data consumed (mirrors ZSTD_endStream); otherwise FLUSH (mirrors
+ // ZSTD_compressStream + ZSTD_flushStream).
+ boolean shouldEnd = finish && userBufLen == 0 && allConsumed;
+ if (!allConsumed || shouldEnd) {
+ // Re-initialize the output direct buffer
Review Comment:
In `compress()`, when `allConsumed` is true and `shouldEnd` is false, the
method skips calling the zstd streaming API and returns 0. This can break
streaming output draining: zstd may still have pending compressed bytes to
flush even when no more input is available (especially with small output
buffers), and the old native implementation would continue flushing in this
state. Consider always invoking the zstd stream with `FLUSH` whenever there is
no remaining `compressedDirectBuf` output, even if there is currently no input,
and use the returned `done`/state to decide when it’s safe to report
`needsInput()` or produce 0 bytes.
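
The draining concern can be shown with a small sketch. Everything here is hypothetical: `FakePendingStream` is a stand-in that merely copies bytes (it does not compress and is not the zstd-jni API), and `DrainSketch.drainAll` is an illustrative helper. The point is only that a stream may hold pending output after all input has been accepted, so the flush call has to be repeated until it reports completion, even though no new input arrives.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

// Hypothetical stream with internal pending output; "compression" is an identity copy.
final class FakePendingStream {
  private final ArrayDeque<Byte> pending = new ArrayDeque<>();

  void accept(byte[] input) {
    for (byte b : input) {
      pending.add(b);
    }
  }

  // Writes as many pending bytes as fit into dst; returns true once nothing is pending.
  boolean flush(ByteBuffer dst) {
    while (dst.hasRemaining() && !pending.isEmpty()) {
      dst.put(pending.poll());
    }
    return pending.isEmpty();
  }
}

final class DrainSketch {
  // Keeps flushing into a small output buffer until the stream reports it is drained.
  static byte[] drainAll(FakePendingStream stream, int outBufSize) {
    if (outBufSize <= 0) {
      throw new IllegalArgumentException("outBufSize must be positive");
    }
    ByteBuffer out = ByteBuffer.allocate(outBufSize);
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    boolean done;
    do {
      out.clear();
      done = stream.flush(out); // called even though no new input is supplied
      out.flip();
      sink.write(out.array(), 0, out.limit());
    } while (!done);
    return sink.toByteArray();
  }

  public static void main(String[] args) {
    FakePendingStream stream = new FakePendingStream();
    stream.accept(new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10});
    System.out.println("drained bytes: " + drainAll(stream, 4).length);
  }
}
```

If the loop stopped as soon as the input was consumed, the bytes that did not fit into the first small output buffer would never be emitted, which is the failure mode the comment describes.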
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]