Copilot commented on code in PR #8399:
URL: https://github.com/apache/hadoop/pull/8399#discussion_r3038485665


##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java:
##########
@@ -251,8 +249,8 @@ protected void finalize() {
   }

Review Comment:
   `finalize()` currently calls `reset()` rather than `end()`. With the 
zstd-jni backend this risks leaking the native decompression context if callers 
forget to call `end()`, because `reset()` does not release the underlying 
native resources. If `finalize()` is retained, it should ensure the context is 
closed (and ideally delegate to `super.finalize()` as appropriate); 
alternatively, consider removing `finalize()` and relying on explicit 
`end()`/try-with-resources patterns if that matches project conventions.
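
A minimal sketch of the explicit-release alternative, under stated assumptions: `FakeNativeContext` and `SketchDecompressor` are hypothetical stand-ins, not the zstd-jni or Hadoop classes. It only illustrates the distinction drawn above: `reset()` keeps the context alive, while `end()` releases it and can safely be called more than once.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a native decompression context (not the zstd-jni API). */
final class FakeNativeContext implements AutoCloseable {
  private final AtomicBoolean closed = new AtomicBoolean(false);

  /** Resets stream state but keeps the (pretend) native memory allocated. */
  void reset() {
    if (closed.get()) {
      throw new IllegalStateException("context already closed");
    }
  }

  /** Releases the (pretend) native memory. */
  @Override
  public void close() {
    closed.set(true);
  }

  boolean isClosed() {
    return closed.get();
  }
}

/** Hypothetical decompressor wrapper showing reset() versus end(). */
final class SketchDecompressor implements AutoCloseable {
  private FakeNativeContext ctx;

  SketchDecompressor(FakeNativeContext ctx) {
    this.ctx = ctx;
  }

  /** Prepares for reuse; does NOT release the context. */
  void reset() {
    if (ctx != null) {
      ctx.reset();
    }
  }

  /** Releases the context; idempotent so repeated calls are harmless. */
  void end() {
    if (ctx != null) {
      ctx.close();
      ctx = null;
    }
  }

  /** Lets callers use try-with-resources instead of relying on finalization. */
  @Override
  public void close() {
    end();
  }
}
```

With this shape, a caller that wraps the decompressor in try-with-resources releases the context deterministically, whether or not a finalizer ever runs.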



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardDecompressor.java:
##########
@@ -262,35 +260,24 @@ private int populateUncompressedBuffer(byte[] b, int off, int len, int n) {
     return n;
   }
 
-  private native static void initIDs();
-  private native static long create();
-  private native static void init(long stream);
-  private native int inflateBytesDirect(ByteBuffer src, int srcOffset,
-      int srcLen, ByteBuffer dst, int dstOffset, int dstLen);
-  private native static void free(long strm);
-  private native static int getStreamSize();
-
-  int inflateDirect(ByteBuffer src, ByteBuffer dst) throws IOException {
-    assert
-        (this instanceof ZStandardDecompressor.ZStandardDirectDecompressor);
-
-    int originalPosition = dst.position();
-    int n = inflateBytesDirect(
-        src, src.position(), src.limit(), dst, dst.position(),
-        dst.limit()
-    );
-    dst.position(originalPosition + n);
-    if (bytesInCompressedBuffer > 0) {
-      src.position(compressedDirectBufOff);
+  int inflateDirect(ByteBuffer src, ByteBuffer dst) {
+    assert (this instanceof ZStandardDecompressor.ZStandardDirectDecompressor);
+
+    // zstd-jni: use streaming decompression directly on the provided buffers
+    int origDstPos = dst.position();
+    boolean done = zstdJniCtx.decompressDirectByteBufferStream(dst, src);
+    if (done) {
+      finished = true;
+      remaining = 0;
     } else {
-      src.position(src.limit());
+      remaining = src.limit() - src.position();
     }

Review Comment:
   `inflateDirect()` sets `finished=true` and `remaining=0` whenever `done` is 
true. If the source buffer contains concatenated frames or trailing bytes 
(i.e., `src.position() < src.limit()` after the call), this will misreport 
`getRemaining()` and can incorrectly signal end-of-stream while input bytes are 
still present. Align this logic with the non-direct `decompress()` path: 
compute `remaining` from `src.limit() - src.position()` and only set `finished` 
when `done` and `remaining==0` (or otherwise explicitly document/handle 
concatenated frames).
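
The suggested bookkeeping can be sketched as follows. `DirectInflateState` and the `frameDone` parameter are hypothetical names introduced for illustration; `frameDone` stands in for the boolean returned by the streaming decompress call, and the real field handling in `ZStandardDecompressor` may differ.

```java
import java.nio.ByteBuffer;

/** Hypothetical holder for the two fields discussed in the comment above. */
final class DirectInflateState {
  boolean finished;
  int remaining;

  /**
   * Updates state after one streaming call.
   *
   * @param src       source buffer, already advanced by the decompress call
   * @param frameDone assumed to be true when the current frame was fully decoded
   */
  void update(ByteBuffer src, boolean frameDone) {
    // Always derive remaining from the buffer, even when a frame just ended.
    remaining = src.limit() - src.position();
    // Only report end-of-stream when the frame ended AND no input is left.
    finished = frameDone && remaining == 0;
  }
}
```

With a 10-byte source positioned at 6 and `frameDone == true`, this reports `remaining == 4` and `finished == false`, so a caller can move on to the next concatenated frame instead of stopping early.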



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/zstd/ZStandardCompressor.java:
##########
@@ -210,30 +196,49 @@ public int compress(byte[] b, int off, int len) throws IOException {
       return n;
     }
 
-    // Re-initialize the output direct buffer
-    compressedDirectBuf.rewind();
-    compressedDirectBuf.limit(directBufferSize);
+    boolean allConsumed = (uncompressedDirectBufLen - uncompressedDirectBufOff <= 0);
+    // Use END only when finish=true, no more user data, and all direct-buffer
+    // data consumed (mirrors ZSTD_endStream); otherwise FLUSH (mirrors
+    // ZSTD_compressStream + ZSTD_flushStream).
+    boolean shouldEnd = finish && userBufLen == 0 && allConsumed;
+    if (!allConsumed || shouldEnd) {
+      // Re-initialize the output direct buffer

Review Comment:
   In `compress()`, when `allConsumed` is true and `shouldEnd` is false, the 
method skips calling the zstd streaming API and returns 0. This can break 
streaming output draining: zstd may still have pending compressed bytes to 
flush even when no more input is available (especially with small output 
buffers), and the old native implementation would continue flushing in this 
state. Consider always invoking the zstd stream with `FLUSH` whenever there is 
no remaining `compressedDirectBuf` output, even if there is currently no input, 
and use the returned `done`/state to decide when it’s safe to report 
`needsInput()` or produce 0 bytes.
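
To illustrate the draining concern, here is a small simulation under stated assumptions: `FakeStreamEncoder` is a hypothetical stand-in that holds already-compressed bytes internally, and its `flush` method is not the zstd-jni API. The point is only that, with a small output buffer, the caller must keep invoking flush with no new input until the encoder reports it is drained.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

/** Hypothetical encoder stand-in holding output that has not been emitted yet. */
final class FakeStreamEncoder {
  private final byte[] pending;
  private int pos = 0;

  FakeStreamEncoder(byte[] pending) {
    this.pending = pending.clone();
  }

  /** Copies as much pending output as fits into dst; returns true once drained. */
  boolean flush(ByteBuffer dst) {
    int n = Math.min(dst.remaining(), pending.length - pos);
    dst.put(pending, pos, n);
    pos += n;
    return pos == pending.length;
  }
}

final class DrainSketch {
  /** Keeps flushing with no new input until the encoder reports it is drained. */
  static byte[] drain(FakeStreamEncoder enc, int outBufSize) {
    if (outBufSize <= 0) {
      throw new IllegalArgumentException("outBufSize must be positive");
    }
    ByteBuffer out = ByteBuffer.allocate(outBufSize);
    ByteArrayOutputStream sink = new ByteArrayOutputStream();
    boolean done;
    do {
      out.clear();              // re-initialize the output buffer
      done = enc.flush(out);    // no input supplied; only pending output moves
      out.flip();
      sink.write(out.array(), 0, out.remaining());
    } while (!done);
    return sink.toByteArray();
  }
}
```

If the loop stopped as soon as the input was consumed, everything beyond the first `outBufSize` bytes would be stranded inside the encoder, which is the failure mode described above.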



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

