This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 9532a46c0 [#2072] improvement: Corrected buffer offsets in NoOpCodec.decompress (#2072)
9532a46c0 is described below

commit 9532a46c00dbc0e5e74f49d7f0ee638f501b2a15
Author: Zhen Wang <643348...@qq.com>
AuthorDate: Mon Sep 2 14:20:10 2024 +0800

    [#2072] improvement: Corrected buffer offsets in NoOpCodec.decompress (#2072)

    ### What changes were proposed in this pull request?

    Corrected buffer offsets in NoOpCodec.decompress

    ### Why are the changes needed?

    Make NoOpCodec.decompress logic consistent with other codecs.

    Fix: # (issue)

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    added unit tests
---
 .../uniffle/common/compression/NoOpCodec.java      |  6 +--
 .../common/compression/CompressionTest.java        | 48 ++++++++++++++++++++++
 2 files changed, 51 insertions(+), 3 deletions(-)

diff --git a/common/src/main/java/org/apache/uniffle/common/compression/NoOpCodec.java b/common/src/main/java/org/apache/uniffle/common/compression/NoOpCodec.java
index 4cd111a34..89cd05771 100644
--- a/common/src/main/java/org/apache/uniffle/common/compression/NoOpCodec.java
+++ b/common/src/main/java/org/apache/uniffle/common/compression/NoOpCodec.java
@@ -31,9 +31,9 @@ public class NoOpCodec extends Codec {
 
   @Override
   public void decompress(ByteBuffer src, int uncompressedLen, ByteBuffer dest, int destOffset) {
-    dest.put(src);
-    dest.position(destOffset);
-    dest.limit(destOffset + uncompressedLen);
+    ByteBuffer destDuplicated = dest.duplicate();
+    destDuplicated.position(destOffset);
+    destDuplicated.put(src.duplicate());
   }
 
   @Override
diff --git a/common/src/test/java/org/apache/uniffle/common/compression/CompressionTest.java b/common/src/test/java/org/apache/uniffle/common/compression/CompressionTest.java
index ac5af5aa7..31238b11f 100644
--- a/common/src/test/java/org/apache/uniffle/common/compression/CompressionTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/compression/CompressionTest.java
@@ -19,14 +19,17 @@ package org.apache.uniffle.common.compression;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.commons.lang3.RandomUtils;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.util.ByteBufferUtils;
 
 import static org.apache.uniffle.common.config.RssClientConf.COMPRESSION_TYPE;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
@@ -147,4 +150,49 @@ public class CompressionTest {
     dest.get(res);
     assertArrayEquals(originData, res);
   }
+
+  @Test
+  public void checkDecompressBufferOffsets() {
+    byte[] data = RandomUtils.nextBytes(1024);
+    // Snappy decompression does not support non-zero offset for destination direct ByteBuffer
+    Codec.Type[] types = {Codec.Type.ZSTD, Codec.Type.LZ4, Codec.Type.NOOP};
+    Boolean[] isDirects = {true, false};
+    for (Boolean isDirect : isDirects) {
+      for (Codec.Type type : types) {
+        Codec codec = Codec.newInstance(new RssConf().set(COMPRESSION_TYPE, type)).get();
+        byte[] compressed = codec.compress(data);
+
+        ByteBuffer src;
+        if (isDirect) {
+          src = ByteBuffer.allocateDirect(compressed.length);
+        } else {
+          src = ByteBuffer.allocate(compressed.length);
+        }
+        src.put(compressed);
+        src.flip();
+
+        ByteBuffer dest;
+        if (isDirect) {
+          dest = ByteBuffer.allocateDirect(2048);
+        } else {
+          dest = ByteBuffer.allocate(2048);
+        }
+        codec.decompress(src, 1024, dest, 0);
+        assertEquals(0, src.position());
+        assertEquals(compressed.length, src.limit());
+        assertEquals(0, dest.position());
+        assertEquals(2048, dest.limit());
+        assertArrayEquals(
+            data, Arrays.copyOfRange(ByteBufferUtils.bufferToArray(dest.duplicate()), 0, 1024));
+
+        codec.decompress(src, 1024, dest, 1024);
+        assertEquals(0, src.position());
+        assertEquals(compressed.length, src.limit());
+        assertEquals(0, dest.position());
+        assertEquals(2048, dest.limit());
+        assertArrayEquals(
+            data, Arrays.copyOfRange(ByteBufferUtils.bufferToArray(dest.duplicate()), 1024, 2048));
+      }
+    }
+  }
 }