This is an automated email from the ASF dual-hosted git repository.

ndimiduk pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.5 by this push:
     new fe6d1a0afb3 HBASE-29193: Allow ZstdByteBuffDecompressor to take direct 
ByteBuffer as input and heap ByteBuffer as output, or vice versa (#6806)
fe6d1a0afb3 is described below

commit fe6d1a0afb3f4824cc0ecc7224f96187cf79a44d
Author: Charles Connell <[email protected]>
AuthorDate: Thu Apr 17 04:07:35 2025 -0400

    HBASE-29193: Allow ZstdByteBuffDecompressor to take direct ByteBuffer as 
input and heap ByteBuffer as output, or vice versa (#6806)
    
    Signed-off-by: Nick Dimiduk <[email protected]>
---
 .../io/compress/zstd/ZstdByteBuffDecompressor.java | 67 +++++++++-------------
 .../zstd/TestZstdByteBuffDecompressor.java         | 32 +++++++++--
 pom.xml                                            |  2 +-
 3 files changed, 55 insertions(+), 46 deletions(-)

diff --git 
a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdByteBuffDecompressor.java
 
b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdByteBuffDecompressor.java
index ec5315aa4c0..d71d46e2946 100644
--- 
a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdByteBuffDecompressor.java
+++ 
b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdByteBuffDecompressor.java
@@ -55,20 +55,8 @@ public class ZstdByteBuffDecompressor implements 
ByteBuffDecompressor, CanReinit
 
   @Override
   public boolean canDecompress(ByteBuff output, ByteBuff input) {
-    if (!allowByteBuffDecompression) {
-      return false;
-    }
-    if (output instanceof SingleByteBuff && input instanceof SingleByteBuff) {
-      ByteBuffer nioOutput = output.nioByteBuffers()[0];
-      ByteBuffer nioInput = input.nioByteBuffers()[0];
-      if (nioOutput.isDirect() && nioInput.isDirect()) {
-        return true;
-      } else if (!nioOutput.isDirect() && !nioInput.isDirect()) {
-        return true;
-      }
-    }
-
-    return false;
+    return allowByteBuffDecompression && output instanceof SingleByteBuff
+      && input instanceof SingleByteBuff;
   }
 
   @Override
@@ -80,38 +68,35 @@ public class ZstdByteBuffDecompressor implements 
ByteBuffDecompressor, CanReinit
     if (output instanceof SingleByteBuff && input instanceof SingleByteBuff) {
       ByteBuffer nioOutput = output.nioByteBuffers()[0];
       ByteBuffer nioInput = input.nioByteBuffers()[0];
+      int origOutputPos = nioOutput.position();
+      int n;
       if (nioOutput.isDirect() && nioInput.isDirect()) {
-        return decompressDirectByteBuffers(nioOutput, nioInput, inputLen);
+        n = ctx.decompressDirectByteBuffer(nioOutput, nioOutput.position(),
+          nioOutput.limit() - nioOutput.position(), nioInput, 
nioInput.position(), inputLen);
       } else if (!nioOutput.isDirect() && !nioInput.isDirect()) {
-        return decompressHeapByteBuffers(nioOutput, nioInput, inputLen);
+        n = ctx.decompressByteArray(nioOutput.array(),
+          nioOutput.arrayOffset() + nioOutput.position(), nioOutput.limit() - 
nioOutput.position(),
+          nioInput.array(), nioInput.arrayOffset() + nioInput.position(), 
inputLen);
+      } else if (nioOutput.isDirect() && !nioInput.isDirect()) {
+        n = ctx.decompressByteArrayToDirectByteBuffer(nioOutput, 
nioOutput.position(),
+          nioOutput.limit() - nioOutput.position(), nioInput.array(),
+          nioInput.arrayOffset() + nioInput.position(), inputLen);
+      } else if (!nioOutput.isDirect() && nioInput.isDirect()) {
+        n = ctx.decompressDirectByteBufferToByteArray(nioOutput.array(),
+          nioOutput.arrayOffset() + nioOutput.position(), nioOutput.limit() - 
nioOutput.position(),
+          nioInput, nioInput.position(), inputLen);
+      } else {
+        throw new IllegalStateException("Unreachable line");
       }
-    }
-
-    throw new IllegalStateException("One buffer is direct and the other is 
not, "
-      + "or one or more not SingleByteBuffs. This is not supported");
-  }
 
-  private int decompressDirectByteBuffers(ByteBuffer output, ByteBuffer input, 
int inputLen) {
-    int origOutputPos = output.position();
+      nioOutput.position(origOutputPos + n);
+      nioInput.position(input.position() + inputLen);
 
-    int n = ctx.decompressDirectByteBuffer(output, output.position(),
-      output.limit() - output.position(), input, input.position(), inputLen);
-
-    output.position(origOutputPos + n);
-    input.position(input.position() + inputLen);
-    return n;
-  }
-
-  private int decompressHeapByteBuffers(ByteBuffer output, ByteBuffer input, 
int inputLen) {
-    int origOutputPos = output.position();
-
-    int n = ctx.decompressByteArray(output.array(), output.arrayOffset() + 
output.position(),
-      output.limit() - output.position(), input.array(), input.arrayOffset() + 
input.position(),
-      inputLen);
-
-    output.position(origOutputPos + n);
-    input.position(input.position() + inputLen);
-    return n;
+      return n;
+    } else {
+      throw new IllegalStateException(
+        "At least one buffer is not a SingleByteBuff, this is not supported");
+    }
   }
 
   @Override
diff --git 
a/hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestZstdByteBuffDecompressor.java
 
b/hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestZstdByteBuffDecompressor.java
index be52d17f133..223bbc021db 100644
--- 
a/hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestZstdByteBuffDecompressor.java
+++ 
b/hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestZstdByteBuffDecompressor.java
@@ -62,8 +62,8 @@ public class TestZstdByteBuffDecompressor {
     try (ZstdByteBuffDecompressor decompressor = new 
ZstdByteBuffDecompressor(null)) {
       assertTrue(decompressor.canDecompress(emptySingleHeapBuff, 
emptySingleHeapBuff));
       assertTrue(decompressor.canDecompress(emptySingleDirectBuff, 
emptySingleDirectBuff));
-      assertFalse(decompressor.canDecompress(emptySingleHeapBuff, 
emptySingleDirectBuff));
-      assertFalse(decompressor.canDecompress(emptySingleDirectBuff, 
emptySingleHeapBuff));
+      assertTrue(decompressor.canDecompress(emptySingleHeapBuff, 
emptySingleDirectBuff));
+      assertTrue(decompressor.canDecompress(emptySingleDirectBuff, 
emptySingleHeapBuff));
       assertFalse(decompressor.canDecompress(emptyMultiHeapBuff, 
emptyMultiHeapBuff));
       assertFalse(decompressor.canDecompress(emptyMultiDirectBuff, 
emptyMultiDirectBuff));
       assertFalse(decompressor.canDecompress(emptySingleHeapBuff, 
emptyMultiHeapBuff));
@@ -72,7 +72,7 @@ public class TestZstdByteBuffDecompressor {
   }
 
   @Test
-  public void testDecompressHeap() throws IOException {
+  public void testDecompressHeapToHeap() throws IOException {
     try (ZstdByteBuffDecompressor decompressor = new 
ZstdByteBuffDecompressor(null)) {
       ByteBuff output = new SingleByteBuff(ByteBuffer.allocate(64));
       ByteBuff input = new SingleByteBuff(ByteBuffer.wrap(COMPRESSED_PAYLOAD));
@@ -83,7 +83,7 @@ public class TestZstdByteBuffDecompressor {
   }
 
   @Test
-  public void testDecompressDirect() throws IOException {
+  public void testDecompressDirectToDirect() throws IOException {
     try (ZstdByteBuffDecompressor decompressor = new 
ZstdByteBuffDecompressor(null)) {
       ByteBuff output = new SingleByteBuff(ByteBuffer.allocateDirect(64));
       ByteBuff input = new 
SingleByteBuff(ByteBuffer.allocateDirect(COMPRESSED_PAYLOAD.length));
@@ -95,4 +95,28 @@ public class TestZstdByteBuffDecompressor {
     }
   }
 
+  @Test
+  public void testDecompressDirectToHeap() throws IOException {
+    try (ZstdByteBuffDecompressor decompressor = new 
ZstdByteBuffDecompressor(null)) {
+      ByteBuff output = new SingleByteBuff(ByteBuffer.allocate(64));
+      ByteBuff input = new 
SingleByteBuff(ByteBuffer.allocateDirect(COMPRESSED_PAYLOAD.length));
+      input.put(COMPRESSED_PAYLOAD);
+      input.rewind();
+      int decompressedSize = decompressor.decompress(output, input, 
COMPRESSED_PAYLOAD.length);
+      assertEquals("HBase is fun to use and very fast",
+        Bytes.toString(output.toBytes(0, decompressedSize)));
+    }
+  }
+
+  @Test
+  public void testDecompressHeapToDirect() throws IOException {
+    try (ZstdByteBuffDecompressor decompressor = new 
ZstdByteBuffDecompressor(null)) {
+      ByteBuff output = new SingleByteBuff(ByteBuffer.allocateDirect(64));
+      ByteBuff input = new SingleByteBuff(ByteBuffer.wrap(COMPRESSED_PAYLOAD));
+      int decompressedSize = decompressor.decompress(output, input, 
COMPRESSED_PAYLOAD.length);
+      assertEquals("HBase is fun to use and very fast",
+        Bytes.toString(output.toBytes(0, decompressedSize)));
+    }
+  }
+
 }
diff --git a/pom.xml b/pom.xml
index 09129d7f26e..d6eda4ff072 100644
--- a/pom.xml
+++ b/pom.xml
@@ -662,7 +662,7 @@
     <lz4.version>1.8.0</lz4.version>
     <snappy.version>1.1.10.4</snappy.version>
     <xz.version>1.9</xz.version>
-    <zstd-jni.version>1.5.5-2</zstd-jni.version>
+    <zstd-jni.version>1.5.7-2</zstd-jni.version>
     <!--
         Note that the version of protobuf shipped in hbase-thirdparty must 
match the version used
         in hbase-protocol-shaded and hbase-examples. The version of 
jackson-[annotations,core,

Reply via email to