This is an automated email from the ASF dual-hosted git repository.
ndimiduk pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new e110ee1c30e HBASE-29193: Allow ZstdByteBuffDecompressor to take direct
ByteBuffer as input and heap ByteBuffer as output, or vice versa (#6806)
e110ee1c30e is described below
commit e110ee1c30e2a6be2ebf39cc109c7f5f10981994
Author: Charles Connell <[email protected]>
AuthorDate: Thu Apr 17 04:07:35 2025 -0400
HBASE-29193: Allow ZstdByteBuffDecompressor to take direct ByteBuffer as
input and heap ByteBuffer as output, or vice versa (#6806)
Signed-off-by: Nick Dimiduk <[email protected]>
---
.../io/compress/zstd/ZstdByteBuffDecompressor.java | 67 +++++++++-------------
.../zstd/TestZstdByteBuffDecompressor.java | 32 +++++++++--
pom.xml | 2 +-
3 files changed, 55 insertions(+), 46 deletions(-)
diff --git
a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdByteBuffDecompressor.java
b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdByteBuffDecompressor.java
index ec5315aa4c0..d71d46e2946 100644
---
a/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdByteBuffDecompressor.java
+++
b/hbase-compression/hbase-compression-zstd/src/main/java/org/apache/hadoop/hbase/io/compress/zstd/ZstdByteBuffDecompressor.java
@@ -55,20 +55,8 @@ public class ZstdByteBuffDecompressor implements
ByteBuffDecompressor, CanReinit
@Override
public boolean canDecompress(ByteBuff output, ByteBuff input) {
- if (!allowByteBuffDecompression) {
- return false;
- }
- if (output instanceof SingleByteBuff && input instanceof SingleByteBuff) {
- ByteBuffer nioOutput = output.nioByteBuffers()[0];
- ByteBuffer nioInput = input.nioByteBuffers()[0];
- if (nioOutput.isDirect() && nioInput.isDirect()) {
- return true;
- } else if (!nioOutput.isDirect() && !nioInput.isDirect()) {
- return true;
- }
- }
-
- return false;
+ return allowByteBuffDecompression && output instanceof SingleByteBuff
+ && input instanceof SingleByteBuff;
}
@Override
@@ -80,38 +68,35 @@ public class ZstdByteBuffDecompressor implements
ByteBuffDecompressor, CanReinit
if (output instanceof SingleByteBuff && input instanceof SingleByteBuff) {
ByteBuffer nioOutput = output.nioByteBuffers()[0];
ByteBuffer nioInput = input.nioByteBuffers()[0];
+ int origOutputPos = nioOutput.position();
+ int n;
if (nioOutput.isDirect() && nioInput.isDirect()) {
- return decompressDirectByteBuffers(nioOutput, nioInput, inputLen);
+ n = ctx.decompressDirectByteBuffer(nioOutput, nioOutput.position(),
+ nioOutput.limit() - nioOutput.position(), nioInput,
nioInput.position(), inputLen);
} else if (!nioOutput.isDirect() && !nioInput.isDirect()) {
- return decompressHeapByteBuffers(nioOutput, nioInput, inputLen);
+ n = ctx.decompressByteArray(nioOutput.array(),
+ nioOutput.arrayOffset() + nioOutput.position(), nioOutput.limit() -
nioOutput.position(),
+ nioInput.array(), nioInput.arrayOffset() + nioInput.position(),
inputLen);
+ } else if (nioOutput.isDirect() && !nioInput.isDirect()) {
+ n = ctx.decompressByteArrayToDirectByteBuffer(nioOutput,
nioOutput.position(),
+ nioOutput.limit() - nioOutput.position(), nioInput.array(),
+ nioInput.arrayOffset() + nioInput.position(), inputLen);
+ } else if (!nioOutput.isDirect() && nioInput.isDirect()) {
+ n = ctx.decompressDirectByteBufferToByteArray(nioOutput.array(),
+ nioOutput.arrayOffset() + nioOutput.position(), nioOutput.limit() -
nioOutput.position(),
+ nioInput, nioInput.position(), inputLen);
+ } else {
+ throw new IllegalStateException("Unreachable line");
}
- }
-
- throw new IllegalStateException("One buffer is direct and the other is
not, "
- + "or one or more not SingleByteBuffs. This is not supported");
- }
- private int decompressDirectByteBuffers(ByteBuffer output, ByteBuffer input,
int inputLen) {
- int origOutputPos = output.position();
+ nioOutput.position(origOutputPos + n);
+ nioInput.position(input.position() + inputLen);
- int n = ctx.decompressDirectByteBuffer(output, output.position(),
- output.limit() - output.position(), input, input.position(), inputLen);
-
- output.position(origOutputPos + n);
- input.position(input.position() + inputLen);
- return n;
- }
-
- private int decompressHeapByteBuffers(ByteBuffer output, ByteBuffer input,
int inputLen) {
- int origOutputPos = output.position();
-
- int n = ctx.decompressByteArray(output.array(), output.arrayOffset() +
output.position(),
- output.limit() - output.position(), input.array(), input.arrayOffset() +
input.position(),
- inputLen);
-
- output.position(origOutputPos + n);
- input.position(input.position() + inputLen);
- return n;
+ return n;
+ } else {
+ throw new IllegalStateException(
+ "At least one buffer is not a SingleByteBuff, this is not supported");
+ }
}
@Override
diff --git
a/hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestZstdByteBuffDecompressor.java
b/hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestZstdByteBuffDecompressor.java
index be52d17f133..223bbc021db 100644
---
a/hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestZstdByteBuffDecompressor.java
+++
b/hbase-compression/hbase-compression-zstd/src/test/java/org/apache/hadoop/hbase/io/compress/zstd/TestZstdByteBuffDecompressor.java
@@ -62,8 +62,8 @@ public class TestZstdByteBuffDecompressor {
try (ZstdByteBuffDecompressor decompressor = new
ZstdByteBuffDecompressor(null)) {
assertTrue(decompressor.canDecompress(emptySingleHeapBuff,
emptySingleHeapBuff));
assertTrue(decompressor.canDecompress(emptySingleDirectBuff,
emptySingleDirectBuff));
- assertFalse(decompressor.canDecompress(emptySingleHeapBuff,
emptySingleDirectBuff));
- assertFalse(decompressor.canDecompress(emptySingleDirectBuff,
emptySingleHeapBuff));
+ assertTrue(decompressor.canDecompress(emptySingleHeapBuff,
emptySingleDirectBuff));
+ assertTrue(decompressor.canDecompress(emptySingleDirectBuff,
emptySingleHeapBuff));
assertFalse(decompressor.canDecompress(emptyMultiHeapBuff,
emptyMultiHeapBuff));
assertFalse(decompressor.canDecompress(emptyMultiDirectBuff,
emptyMultiDirectBuff));
assertFalse(decompressor.canDecompress(emptySingleHeapBuff,
emptyMultiHeapBuff));
@@ -72,7 +72,7 @@ public class TestZstdByteBuffDecompressor {
}
@Test
- public void testDecompressHeap() throws IOException {
+ public void testDecompressHeapToHeap() throws IOException {
try (ZstdByteBuffDecompressor decompressor = new
ZstdByteBuffDecompressor(null)) {
ByteBuff output = new SingleByteBuff(ByteBuffer.allocate(64));
ByteBuff input = new SingleByteBuff(ByteBuffer.wrap(COMPRESSED_PAYLOAD));
@@ -83,7 +83,7 @@ public class TestZstdByteBuffDecompressor {
}
@Test
- public void testDecompressDirect() throws IOException {
+ public void testDecompressDirectToDirect() throws IOException {
try (ZstdByteBuffDecompressor decompressor = new
ZstdByteBuffDecompressor(null)) {
ByteBuff output = new SingleByteBuff(ByteBuffer.allocateDirect(64));
ByteBuff input = new
SingleByteBuff(ByteBuffer.allocateDirect(COMPRESSED_PAYLOAD.length));
@@ -95,4 +95,28 @@ public class TestZstdByteBuffDecompressor {
}
}
+ @Test
+ public void testDecompressDirectToHeap() throws IOException {
+ try (ZstdByteBuffDecompressor decompressor = new
ZstdByteBuffDecompressor(null)) {
+ ByteBuff output = new SingleByteBuff(ByteBuffer.allocate(64));
+ ByteBuff input = new
SingleByteBuff(ByteBuffer.allocateDirect(COMPRESSED_PAYLOAD.length));
+ input.put(COMPRESSED_PAYLOAD);
+ input.rewind();
+ int decompressedSize = decompressor.decompress(output, input,
COMPRESSED_PAYLOAD.length);
+ assertEquals("HBase is fun to use and very fast",
+ Bytes.toString(output.toBytes(0, decompressedSize)));
+ }
+ }
+
+ @Test
+ public void testDecompressHeapToDirect() throws IOException {
+ try (ZstdByteBuffDecompressor decompressor = new
ZstdByteBuffDecompressor(null)) {
+ ByteBuff output = new SingleByteBuff(ByteBuffer.allocateDirect(64));
+ ByteBuff input = new SingleByteBuff(ByteBuffer.wrap(COMPRESSED_PAYLOAD));
+ int decompressedSize = decompressor.decompress(output, input,
COMPRESSED_PAYLOAD.length);
+ assertEquals("HBase is fun to use and very fast",
+ Bytes.toString(output.toBytes(0, decompressedSize)));
+ }
+ }
+
}
diff --git a/pom.xml b/pom.xml
index 080afbc0f36..82affd421d6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -663,7 +663,7 @@
<brotli4j.version>1.11.0</brotli4j.version>
<lz4.version>1.8.0</lz4.version>
<snappy.version>1.1.10.4</snappy.version>
- <zstd-jni.version>1.5.5-2</zstd-jni.version>
+ <zstd-jni.version>1.5.7-2</zstd-jni.version>
<!--
Note that the version of protobuf shipped in hbase-thirdparty must
match the version used
in hbase-protocol-shaded and hbase-examples. The version of
jackson-[annotations,core,