This is an automated email from the ASF dual-hosted git repository.

dweeks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 7e1ade803 Arrow: Pad decimal bytes before passing to decimal vector (#5168)
7e1ade803 is described below

commit 7e1ade80397a54c24db3634cb6015d539143e62d
Author: Bryan Keller <[email protected]>
AuthorDate: Fri Jul 1 12:55:08 2022 -0700

    Arrow: Pad decimal bytes before passing to decimal vector (#5168)
    
    * Arrow: Pad decimal bytes before passing to vector
    
    * comment clarification
    
    * optimize fill for neg numbers
    
    * Add overflow check
---
 .../arrow/vectorized/ArrowVectorAccessors.java     |  4 +-
 .../vectorized/parquet/DecimalVectorUtil.java      | 61 ++++++++++++++++++
 ...orizedDictionaryEncodedParquetValuesReader.java |  9 ++-
 .../VectorizedParquetDefinitionLevelReader.java    | 10 +--
 .../vectorized/parquet/DecimalVectorUtilTest.java  | 73 ++++++++++++++++++++++
 5 files changed, 147 insertions(+), 10 deletions(-)

diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowVectorAccessors.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowVectorAccessors.java
index 448b18eda..69b5934c4 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowVectorAccessors.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowVectorAccessors.java
@@ -74,7 +74,9 @@ final class ArrowVectorAccessors {
         return new String(byteBuffer.array(), byteBuffer.arrayOffset() + 
byteBuffer.position(),
             byteBuffer.remaining(), StandardCharsets.UTF_8);
       }
-      return StandardCharsets.UTF_8.decode(byteBuffer).toString();
+      byte[] bytes = new byte[byteBuffer.remaining()];
+      byteBuffer.get(bytes);
+      return new String(bytes, StandardCharsets.UTF_8);
     }
   }
 }
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/DecimalVectorUtil.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/DecimalVectorUtil.java
new file mode 100644
index 000000000..cb4e45369
--- /dev/null
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/DecimalVectorUtil.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.arrow.vectorized.parquet;
+
+import java.util.Arrays;
+
+public class DecimalVectorUtil {
+
+  private DecimalVectorUtil() {
+  }
+
+  /**
+   * Parquet stores decimal values in big-endian byte order, and Arrow stores them in native byte order.
+   * When setting the value in Arrow, we call setBigEndian(), and the byte order is reversed if needed.
+   * Also, the byte array is padded to fill 16 bytes in length by calling Unsafe.setMemory(). The padding
+   * operation can be slow, so by using this utility method, we can pad before calling setBigEndian() and
+   * avoid the call to Unsafe.setMemory().
+   *
+   * @param bigEndianBytes The big endian bytes
+   * @param newLength      The length of the byte array to return
+   * @return The new byte array
+   */
+  public static byte[] padBigEndianBytes(byte[] bigEndianBytes, int newLength) 
{
+    if (bigEndianBytes.length == newLength) {
+      return bigEndianBytes;
+    } else if (bigEndianBytes.length < newLength) {
+      byte[] result = new byte[newLength];
+      if (bigEndianBytes.length == 0) {
+        return result;
+      }
+
+      int start = newLength - bigEndianBytes.length;
+      if (bigEndianBytes[0] < 0) {
+        Arrays.fill(result, 0, start, (byte) 0xFF);
+      }
+      System.arraycopy(bigEndianBytes, 0, result, start, 
bigEndianBytes.length);
+
+      return result;
+    }
+    throw new IllegalArgumentException(String.format("Buffer size of %d is 
larger than requested size of %d",
+            bigEndianBytes.length, newLength));
+  }
+
+}
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java
index 1773d7487..e8e3cabd9 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java
@@ -128,12 +128,11 @@ public class VectorizedDictionaryEncodedParquetValuesReader extends BaseVectoriz
   class FixedLengthDecimalDictEncodedReader extends BaseDictEncodedReader {
     @Override
     protected void nextVal(FieldVector vector, Dictionary dict, int idx, int 
currentVal, int typeWidth) {
-      byte[] decimalBytes = dict.decodeToBinary(currentVal).getBytesUnsafe();
-      byte[] vectorBytes = new byte[typeWidth];
-      System.arraycopy(decimalBytes, 0, vectorBytes, 0, typeWidth);
+      byte[] vectorBytes =
+          DecimalVectorUtil.padBigEndianBytes(
+              dict.decodeToBinary(currentVal).getBytesUnsafe(),
+              DecimalVector.TYPE_WIDTH);
       ((DecimalVector) vector).setBigEndian(idx, vectorBytes);
-      ByteBuffer buffer = dict.decodeToBinary(currentVal).toByteBuffer();
-      vector.getDataBuffer().setBytes(idx, buffer);
     }
   }
 
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
index dbfb4054e..a8990d0cd 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
@@ -358,7 +358,8 @@ public final class VectorizedParquetDefinitionLevelReader extends BaseVectorized
     protected void nextVal(
         FieldVector vector, int idx, ValuesAsBytesReader valuesReader, int 
typeWidth, byte[] byteArray) {
       valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
-      ((DecimalVector) vector).setBigEndian(idx, byteArray);
+      byte[] vectorBytes = DecimalVectorUtil.padBigEndianBytes(byteArray, 
DecimalVector.TYPE_WIDTH);
+      ((DecimalVector) vector).setBigEndian(idx, vectorBytes);
     }
 
     @Override
@@ -369,9 +370,10 @@ public final class VectorizedParquetDefinitionLevelReader extends BaseVectorized
         reader.fixedLengthDecimalDictEncodedReader()
             .nextBatch(vector, idx, numValuesToRead, dict, nullabilityHolder, 
typeWidth);
       } else if (Mode.PACKED.equals(mode)) {
-        ByteBuffer decimalBytes = 
dict.decodeToBinary(reader.readInteger()).toByteBuffer();
-        byte[] vectorBytes = new byte[typeWidth];
-        System.arraycopy(decimalBytes, 0, vectorBytes, 0, typeWidth);
+        byte[] vectorBytes =
+            DecimalVectorUtil.padBigEndianBytes(
+                dict.decodeToBinary(reader.readInteger()).getBytesUnsafe(),
+                DecimalVector.TYPE_WIDTH);
         ((DecimalVector) vector).setBigEndian(idx, vectorBytes);
       }
     }
diff --git a/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/parquet/DecimalVectorUtilTest.java b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/parquet/DecimalVectorUtilTest.java
new file mode 100644
index 000000000..10fe1afcd
--- /dev/null
+++ b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/parquet/DecimalVectorUtilTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.arrow.vectorized.parquet;
+
+import java.math.BigInteger;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class DecimalVectorUtilTest {
+
+  @Test
+  public void testPadBigEndianBytes() {
+    BigInteger bigInt = new BigInteger("12345");
+    byte[] bytes = bigInt.toByteArray();
+    byte[] paddedBytes = DecimalVectorUtil.padBigEndianBytes(bytes, 16);
+
+    assertEquals(16, paddedBytes.length);
+    BigInteger result = new BigInteger(paddedBytes);
+    assertEquals(bigInt, result);
+  }
+
+  @Test
+  public void testPadBigEndianBytesNegative() {
+    BigInteger bigInt = new BigInteger("-12345");
+    byte[] bytes = bigInt.toByteArray();
+    byte[] paddedBytes = DecimalVectorUtil.padBigEndianBytes(bytes, 16);
+
+    assertEquals(16, paddedBytes.length);
+    BigInteger result = new BigInteger(paddedBytes);
+    assertEquals(bigInt, result);
+  }
+
+  @Test
+  public void testPadBigEndianBytesZero() {
+    byte[] bytes = BigInteger.ZERO.toByteArray();
+    byte[] paddedBytes = DecimalVectorUtil.padBigEndianBytes(bytes, 16);
+
+    assertEquals(16, paddedBytes.length);
+    BigInteger result = new BigInteger(paddedBytes);
+    assertEquals(BigInteger.ZERO, result);
+
+    bytes = new byte[0];
+    paddedBytes = DecimalVectorUtil.padBigEndianBytes(bytes, 16);
+
+    assertEquals(16, paddedBytes.length);
+    result = new BigInteger(paddedBytes);
+    assertEquals(BigInteger.ZERO, result);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testPadBigEndianBytesOverflow() {
+    byte[] bytes = new byte[17];
+    DecimalVectorUtil.padBigEndianBytes(bytes, 16);
+  }
+}

Reply via email to