[ 
https://issues.apache.org/jira/browse/HADOOP-19855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18071366#comment-18071366
 ] 

ASF GitHub Bot commented on HADOOP-19855:
-----------------------------------------

Copilot commented on code in PR #8399:
URL: https://github.com/apache/hadoop/pull/8399#discussion_r3038485665


##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java:
##########
@@ -251,8 +249,8 @@ protected void finalize() {
   }

Review Comment:
   `finalize()` currently calls `reset()` rather than `end()`. With the 
zstd-jni backend this risks leaking the native decompression context if callers 
forget to call `end()`, because `reset()` does not release the underlying 
native resources. If `finalize()` is retained, it should ensure the context is 
closed (and ideally delegate to `super.finalize()` as appropriate); 
alternatively, consider removing `finalize()` and relying on explicit 
`end()`/try-with-resources patterns if that matches project conventions.
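
A minimal sketch of the explicit-release alternative, under stated assumptions: `FakeNativeContext` and `SketchDecompressor` are hypothetical stand-ins invented for illustration, not the zstd-jni API or the actual Hadoop class, and implementing `AutoCloseable` is an assumption about what project conventions allow. The point is only that `reset()` keeps the context alive while an idempotent `end()` releases it exactly once.

```java
/** Hypothetical stand-in for a native zstd context; NOT the zstd-jni API. */
final class FakeNativeContext {
  private boolean closed;

  /** Clears stream state but keeps the (simulated) native memory allocated. */
  void reset() {
    // Intentionally does not touch 'closed'.
  }

  /** Releases the (simulated) native memory. */
  void close() {
    closed = true;
  }

  boolean isClosed() {
    return closed;
  }
}

/** Sketch of a decompressor with an idempotent end(), usable in try-with-resources. */
final class SketchDecompressor implements AutoCloseable {
  private final FakeNativeContext ctx = new FakeNativeContext();
  private boolean ended;

  /** Prepares for a new stream; does not release native resources. */
  synchronized void reset() {
    ctx.reset();
  }

  /** Releases the context; safe to call more than once. */
  synchronized void end() {
    if (!ended) {
      ended = true;
      ctx.close();
    }
  }

  @Override
  public void close() {
    end();
  }

  synchronized boolean isReleased() {
    return ctx.isClosed();
  }
}
```

With this shape, `try (SketchDecompressor d = new SketchDecompressor()) { ... }` releases the context on exit without depending on finalization.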



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java:
##########
@@ -262,35 +260,24 @@ private int populateUncompressedBuffer(byte[] b, int off, int len, int n) {
     return n;
   }
 
-  private native static void initIDs();
-  private native static long create();
-  private native static void init(long stream);
-  private native int inflateBytesDirect(ByteBuffer src, int srcOffset,
-      int srcLen, ByteBuffer dst, int dstOffset, int dstLen);
-  private native static void free(long strm);
-  private native static int getStreamSize();
-
-  int inflateDirect(ByteBuffer src, ByteBuffer dst) throws IOException {
-    assert
-        (this instanceof ZStandardDecompressor.ZStandardDirectDecompressor);
-
-    int originalPosition = dst.position();
-    int n = inflateBytesDirect(
-        src, src.position(), src.limit(), dst, dst.position(),
-        dst.limit()
-    );
-    dst.position(originalPosition + n);
-    if (bytesInCompressedBuffer > 0) {
-      src.position(compressedDirectBufOff);
+  int inflateDirect(ByteBuffer src, ByteBuffer dst) {
+    assert (this instanceof ZStandardDecompressor.ZStandardDirectDecompressor);
+
+    // zstd-jni: use streaming decompression directly on the provided buffers
+    int origDstPos = dst.position();
+    boolean done = zstdJniCtx.decompressDirectByteBufferStream(dst, src);
+    if (done) {
+      finished = true;
+      remaining = 0;
     } else {
-      src.position(src.limit());
+      remaining = src.limit() - src.position();
     }

Review Comment:
   `inflateDirect()` sets `finished=true` and `remaining=0` whenever `done` is 
true. If the source buffer contains concatenated frames or trailing bytes 
(i.e., `src.position() < src.limit()` after the call), this will misreport 
`getRemaining()` and can incorrectly signal end-of-stream while input bytes are 
still present. Align this logic with the non-direct `decompress()` path: 
compute `remaining` from `src.limit() - src.position()` and only set `finished` 
when `done` and `remaining==0` (or otherwise explicitly document/handle 
concatenated frames).
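
The suggested state update could look roughly like the sketch below. `DirectInflateState` is a hypothetical helper invented for illustration, and `done` stands for whatever end-of-frame flag the streaming call reports; the real fields live on `ZStandardDecompressor`.

```java
import java.nio.ByteBuffer;

/** Hypothetical holder for the two fields discussed in the review comment. */
final class DirectInflateState {
  boolean finished;
  int remaining;

  /**
   * Updates state after one streaming call.
   *
   * @param done assumed to be true when the decoder reports a completed frame
   * @param src  source buffer, positioned just past the bytes consumed so far
   */
  void update(boolean done, ByteBuffer src) {
    // Always derive 'remaining' from the buffer so trailing bytes are reported.
    remaining = src.limit() - src.position();
    // Signal end-of-stream only when the frame is done AND no input is left.
    finished = done && remaining == 0;
  }
}
```

With concatenated frames, the first frame completing leaves `remaining > 0`, so `finished` stays false and the caller can keep decoding.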



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java:
##########
@@ -210,30 +196,49 @@ public int compress(byte[] b, int off, int len) throws IOException {
       return n;
     }
 
-    // Re-initialize the output direct buffer
-    compressedDirectBuf.rewind();
-    compressedDirectBuf.limit(directBufferSize);
+    boolean allConsumed = (uncompressedDirectBufLen - uncompressedDirectBufOff <= 0);
+    // Use END only when finish=true, no more user data, and all direct-buffer
+    // data consumed (mirrors ZSTD_endStream); otherwise FLUSH (mirrors
+    // ZSTD_compressStream + ZSTD_flushStream).
+    boolean shouldEnd = finish && userBufLen == 0 && allConsumed;
+    if (!allConsumed || shouldEnd) {
+      // Re-initialize the output direct buffer

Review Comment:
   In `compress()`, when `allConsumed` is true and `shouldEnd` is false, the 
method skips calling the zstd streaming API and returns 0. This can break 
streaming output draining: zstd may still have pending compressed bytes to 
flush even when no more input is available (especially with small output 
buffers), and the old native implementation would continue flushing in this 
state. Consider always invoking the zstd stream with `FLUSH` whenever there is 
no remaining `compressedDirectBuf` output, even if there is currently no input, 
and use the returned `done`/state to decide when it’s safe to report 
`needsInput()` or produce 0 bytes.
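
To illustrate the draining concern without assuming anything about the zstd-jni API, the sketch below swaps in the JDK's `java.util.zip.Deflater` as a stand-in streaming compressor. `DrainSketch` and `compressAndFlush` are hypothetical names. The pattern is the same one the comment asks for: keep calling the stream with a flush mode while the output buffer comes back full, even though all input has already been consumed.

```java
import java.io.ByteArrayOutputStream;
import java.util.zip.Deflater;

/** Stand-in example using java.util.zip.Deflater, NOT zstd-jni. */
final class DrainSketch {

  /**
   * Compresses 'input' through a deliberately small output buffer and
   * drains all pending bytes with SYNC_FLUSH.
   */
  static byte[] compressAndFlush(byte[] input, int outBufSize) {
    Deflater deflater = new Deflater();
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    byte[] out = new byte[outBufSize];
    try {
      deflater.setInput(input);
      int n;
      do {
        // The compressor may hold pending output even after the input is
        // fully consumed, so the loop is driven by the output, not the input.
        n = deflater.deflate(out, 0, out.length, Deflater.SYNC_FLUSH);
        sink.write(out, 0, n);
        // A completely filled buffer means more output may still be pending.
      } while (n == out.length);
    } finally {
      deflater.end(); // release native resources
    }
    return sink.toByteArray();
  }
}
```

A loop that stopped as soon as the input was consumed would truncate the output whenever the output buffer is smaller than the pending compressed data, which is the failure mode described above.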





> Use zstd-jni in ZStandardCodec
> ------------------------------
>
>                 Key: HADOOP-19855
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19855
>             Project: Hadoop Common
>          Issue Type: Improvement
>          Components: compress
>    Affects Versions: 3.5.0
>            Reporter: Cheng Pan
>            Priority: Major
>              Labels: pull-request-available
>
> In Hadoop, the zstd codec relies on native libraries, which has several 
> disadvantages:
>  * It requires native *libhadoop* and *libzstd* to be installed in the system 
> {*}LD_LIBRARY_PATH{*}, and they have to be installed separately on each 
> cluster node, container image, or local test environment, which adds 
> significant deployment complexity. In some environments, it requires 
> compiling the native libraries from source, which is non-trivial. This 
> approach is also platform dependent; a binary may not work on a different 
> platform, so it requires recompilation.
>  * It requires extra configuration of *java.library.path* to load the 
> native libraries, which results in higher application deployment and 
> maintenance costs for users.
> Projects such as *Spark* and *Parquet* use 
> [zstd-jni|https://github.com/luben/zstd-jni], which is a JNI-based 
> implementation. It bundles native binaries for Linux, Mac, and IBM in the jar 
> file, and it can automatically load the native binaries into the JVM from the 
> jar without any setup.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
