This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 9532a46c0 [#2072] improvement: Corrected buffer offsets in 
NoOpCodec.decompress (#2072)
9532a46c0 is described below

commit 9532a46c00dbc0e5e74f49d7f0ee638f501b2a15
Author: Zhen Wang <643348...@qq.com>
AuthorDate: Mon Sep 2 14:20:10 2024 +0800

    [#2072] improvement: Corrected buffer offsets in NoOpCodec.decompress 
(#2072)
    
    ### What changes were proposed in this pull request?
    
    Corrected buffer offsets in NoOpCodec.decompress
    
    ### Why are the changes needed?
    
    Make NoOpCodec.decompress logic consistent with other codecs.
    
    Fix: # (issue)
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    added unit tests
---
 .../uniffle/common/compression/NoOpCodec.java      |  6 +--
 .../common/compression/CompressionTest.java        | 48 ++++++++++++++++++++++
 2 files changed, 51 insertions(+), 3 deletions(-)

diff --git 
a/common/src/main/java/org/apache/uniffle/common/compression/NoOpCodec.java 
b/common/src/main/java/org/apache/uniffle/common/compression/NoOpCodec.java
index 4cd111a34..89cd05771 100644
--- a/common/src/main/java/org/apache/uniffle/common/compression/NoOpCodec.java
+++ b/common/src/main/java/org/apache/uniffle/common/compression/NoOpCodec.java
@@ -31,9 +31,9 @@ public class NoOpCodec extends Codec {
 
   @Override
   public void decompress(ByteBuffer src, int uncompressedLen, ByteBuffer dest, 
int destOffset) {
-    dest.put(src);
-    dest.position(destOffset);
-    dest.limit(destOffset + uncompressedLen);
+    ByteBuffer destDuplicated = dest.duplicate();
+    destDuplicated.position(destOffset);
+    destDuplicated.put(src.duplicate());
   }
 
   @Override
diff --git 
a/common/src/test/java/org/apache/uniffle/common/compression/CompressionTest.java
 
b/common/src/test/java/org/apache/uniffle/common/compression/CompressionTest.java
index ac5af5aa7..31238b11f 100644
--- 
a/common/src/test/java/org/apache/uniffle/common/compression/CompressionTest.java
+++ 
b/common/src/test/java/org/apache/uniffle/common/compression/CompressionTest.java
@@ -19,14 +19,17 @@ package org.apache.uniffle.common.compression;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import org.apache.commons.lang3.RandomUtils;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import org.apache.uniffle.common.config.RssConf;
+import org.apache.uniffle.common.util.ByteBufferUtils;
 
 import static org.apache.uniffle.common.config.RssClientConf.COMPRESSION_TYPE;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
@@ -147,4 +150,49 @@ public class CompressionTest {
     dest.get(res);
     assertArrayEquals(originData, res);
   }
+
+  @Test
+  public void checkDecompressBufferOffsets() {
+    byte[] data = RandomUtils.nextBytes(1024);
+    // Snappy decompression does not support non-zero offset for destination 
direct ByteBuffer
+    Codec.Type[] types = {Codec.Type.ZSTD, Codec.Type.LZ4, Codec.Type.NOOP};
+    Boolean[] isDirects = {true, false};
+    for (Boolean isDirect : isDirects) {
+      for (Codec.Type type : types) {
+        Codec codec = Codec.newInstance(new RssConf().set(COMPRESSION_TYPE, 
type)).get();
+        byte[] compressed = codec.compress(data);
+
+        ByteBuffer src;
+        if (isDirect) {
+          src = ByteBuffer.allocateDirect(compressed.length);
+        } else {
+          src = ByteBuffer.allocate(compressed.length);
+        }
+        src.put(compressed);
+        src.flip();
+
+        ByteBuffer dest;
+        if (isDirect) {
+          dest = ByteBuffer.allocateDirect(2048);
+        } else {
+          dest = ByteBuffer.allocate(2048);
+        }
+        codec.decompress(src, 1024, dest, 0);
+        assertEquals(0, src.position());
+        assertEquals(compressed.length, src.limit());
+        assertEquals(0, dest.position());
+        assertEquals(2048, dest.limit());
+        assertArrayEquals(
+            data, 
Arrays.copyOfRange(ByteBufferUtils.bufferToArray(dest.duplicate()), 0, 1024));
+
+        codec.decompress(src, 1024, dest, 1024);
+        assertEquals(0, src.position());
+        assertEquals(compressed.length, src.limit());
+        assertEquals(0, dest.position());
+        assertEquals(2048, dest.limit());
+        assertArrayEquals(
+            data, 
Arrays.copyOfRange(ByteBufferUtils.bufferToArray(dest.duplicate()), 1024, 
2048));
+      }
+    }
+  }
 }

Reply via email to