This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 908a4e6be2 [parquet] Fix parquet decimal type match. (#5001)
908a4e6be2 is described below
commit 908a4e6be2b7885de00e76c2cc6dff00220e662c
Author: YeJunHao <[email protected]>
AuthorDate: Mon Feb 10 17:50:00 2025 +0800
[parquet] Fix parquet decimal type match. (#5001)
---
.../newreader/ParquetVectorUpdaterFactory.java | 115 +++++++++----------
.../parquet/newreader/VectorizedColumnReader.java | 11 +-
.../format/parquet/ParquetReadWriteTest.java | 127 +++++++++++++++++++++
3 files changed, 186 insertions(+), 67 deletions(-)
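
The gist of the fix: decimals stored as FIXED_LEN_BYTE_ARRAY were previously
routed to the Integer/Long updaters purely by precision, even though the file
layout is a fixed-width byte array, and the shared writeDecimal path re-scaled
every value through BigDecimal. The diff below always uses
FixedLenByteArrayToDecimalUpdater for that physical type and lets the updater
pick the in-memory vector from the Paimon precision. A minimal sketch of that
routing, assuming is32BitDecimal/is64BitDecimal implement the standard Parquet
bounds (precision <= 9 fits an unscaled int, <= 18 an unscaled long):

    // Sketch only; the threshold constants are assumptions about
    // ParquetSchemaConverter.is32BitDecimal/is64BitDecimal.
    static String vectorFor(int precision) {
        if (precision <= 9) {         // unscaled value fits in a 32-bit int
            return "HeapIntVector";
        } else if (precision <= 18) { // unscaled value fits in a 64-bit long
            return "HeapLongVector";
        }
        return "WritableBytesVector"; // keep the raw two's-complement bytes
    }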
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/ParquetVectorUpdaterFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/ParquetVectorUpdaterFactory.java
index f59f691f8f..465d8824d4 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/ParquetVectorUpdaterFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/ParquetVectorUpdaterFactory.java
@@ -20,6 +20,7 @@ package org.apache.paimon.format.parquet.newreader;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.columnar.heap.HeapIntVector;
+import org.apache.paimon.data.columnar.heap.HeapLongVector;
import org.apache.paimon.data.columnar.writable.WritableBooleanVector;
import org.apache.paimon.data.columnar.writable.WritableByteVector;
import org.apache.paimon.data.columnar.writable.WritableBytesVector;
@@ -64,7 +65,6 @@ import org.apache.parquet.schema.PrimitiveType;
import java.math.BigDecimal;
import java.math.BigInteger;
-import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.concurrent.TimeUnit;
@@ -79,13 +79,8 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Updater Factory to get {@link ParquetVectorUpdater}. */
public class ParquetVectorUpdaterFactory {
- private final LogicalTypeAnnotation logicalTypeAnnotation;
-
- ParquetVectorUpdaterFactory(LogicalTypeAnnotation logicalTypeAnnotation) {
- this.logicalTypeAnnotation = logicalTypeAnnotation;
- }
-
-    public ParquetVectorUpdater getUpdater(ColumnDescriptor descriptor, DataType paimonType) {
+ public static ParquetVectorUpdater getUpdater(
+ ColumnDescriptor descriptor, DataType paimonType) {
        return paimonType.accept(UpdaterFactoryVisitor.INSTANCE).apply(descriptor);
}
@@ -144,14 +139,7 @@ public class ParquetVectorUpdaterFactory {
case BINARY:
return new BinaryToDecimalUpdater(c, decimalType);
case FIXED_LEN_BYTE_ARRAY:
-                int precision = decimalType.getPrecision();
-                if (ParquetSchemaConverter.is32BitDecimal(precision)) {
-                    return new IntegerToDecimalUpdater(c, decimalType);
-                } else if (ParquetSchemaConverter.is64BitDecimal(precision)) {
-                    return new LongToDecimalUpdater(c, decimalType);
-                } else {
-                    return new FixedLenByteArrayToDecimalUpdater(c, decimalType);
-                }
+                return new FixedLenByteArrayToDecimalUpdater(c, decimalType);
}
throw new RuntimeException(
"Unsupported decimal type: " +
c.getPrimitiveType().getPrimitiveTypeName());
@@ -614,10 +602,10 @@ public class ParquetVectorUpdaterFactory {
    private abstract static class DecimalUpdater<T extends WritableColumnVector>
implements ParquetVectorUpdater<T> {
- private final DecimalType sparkType;
+ protected final DecimalType paimonType;
- DecimalUpdater(DecimalType sparkType) {
- this.sparkType = sparkType;
+ DecimalUpdater(DecimalType paimonType) {
+ this.paimonType = paimonType;
}
@Override
@@ -627,22 +615,6 @@ public class ParquetVectorUpdaterFactory {
readValue(offset + i, values, valuesReader);
}
}
-
-        protected void writeDecimal(int offset, WritableColumnVector values, BigDecimal decimal) {
-            BigDecimal scaledDecimal =
-                    decimal.setScale(sparkType.getScale(), RoundingMode.UNNECESSARY);
-            int precision = decimal.precision();
-            if (ParquetSchemaConverter.is32BitDecimal(precision)) {
-                ((WritableIntVector) values)
-                        .setInt(offset, scaledDecimal.unscaledValue().intValue());
-            } else if (ParquetSchemaConverter.is64BitDecimal(precision)) {
-                ((WritableLongVector) values)
-                        .setLong(offset, scaledDecimal.unscaledValue().longValue());
-            } else {
-                byte[] bytes = scaledDecimal.unscaledValue().toByteArray();
-                ((WritableBytesVector) values).putByteArray(offset, bytes, 0, bytes.length);
-            }
-        }
}
    private static class IntegerToDecimalUpdater extends DecimalUpdater<WritableIntVector> {
@@ -687,8 +659,8 @@ public class ParquetVectorUpdaterFactory {
    private static class LongToDecimalUpdater extends DecimalUpdater<WritableLongVector> {
        private final int parquetScale;
-        LongToDecimalUpdater(ColumnDescriptor descriptor, DecimalType sparkType) {
-            super(sparkType);
+        LongToDecimalUpdater(ColumnDescriptor descriptor, DecimalType paimonType) {
+            super(paimonType);
            LogicalTypeAnnotation typeAnnotation =
                    descriptor.getPrimitiveType().getLogicalTypeAnnotation();
            if (typeAnnotation instanceof DecimalLogicalTypeAnnotation) {
@@ -726,8 +698,8 @@ public class ParquetVectorUpdaterFactory {
    private static class BinaryToDecimalUpdater extends DecimalUpdater<WritableBytesVector> {
        private final int parquetScale;
-        BinaryToDecimalUpdater(ColumnDescriptor descriptor, DecimalType sparkType) {
-            super(sparkType);
+        BinaryToDecimalUpdater(ColumnDescriptor descriptor, DecimalType paimonType) {
+            super(paimonType);
            LogicalTypeAnnotation typeAnnotation =
                    descriptor.getPrimitiveType().getLogicalTypeAnnotation();
            this.parquetScale = ((DecimalLogicalTypeAnnotation) typeAnnotation).getScale();
@@ -766,15 +738,17 @@ public class ParquetVectorUpdaterFactory {
}
    private static class FixedLenByteArrayToDecimalUpdater
-            extends DecimalUpdater<WritableBytesVector> {
-        private final int parquetScale;
+            extends DecimalUpdater<WritableColumnVector> {
        private final int arrayLen;
-        FixedLenByteArrayToDecimalUpdater(ColumnDescriptor descriptor, DecimalType sparkType) {
-            super(sparkType);
+        FixedLenByteArrayToDecimalUpdater(ColumnDescriptor descriptor, DecimalType paimonType) {
+            super(paimonType);
            LogicalTypeAnnotation typeAnnotation =
                    descriptor.getPrimitiveType().getLogicalTypeAnnotation();
-            this.parquetScale = ((DecimalLogicalTypeAnnotation) typeAnnotation).getScale();
+            int parquetScale = ((DecimalLogicalTypeAnnotation) typeAnnotation).getScale();
+            checkArgument(
+                    parquetScale == paimonType.getScale(),
+                    "Scale should match between the Paimon decimal type and the Parquet decimal type in the file");
            this.arrayLen = descriptor.getPrimitiveType().getTypeLength();
@@ -785,27 +759,52 @@ public class ParquetVectorUpdaterFactory {
@Override
        public void readValue(
-                int offset, WritableBytesVector values, VectorizedValuesReader valuesReader) {
-            BigInteger value = new BigInteger(valuesReader.readBinary(arrayLen).getBytesUnsafe());
-            BigDecimal decimal = new BigDecimal(value, this.parquetScale);
-            byte[] bytes = decimal.unscaledValue().toByteArray();
-            values.putByteArray(offset, bytes, 0, bytes.length);
+                int offset, WritableColumnVector values, VectorizedValuesReader valuesReader) {
+            Binary binary = valuesReader.readBinary(arrayLen);
+
+            int precision = paimonType.getPrecision();
+            if (ParquetSchemaConverter.is32BitDecimal(precision)) {
+                ((HeapIntVector) values).setInt(offset, (int) heapBinaryToLong(binary));
+            } else if (ParquetSchemaConverter.is64BitDecimal(precision)) {
+                ((HeapLongVector) values).setLong(offset, heapBinaryToLong(binary));
+            } else {
+                byte[] bytes = binary.getBytesUnsafe();
+                ((WritableBytesVector) values).putByteArray(offset, bytes, 0, bytes.length);
+            }
+        }
+
+ private long heapBinaryToLong(Binary binary) {
+ ByteBuffer buffer = binary.toByteBuffer();
+ byte[] bytes = buffer.array();
+ int start = buffer.arrayOffset() + buffer.position();
+ int end = buffer.arrayOffset() + buffer.limit();
+
+ long unscaled = 0L;
+
+ for (int i = start; i < end; i++) {
+ unscaled = (unscaled << 8) | (bytes[i] & 0xff);
+ }
+
+ int bits = 8 * (end - start);
+ return (unscaled << (64 - bits)) >> (64 - bits);
}
@Override
public void decodeSingleDictionaryId(
int offset,
- WritableBytesVector values,
+ WritableColumnVector values,
WritableIntVector dictionaryIds,
Dictionary dictionary) {
-            BigInteger value =
-                    new BigInteger(
-                            dictionary
-                                    .decodeToBinary(dictionaryIds.getInt(offset))
-                                    .getBytesUnsafe());
-            BigDecimal decimal = new BigDecimal(value, this.parquetScale);
-            byte[] bytes = decimal.unscaledValue().toByteArray();
-            values.putByteArray(offset, bytes, 0, bytes.length);
+            Binary binary = dictionary.decodeToBinary(dictionaryIds.getInt(offset));
+            int precision = paimonType.getPrecision();
+            if (ParquetSchemaConverter.is32BitDecimal(precision)) {
+                ((HeapIntVector) values).setInt(offset, (int) heapBinaryToLong(binary));
+            } else if (ParquetSchemaConverter.is64BitDecimal(precision)) {
+                ((HeapLongVector) values).setLong(offset, heapBinaryToLong(binary));
+            } else {
+                byte[] bytes = binary.getBytesUnsafe();
+                ((WritableBytesVector) values).putByteArray(offset, bytes, 0, bytes.length);
+            }
}
}
}
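
The interesting piece above is heapBinaryToLong: a fixed-length decimal
narrower than eight bytes carries its sign in the highest stored bit, so after
accumulating the big-endian bytes the value must be sign-extended. A
self-contained sketch of the same technique (class and method names here are
illustrative, not from the patch):

    public class BigEndianDecode {
        static long toLong(byte[] bytes) {
            long unscaled = 0L;
            for (byte b : bytes) {
                unscaled = (unscaled << 8) | (b & 0xff); // big-endian accumulate
            }
            int bits = 8 * bytes.length;
            // Shift the top stored bit up to bit 63, then arithmetic-shift back
            // so the sign propagates through the unused high bytes.
            return (unscaled << (64 - bits)) >> (64 - bits);
        }

        public static void main(String[] args) {
            System.out.println(toLong(new byte[] {0x30, 0x39}));               // 12345
            System.out.println(toLong(new byte[] {(byte) 0xff, (byte) 0xff})); // -1
        }
    }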
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/VectorizedColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/VectorizedColumnReader.java
index 277cd533c5..166a5ce935 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/VectorizedColumnReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/VectorizedColumnReader.java
@@ -39,7 +39,6 @@ import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.column.values.RequiresPreviousReader;
import org.apache.parquet.column.values.ValuesReader;
-import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
import java.io.IOException;
@@ -67,9 +66,6 @@ public class VectorizedColumnReader {
/** Vectorized RLE decoder for repetition levels. */
private VectorizedRleValuesReader repColumn;
- /** Factory to get type-specific vector updater. */
- private final ParquetVectorUpdaterFactory updaterFactory;
-
/**
     * Helper struct to track intermediate states while reading Parquet pages in the column chunk.
*/
@@ -83,7 +79,6 @@ public class VectorizedColumnReader {
private final PageReader pageReader;
private final ColumnDescriptor descriptor;
- private final LogicalTypeAnnotation logicalTypeAnnotation;
private final ParsedVersion writerVersion;
public VectorizedColumnReader(
@@ -97,8 +92,6 @@ public class VectorizedColumnReader {
this.readState =
new ParquetReadState(
                        descriptor, isRequired, pageReadStore.getRowIndexes().orElse(null));
-        this.logicalTypeAnnotation = descriptor.getPrimitiveType().getLogicalTypeAnnotation();
-        this.updaterFactory = new ParquetVectorUpdaterFactory(logicalTypeAnnotation);
DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
if (dictionaryPage != null) {
@@ -120,7 +113,7 @@ public class VectorizedColumnReader {
}
private boolean isLazyDecodingSupported(
- PrimitiveType.PrimitiveTypeName typeName, DataType sparkType) {
+ PrimitiveType.PrimitiveTypeName typeName, DataType paimonType) {
return true;
}
@@ -133,7 +126,7 @@ public class VectorizedColumnReader {
WritableIntVector definitionLevels)
throws IOException {
WritableIntVector dictionaryIds = null;
-        ParquetVectorUpdater updater = updaterFactory.getUpdater(descriptor, type);
+        ParquetVectorUpdater updater = ParquetVectorUpdaterFactory.getUpdater(descriptor, type);
if (dictionary != null) {
            // SPARK-16334: We only maintain a single dictionary per row batch, so that it can be
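
With the per-column factory state gone, resolving an updater is a single
static call; everything it needs is derived from the column descriptor and the
requested Paimon type. A trimmed view of the new call site (surrounding
read-path plumbing elided):

    // No per-reader ParquetVectorUpdaterFactory instance or cached
    // LogicalTypeAnnotation is needed anymore.
    ParquetVectorUpdater updater = ParquetVectorUpdaterFactory.getUpdater(descriptor, type);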
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
index c1028082a2..58fc10cc51 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
@@ -83,6 +83,7 @@ import org.junit.jupiter.params.provider.MethodSource;
import java.io.File;
import java.io.IOException;
import java.math.BigDecimal;
+import java.math.RoundingMode;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
@@ -99,6 +100,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
+import static org.apache.paimon.format.parquet.ParquetSchemaConverter.computeMinBytesForDecimalPrecision;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
import static org.assertj.core.api.Assertions.assertThat;
@@ -156,6 +159,16 @@ public class ParquetReadWriteTest {
                                new TimestampType(6), new VarCharType(VarCharType.MAX_LENGTH)))
.build();
+ private static final RowType DECIMAL_TYPE =
+ RowType.builder()
+ .fields(
+ new DecimalType(3, 2),
+ new DecimalType(6, 2),
+ new DecimalType(9, 2),
+ new DecimalType(12, 2),
+ new DecimalType(32, 2))
+ .build();
+
private static final RowType NESTED_ARRAY_MAP_TYPE =
RowType.of(
new IntType(),
@@ -478,6 +491,36 @@ public class ParquetReadWriteTest {
compareNestedRow(rows, results);
}
+ @Test
+ public void testDecimalWithFixedLengthRead() throws Exception {
+ int number = new Random().nextInt(1000) + 100;
+ Path path = createDecimalFile(number, folder, 10);
+
+        ParquetReaderFactory format =
+                new ParquetReaderFactory(new Options(), DECIMAL_TYPE, 500, FilterCompat.NOOP);
+        RecordReader<InternalRow> reader =
+                format.createReader(
+                        new FormatReaderContext(
+                                new LocalFileIO(), path, new LocalFileIO().getFileSize(path)));
+        List<InternalRow> results = new ArrayList<>(number);
+        InternalRowSerializer internalRowSerializer = new InternalRowSerializer(DECIMAL_TYPE);
+        reader.forEachRemaining(row -> results.add(internalRowSerializer.copy(row)));
+
+        BigDecimal decimalValue0 = new BigDecimal("123.67");
+        BigDecimal decimalValue1 = new BigDecimal("12345.67");
+        BigDecimal decimalValue2 = new BigDecimal("1234567.67");
+        BigDecimal decimalValue3 = new BigDecimal("123456789123.67");
+        BigDecimal decimalValue4 = new BigDecimal("123456789123456789123456789123.67");
+
+        for (InternalRow internalRow : results) {
+            assertThat(internalRow.getDecimal(0, 3, 2).toBigDecimal()).isEqualTo(decimalValue0);
+            assertThat(internalRow.getDecimal(1, 6, 2).toBigDecimal()).isEqualTo(decimalValue1);
+            assertThat(internalRow.getDecimal(2, 9, 2).toBigDecimal()).isEqualTo(decimalValue2);
+            assertThat(internalRow.getDecimal(3, 12, 2).toBigDecimal()).isEqualTo(decimalValue3);
+            assertThat(internalRow.getDecimal(4, 32, 2).toBigDecimal()).isEqualTo(decimalValue4);
+        }
+    }
+
@Test
public void testNestedNullMapKey() {
List<InternalRow> rows = prepareNestedData(1283, true);
@@ -968,6 +1011,90 @@ public class ParquetReadWriteTest {
return path;
}
+ private Path createDecimalFile(int rowNum, File tmpDir, int rowGroupSize) {
+ Path path = new Path(tmpDir.getPath(), UUID.randomUUID().toString());
+ Configuration conf = new Configuration();
+ conf.setInt("parquet.block.size", rowGroupSize);
+ List<Type> types = new ArrayList<>();
+
+ for (DataField dataField : DECIMAL_TYPE.getFields()) {
+ String name = dataField.name();
+ int fieldId = dataField.id();
+ int precision = ((DecimalType) dataField.type()).getPrecision();
+ int scale = ((DecimalType) dataField.type()).getScale();
+ Type.Repetition repetition =
+ dataField.type().isNullable()
+ ? Type.Repetition.OPTIONAL
+ : Type.Repetition.REQUIRED;
+
+ types.add(
+ Types.primitive(FIXED_LEN_BYTE_ARRAY, repetition)
+                            .as(LogicalTypeAnnotation.decimalType(scale, precision))
+                            .length(computeMinBytesForDecimalPrecision(precision))
+ .named(name)
+ .withId(fieldId));
+ }
+
+ MessageType schema = new MessageType("paimon_schema", types);
+
+ List<Binary> decimalBytesList = new ArrayList<>();
+
+ BigDecimal decimalValue = new BigDecimal("123.67");
+ int scale = 2;
+ byte[] decimalBytes =
+                decimalValue.setScale(scale, RoundingMode.HALF_UP).unscaledValue().toByteArray();
+ Binary binaryValue = Binary.fromByteArray(decimalBytes);
+ decimalBytesList.add(binaryValue);
+
+ decimalValue = new BigDecimal("12345.67");
+ decimalBytes =
+                decimalValue.setScale(scale, RoundingMode.HALF_UP).unscaledValue().toByteArray();
+ binaryValue = Binary.fromByteArray(decimalBytes);
+ decimalBytesList.add(binaryValue);
+
+ decimalValue = new BigDecimal("1234567.67");
+ decimalBytes =
+                decimalValue.setScale(scale, RoundingMode.HALF_UP).unscaledValue().toByteArray();
+ binaryValue = Binary.fromByteArray(decimalBytes);
+ decimalBytesList.add(binaryValue);
+
+ decimalValue = new BigDecimal("123456789123.67");
+ decimalBytes =
+                decimalValue.setScale(scale, RoundingMode.HALF_UP).unscaledValue().toByteArray();
+ binaryValue = Binary.fromByteArray(decimalBytes);
+ decimalBytesList.add(binaryValue);
+
+ decimalValue = new BigDecimal("123456789123456789123456789123.67");
+ decimalBytes =
+                decimalValue.setScale(scale, RoundingMode.HALF_UP).unscaledValue().toByteArray();
+ binaryValue = Binary.fromByteArray(decimalBytes);
+ decimalBytesList.add(binaryValue);
+
+ try (ParquetWriter<Group> writer =
+ ExampleParquetWriter.builder(
+ HadoopOutputFile.fromPath(
+                                new org.apache.hadoop.fs.Path(path.toString()), conf))
+ .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+ .withConf(new Configuration())
+ .withType(schema)
+ .build()) {
            SimpleGroupFactory simpleGroupFactory = new SimpleGroupFactory(schema);
+ for (int i = 0; i < rowNum; i++) {
+
+ Group row = simpleGroupFactory.newGroup();
+
+ for (int j = 0; j < DECIMAL_TYPE.getFields().size(); j++) {
+ row.append("f" + j, decimalBytesList.get(j));
+ }
+
+ writer.write(row);
+ }
+ } catch (Exception e) {
            throw new RuntimeException("Create data by parquet origin writer failed.", e);
+ }
+ return path;
+ }
+
private void createParquetDoubleNestedArray(Group group, int i) {
Group outside = group.addGroup(0);
Group inside = outside.addGroup(0);
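
The test sizes each FIXED_LEN_BYTE_ARRAY column with
computeMinBytesForDecimalPrecision, which is imported but not shown in this
diff. A plausible implementation (this mirrors the equivalent helper in
Spark's Parquet support and is an assumption about Paimon's version):

    // Assumed behavior: the smallest two's-complement byte width that can
    // hold any unscaled value of the given decimal precision.
    static int computeMinBytesForDecimalPrecision(int precision) {
        int numBytes = 1;
        // n bytes give 8n - 1 magnitude bits in two's complement.
        while (Math.pow(2.0, 8 * numBytes - 1) < Math.pow(10.0, precision)) {
            numBytes += 1;
        }
        return numBytes;
    }

Under that sizing, the test's DecimalType(3, 2), (6, 2) and (9, 2) columns
exercise the new int path, (12, 2) the long path, and (32, 2) the raw-bytes
path, covering all three branches of the updater.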