This is an automated email from the ASF dual-hosted git repository.
dweeks pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 7e1ade803 Arrow: Pad decimal bytes before passing to decimal vector (#5168)
7e1ade803 is described below
commit 7e1ade80397a54c24db3634cb6015d539143e62d
Author: Bryan Keller <[email protected]>
AuthorDate: Fri Jul 1 12:55:08 2022 -0700
Arrow: Pad decimal bytes before passing to decimal vector (#5168)
* Arrow: Pad decimal bytes before passing to vector
* comment clarification
* optimize fill for neg numbers
* Add overflow check
---
.../arrow/vectorized/ArrowVectorAccessors.java | 4 +-
.../vectorized/parquet/DecimalVectorUtil.java | 61 ++++++++++++++++++
...orizedDictionaryEncodedParquetValuesReader.java | 9 ++-
.../VectorizedParquetDefinitionLevelReader.java | 10 +--
.../vectorized/parquet/DecimalVectorUtilTest.java | 73 ++++++++++++++++++++++
5 files changed, 147 insertions(+), 10 deletions(-)
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowVectorAccessors.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowVectorAccessors.java
index 448b18eda..69b5934c4 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowVectorAccessors.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/ArrowVectorAccessors.java
@@ -74,7 +74,9 @@ final class ArrowVectorAccessors {
return new String(byteBuffer.array(), byteBuffer.arrayOffset() +
byteBuffer.position(),
byteBuffer.remaining(), StandardCharsets.UTF_8);
}
- return StandardCharsets.UTF_8.decode(byteBuffer).toString();
+ byte[] bytes = new byte[byteBuffer.remaining()];
+ byteBuffer.get(bytes);
+ return new String(bytes, StandardCharsets.UTF_8);
}
}
}
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/DecimalVectorUtil.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/DecimalVectorUtil.java
new file mode 100644
index 000000000..cb4e45369
--- /dev/null
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/DecimalVectorUtil.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.arrow.vectorized.parquet;
+
+import java.util.Arrays;
+
/**
 * Utilities for preparing Parquet decimal bytes before they are written into an Arrow
 * {@code DecimalVector}.
 */
public final class DecimalVectorUtil {

  private DecimalVectorUtil() {
  }

  /**
   * Left-pads a big-endian two's-complement byte array to {@code newLength} bytes, sign-extending
   * negative values so the padded array represents the same signed integer.
   *
   * <p>Parquet stores decimal values in big-endian byte order, and Arrow stores them in native byte
   * order. When setting the value in Arrow, we call setBigEndian(), and the byte order is reversed
   * if needed. Also, the byte array is padded to fill 16 bytes in length by calling
   * Unsafe.setMemory(). The padding operation can be slow, so by using this utility method, we can
   * pad before calling setBigEndian() and avoid the call to Unsafe.setMemory().
   *
   * <p>When {@code bigEndianBytes} is already exactly {@code newLength} bytes long, it is returned
   * as-is rather than copied. Callers must therefore not assume the result is a fresh array that
   * is safe to mutate independently of the input.
   *
   * @param bigEndianBytes the big-endian two's-complement bytes; an empty array is treated as zero
   * @param newLength the length of the byte array to return
   * @return a byte array of length {@code newLength} holding the same signed value; this is
   *     {@code bigEndianBytes} itself when no padding is needed
   * @throws IllegalArgumentException if {@code bigEndianBytes} is longer than {@code newLength}
   */
  public static byte[] padBigEndianBytes(byte[] bigEndianBytes, int newLength) {
    if (bigEndianBytes.length == newLength) {
      // Already the requested width: hand back the caller's array without copying.
      return bigEndianBytes;
    } else if (bigEndianBytes.length < newLength) {
      byte[] result = new byte[newLength];
      if (bigEndianBytes.length == 0) {
        // No significant bytes means the value is zero; the new array is already zero-filled.
        return result;
      }

      int start = newLength - bigEndianBytes.length;
      // Sign-extend: when the most significant input byte has its high bit set (negative value),
      // the leading pad bytes must be 0xFF. Non-negative values keep the default zero bytes.
      if (bigEndianBytes[0] < 0) {
        Arrays.fill(result, 0, start, (byte) 0xFF);
      }
      System.arraycopy(bigEndianBytes, 0, result, start, bigEndianBytes.length);

      return result;
    }
    // Truncating would silently change the value, so refuse inputs wider than requested.
    throw new IllegalArgumentException(
        String.format("Buffer size of %d is larger than requested size of %d",
            bigEndianBytes.length, newLength));
  }

}
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java
index 1773d7487..e8e3cabd9 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java
@@ -128,12 +128,11 @@ public class VectorizedDictionaryEncodedParquetValuesReader extends BaseVectoriz
class FixedLengthDecimalDictEncodedReader extends BaseDictEncodedReader {
@Override
protected void nextVal(FieldVector vector, Dictionary dict, int idx, int
currentVal, int typeWidth) {
- byte[] decimalBytes = dict.decodeToBinary(currentVal).getBytesUnsafe();
- byte[] vectorBytes = new byte[typeWidth];
- System.arraycopy(decimalBytes, 0, vectorBytes, 0, typeWidth);
+ byte[] vectorBytes =
+ DecimalVectorUtil.padBigEndianBytes(
+ dict.decodeToBinary(currentVal).getBytesUnsafe(),
+ DecimalVector.TYPE_WIDTH);
((DecimalVector) vector).setBigEndian(idx, vectorBytes);
- ByteBuffer buffer = dict.decodeToBinary(currentVal).toByteBuffer();
- vector.getDataBuffer().setBytes(idx, buffer);
}
}
diff --git
a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
index dbfb4054e..a8990d0cd 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
@@ -358,7 +358,8 @@ public final class VectorizedParquetDefinitionLevelReader extends BaseVectorized
protected void nextVal(
FieldVector vector, int idx, ValuesAsBytesReader valuesReader, int
typeWidth, byte[] byteArray) {
valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
- ((DecimalVector) vector).setBigEndian(idx, byteArray);
+ byte[] vectorBytes = DecimalVectorUtil.padBigEndianBytes(byteArray,
DecimalVector.TYPE_WIDTH);
+ ((DecimalVector) vector).setBigEndian(idx, vectorBytes);
}
@Override
@@ -369,9 +370,10 @@ public final class VectorizedParquetDefinitionLevelReader extends BaseVectorized
reader.fixedLengthDecimalDictEncodedReader()
.nextBatch(vector, idx, numValuesToRead, dict, nullabilityHolder,
typeWidth);
} else if (Mode.PACKED.equals(mode)) {
- ByteBuffer decimalBytes =
dict.decodeToBinary(reader.readInteger()).toByteBuffer();
- byte[] vectorBytes = new byte[typeWidth];
- System.arraycopy(decimalBytes, 0, vectorBytes, 0, typeWidth);
+ byte[] vectorBytes =
+ DecimalVectorUtil.padBigEndianBytes(
+ dict.decodeToBinary(reader.readInteger()).getBytesUnsafe(),
+ DecimalVector.TYPE_WIDTH);
((DecimalVector) vector).setBigEndian(idx, vectorBytes);
}
}
diff --git a/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/parquet/DecimalVectorUtilTest.java b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/parquet/DecimalVectorUtilTest.java
new file mode 100644
index 000000000..10fe1afcd
--- /dev/null
+++ b/arrow/src/test/java/org/apache/iceberg/arrow/vectorized/parquet/DecimalVectorUtilTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.arrow.vectorized.parquet;
+
+import java.math.BigInteger;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class DecimalVectorUtilTest {
+
+ @Test
+ public void testPadBigEndianBytes() {
+ BigInteger bigInt = new BigInteger("12345");
+ byte[] bytes = bigInt.toByteArray();
+ byte[] paddedBytes = DecimalVectorUtil.padBigEndianBytes(bytes, 16);
+
+ assertEquals(16, paddedBytes.length);
+ BigInteger result = new BigInteger(paddedBytes);
+ assertEquals(bigInt, result);
+ }
+
+ @Test
+ public void testPadBigEndianBytesNegative() {
+ BigInteger bigInt = new BigInteger("-12345");
+ byte[] bytes = bigInt.toByteArray();
+ byte[] paddedBytes = DecimalVectorUtil.padBigEndianBytes(bytes, 16);
+
+ assertEquals(16, paddedBytes.length);
+ BigInteger result = new BigInteger(paddedBytes);
+ assertEquals(bigInt, result);
+ }
+
+ @Test
+ public void testPadBigEndianBytesZero() {
+ byte[] bytes = BigInteger.ZERO.toByteArray();
+ byte[] paddedBytes = DecimalVectorUtil.padBigEndianBytes(bytes, 16);
+
+ assertEquals(16, paddedBytes.length);
+ BigInteger result = new BigInteger(paddedBytes);
+ assertEquals(BigInteger.ZERO, result);
+
+ bytes = new byte[0];
+ paddedBytes = DecimalVectorUtil.padBigEndianBytes(bytes, 16);
+
+ assertEquals(16, paddedBytes.length);
+ result = new BigInteger(paddedBytes);
+ assertEquals(BigInteger.ZERO, result);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testPadBigEndianBytesOverflow() {
+ byte[] bytes = new byte[17];
+ DecimalVectorUtil.padBigEndianBytes(bytes, 16);
+ }
+}