This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d83c5caf81 [parquet] Fix timestamp type and decimal type when the file schema does not correctly match the schema in metadata (#5582)
d83c5caf81 is described below
commit d83c5caf81c4fdf9bd8d33e21ff0819e197581b7
Author: YeJunHao <[email protected]>
AuthorDate: Fri May 9 16:17:55 2025 +0800
[parquet] Fix timestamp type and decimal type when the file schema does not correctly match the schema in metadata (#5582)
---
.../parquet/reader/ParquetSplitReaderUtil.java | 26 ----
.../reader/ParquetVectorUpdaterFactory.java | 165 ++++++++++++++++-----
.../parquet/reader/VectorizedColumnReader.java | 34 ++++-
.../reader/FileTypeNotMatchReadTypeTest.java | 139 +++++++++++++++++
4 files changed, 301 insertions(+), 63 deletions(-)
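
Background for the change: Paimon allocates its read vectors from the table
schema in metadata, while the Parquet file may physically store the same
column differently after a schema change (a timestamp as INT64 instead of
INT96, a decimal as INT32/INT64 instead of FIXED_LEN_BYTE_ARRAY). A minimal
sketch of the scenario, mirroring the new test below rather than the reader
internals:

    // Rows written with a high-precision timestamp (stored as INT96) ...
    RowType rowTypeWrite = RowType.of(DataTypes.TIMESTAMP(9));
    // ... later read back with a lower precision, which allocates a long
    // (epoch micros) vector instead of a timestamp vector.
    RowType rowTypeRead = RowType.of(DataTypes.TIMESTAMP(5));

Previously such reads tripped the checkArgument assertions removed below or
cast to the wrong vector type; now the updaters reconcile the file's physical
layout with the vector implied by the read schema.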
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java
index 1590b4b67e..afe8546470 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java
@@ -60,7 +60,6 @@ import org.apache.parquet.io.PrimitiveColumnIO;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.InvalidSchemaException;
import org.apache.parquet.schema.LogicalTypeAnnotation;
-import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
@@ -149,10 +148,6 @@ public class ParquetSplitReaderUtil {
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
int precision = DataTypeChecks.getPrecision(fieldType);
if (precision > 6) {
- checkArgument(
- typeName == PrimitiveType.PrimitiveTypeName.INT96,
- "Unexpected type: %s",
- typeName);
return new HeapTimestampVector(batchSize);
} else {
return new HeapLongVector(batchSize);
@@ -160,31 +155,10 @@ public class ParquetSplitReaderUtil {
case DECIMAL:
DecimalType decimalType = (DecimalType) fieldType;
if (ParquetSchemaConverter.is32BitDecimal(decimalType.getPrecision())) {
- checkArgument(
- (typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
- || typeName == PrimitiveType.PrimitiveTypeName.INT32)
- && primitiveType.getLogicalTypeAnnotation()
- instanceof DecimalLogicalTypeAnnotation,
- "Unexpected type: %s",
- typeName);
return new HeapIntVector(batchSize);
} else if (ParquetSchemaConverter.is64BitDecimal(decimalType.getPrecision())) {
- checkArgument(
- (typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
- || typeName == PrimitiveType.PrimitiveTypeName.INT64)
- && primitiveType.getLogicalTypeAnnotation()
- instanceof DecimalLogicalTypeAnnotation,
- "Unexpected type: %s",
- typeName);
return new HeapLongVector(batchSize);
} else {
- checkArgument(
- (typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
- || typeName == PrimitiveType.PrimitiveTypeName.BINARY)
- && primitiveType.getLogicalTypeAnnotation()
- instanceof DecimalLogicalTypeAnnotation,
- "Unexpected type: %s",
- typeName);
return new HeapBytesVector(batchSize);
}
case ARRAY:
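
With those assertions gone, ParquetSplitReaderUtil picks the vector purely
from the read-side type, and the updaters below take over the physical
reconciliation. A compact sketch of the retained decimal branch, assuming
the usual is32BitDecimal/is64BitDecimal thresholds of precision <= 9 and
<= 18 (a hypothetical helper, not the committed code):

    import org.apache.paimon.data.columnar.heap.HeapBytesVector;
    import org.apache.paimon.data.columnar.heap.HeapIntVector;
    import org.apache.paimon.data.columnar.heap.HeapLongVector;
    import org.apache.paimon.data.columnar.writable.WritableColumnVector;

    // Vector selection depends on the read precision only.
    static WritableColumnVector newDecimalVector(int precision, int batchSize) {
        if (precision <= 9) {
            return new HeapIntVector(batchSize);   // unscaled value as int
        } else if (precision <= 18) {
            return new HeapLongVector(batchSize);  // unscaled value as long
        } else {
            return new HeapBytesVector(batchSize); // unscaled bytes
        }
    }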
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetVectorUpdaterFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetVectorUpdaterFactory.java
index 6fa07c3e6d..94f03f66a9 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetVectorUpdaterFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetVectorUpdaterFactory.java
@@ -19,6 +19,7 @@
package org.apache.paimon.format.parquet.reader;
import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.columnar.heap.HeapBytesVector;
import org.apache.paimon.data.columnar.heap.HeapIntVector;
import org.apache.paimon.data.columnar.heap.HeapLongVector;
import org.apache.paimon.data.columnar.writable.WritableBooleanVector;
@@ -187,9 +188,14 @@ public class ParquetVectorUpdaterFactory {
return c -> {
if (c.getPrimitiveType().getPrimitiveTypeName()
== PrimitiveType.PrimitiveTypeName.INT64) {
- return new LongUpdater();
+ return new LongTimestampUpdater(timestampType.getPrecision());
+ } else if (c.getPrimitiveType().getPrimitiveTypeName()
+ == PrimitiveType.PrimitiveTypeName.INT96) {
+ return new TimestampUpdater(timestampType.getPrecision());
+ } else {
+ throw new UnsupportedOperationException(
+ "Only support timestamp with int64 and int96 in
parquet file yet");
}
- return new TimestampUpdater();
};
}
@@ -200,7 +206,7 @@ public class ParquetVectorUpdaterFactory {
== PrimitiveType.PrimitiveTypeName.INT64) {
return new LongUpdater();
}
- return new TimestampUpdater();
+ return new TimestampUpdater(localZonedTimestampType.getPrecision());
};
}
@@ -387,25 +393,86 @@ public class ParquetVectorUpdaterFactory {
}
}
- private static class TimestampUpdater implements ParquetVectorUpdater<WritableTimestampVector> {
+ private abstract static class AbstractTimestampUpdater
+ implements ParquetVectorUpdater<WritableColumnVector> {
+
+ protected final int precision;
+
+ AbstractTimestampUpdater(int precision) {
+ this.precision = precision;
+ }
+
+ @Override
+ public void readValues(
+ int total,
+ int offset,
+ WritableColumnVector values,
+ VectorizedValuesReader valuesReader) {
+ for (int i = 0; i < total; i++) {
+ readValue(offset + i, values, valuesReader);
+ }
+ }
+ }
+
+ private static class LongTimestampUpdater extends AbstractTimestampUpdater {
+
+ public LongTimestampUpdater(int precision) {
+ super(precision);
+ }
+
+ @Override
+ public void skipValues(int total, VectorizedValuesReader valuesReader) {
+ valuesReader.skipLongs(total);
+ }
+
+ @Override
+ public void readValue(
+ int offset, WritableColumnVector values, VectorizedValuesReader valuesReader) {
+ long value = valuesReader.readLong();
+ putTimestamp(values, offset, value);
+ }
+
+ @Override
+ public void decodeSingleDictionaryId(
+ int offset,
+ WritableColumnVector values,
+ WritableIntVector dictionaryIds,
+ Dictionary dictionary) {
+ long value = dictionary.decodeToLong(dictionaryIds.getInt(offset));
+ putTimestamp(values, offset, value);
+ }
+
+ private void putTimestamp(WritableColumnVector vector, int offset, long timestamp) {
+ if (vector instanceof WritableTimestampVector) {
+ ((WritableTimestampVector) vector)
+ .setTimestamp(offset, Timestamp.fromEpochMillis(timestamp));
+ } else {
+ ((WritableLongVector) vector).setLong(offset, timestamp);
+ }
+ }
+ }
+
+ private static class TimestampUpdater extends AbstractTimestampUpdater {
public static final int JULIAN_EPOCH_OFFSET_DAYS = 2_440_588;
public static final long MILLIS_IN_DAY = TimeUnit.DAYS.toMillis(1);
public static final long NANOS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1);
public static final long NANOS_PER_SECOND = TimeUnit.SECONDS.toNanos(1);
+ public TimestampUpdater(int precision) {
+ super(precision);
+ }
+
@Override
public void readValues(
int total,
int offset,
- WritableTimestampVector values,
+ WritableColumnVector values,
VectorizedValuesReader valuesReader) {
-
for (int i = 0; i < total; i++) {
- values.setTimestamp(
- offset + i,
- int96ToTimestamp(
- true, valuesReader.readLong(), valuesReader.readInteger()));
+ Timestamp timestamp =
+ int96ToTimestamp(true, valuesReader.readLong(), valuesReader.readInteger());
+ putTimestamp(values, offset + i, timestamp);
}
}
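
For reference, a Parquet INT96 timestamp is eight bytes of nanos-of-day
followed by a four-byte Julian day, which is what the constants above
support. A standalone sketch of the arithmetic (standard Julian-epoch math,
not the committed int96ToTimestamp helper):

    import java.util.concurrent.TimeUnit;

    // Epoch millis from an already-split INT96 value.
    static long int96ToEpochMillis(long nanosOfDay, int julianDay) {
        long julianEpochOffsetDays = 2_440_588; // Julian day of 1970-01-01
        return (julianDay - julianEpochOffsetDays) * TimeUnit.DAYS.toMillis(1)
                + nanosOfDay / TimeUnit.MILLISECONDS.toNanos(1);
    }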
@@ -416,8 +483,9 @@ public class ParquetVectorUpdaterFactory {
@Override
public void readValue(
- int offset, WritableTimestampVector values, VectorizedValuesReader valuesReader) {
- values.setTimestamp(
+ int offset, WritableColumnVector values, VectorizedValuesReader valuesReader) {
+ putTimestamp(
+ values,
offset,
int96ToTimestamp(true, valuesReader.readLong(), valuesReader.readInteger()));
}
@@ -425,11 +493,28 @@ public class ParquetVectorUpdaterFactory {
@Override
public void decodeSingleDictionaryId(
int offset,
- WritableTimestampVector values,
+ WritableColumnVector values,
WritableIntVector dictionaryIds,
Dictionary dictionary) {
- values.setTimestamp(
- offset, decodeInt96ToTimestamp(true, dictionary, dictionaryIds.getInt(offset)));
+ putTimestamp(
+ values,
+ offset,
+ decodeInt96ToTimestamp(true, dictionary, dictionaryIds.getInt(offset)));
+ }
+
+ private void putTimestamp(WritableColumnVector vector, int offset, Timestamp timestamp) {
+ if (vector instanceof WritableTimestampVector) {
+ ((WritableTimestampVector) vector).setTimestamp(offset, timestamp);
+ } else {
+ if (precision <= 3) {
+ ((WritableLongVector) vector).setLong(offset, timestamp.getMillisecond());
+ } else if (precision <= 6) {
+ ((WritableLongVector) vector).setLong(offset, timestamp.toMicros());
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported timestamp precision: " + precision);
+ }
+ }
}
public static Timestamp decodeInt96ToTimestamp(
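
The putTimestamp branch above carries the timestamp half of the fix: one
decoded Timestamp is stored according to the read-side precision. A small
illustration using only methods that appear in this diff:

    import org.apache.paimon.data.Timestamp;

    Timestamp ts = Timestamp.fromEpochMillis(1_715_241_475_000L);
    long forMillisReaders = ts.getMillisecond(); // TIMESTAMP(0..3) long vectors
    long forMicrosReaders = ts.toMicros();       // TIMESTAMP(4..6) long vectors
    // TIMESTAMP(7..9) keeps the full Timestamp in a WritableTimestampVector.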
@@ -611,9 +696,21 @@ public class ParquetVectorUpdaterFactory {
readValue(offset + i, values, valuesReader);
}
}
+
+ protected void putDecimal(WritableColumnVector values, int offset, BigDecimal decimal) {
+ int precision = paimonType.getPrecision();
+ if (ParquetSchemaConverter.is32BitDecimal(precision)) {
+ ((HeapIntVector) values).setInt(offset, decimal.unscaledValue().intValue());
+ } else if (ParquetSchemaConverter.is64BitDecimal(precision)) {
+ ((HeapLongVector) values).setLong(offset, decimal.unscaledValue().longValue());
+ } else {
+ byte[] bytes = decimal.unscaledValue().toByteArray();
+ ((WritableBytesVector) values).putByteArray(offset, bytes, 0, bytes.length);
+ }
+ }
}
- private static class IntegerToDecimalUpdater extends DecimalUpdater<WritableIntVector> {
+ private static class IntegerToDecimalUpdater extends DecimalUpdater<WritableColumnVector> {
private final int parquetScale;
IntegerToDecimalUpdater(ColumnDescriptor descriptor, DecimalType paimonType) {
@@ -634,25 +731,25 @@ public class ParquetVectorUpdaterFactory {
@Override
public void readValue(
- int offset, WritableIntVector values, VectorizedValuesReader valuesReader) {
+ int offset, WritableColumnVector values, VectorizedValuesReader valuesReader) {
BigDecimal decimal =
BigDecimal.valueOf(valuesReader.readInteger(), parquetScale);
- values.setInt(offset, decimal.unscaledValue().intValue());
+ putDecimal(values, offset, decimal);
}
@Override
public void decodeSingleDictionaryId(
int offset,
- WritableIntVector values,
+ WritableColumnVector values,
WritableIntVector dictionaryIds,
Dictionary dictionary) {
BigDecimal decimal =
BigDecimal.valueOf(
dictionary.decodeToInt(dictionaryIds.getInt(offset)), parquetScale);
- values.setInt(offset, decimal.unscaledValue().intValue());
+ putDecimal(values, offset, decimal);
}
}
- private static class LongToDecimalUpdater extends DecimalUpdater<WritableLongVector> {
+ private static class LongToDecimalUpdater extends DecimalUpdater<WritableColumnVector> {
private final int parquetScale;
LongToDecimalUpdater(ColumnDescriptor descriptor, DecimalType paimonType) {
@@ -673,32 +770,34 @@ public class ParquetVectorUpdaterFactory {
@Override
public void readValue(
- int offset, WritableLongVector values, VectorizedValuesReader valuesReader) {
+ int offset, WritableColumnVector values, VectorizedValuesReader valuesReader) {
BigDecimal decimal = BigDecimal.valueOf(valuesReader.readLong(), parquetScale);
- values.setLong(offset, decimal.unscaledValue().longValue());
+ putDecimal(values, offset, decimal);
}
@Override
public void decodeSingleDictionaryId(
int offset,
- WritableLongVector values,
+ WritableColumnVector values,
WritableIntVector dictionaryIds,
Dictionary dictionary) {
BigDecimal decimal =
BigDecimal.valueOf(
dictionary.decodeToLong(dictionaryIds.getInt(offset)), parquetScale);
- values.setLong(offset, decimal.unscaledValue().longValue());
+ putDecimal(values, offset, decimal);
}
}
- private static class BinaryToDecimalUpdater extends DecimalUpdater<WritableBytesVector> {
+ private static class BinaryToDecimalUpdater extends DecimalUpdater<WritableColumnVector> {
private final int parquetScale;
+ private final WritableBytesVector bytesVector;
BinaryToDecimalUpdater(ColumnDescriptor descriptor, DecimalType paimonType) {
super(paimonType);
LogicalTypeAnnotation typeAnnotation =
descriptor.getPrimitiveType().getLogicalTypeAnnotation();
this.parquetScale = ((DecimalLogicalTypeAnnotation) typeAnnotation).getScale();
+ this.bytesVector = new HeapBytesVector(1);
}
@Override
@@ -708,18 +807,17 @@ public class ParquetVectorUpdaterFactory {
@Override
public void readValue(
- int offset, WritableBytesVector values, VectorizedValuesReader valuesReader) {
- valuesReader.readBinary(1, values, offset);
- BigInteger value = new BigInteger(values.getBytes(offset).getBytes());
+ int offset, WritableColumnVector values, VectorizedValuesReader valuesReader) {
+ valuesReader.readBinary(1, bytesVector, offset);
+ BigInteger value = new BigInteger(bytesVector.getBytes(offset).getBytes());
BigDecimal decimal = new BigDecimal(value, parquetScale);
- byte[] bytes = decimal.unscaledValue().toByteArray();
- values.putByteArray(offset, bytes, 0, bytes.length);
+ putDecimal(values, offset, decimal);
}
@Override
public void decodeSingleDictionaryId(
int offset,
- WritableBytesVector values,
+ WritableColumnVector values,
WritableIntVector dictionaryIds,
Dictionary dictionary) {
BigInteger value =
@@ -728,8 +826,7 @@ public class ParquetVectorUpdaterFactory {
.decodeToBinary(dictionaryIds.getInt(offset))
.getBytesUnsafe());
BigDecimal decimal = new BigDecimal(value, parquetScale);
- byte[] bytes = decimal.unscaledValue().toByteArray();
- values.putByteArray(offset, bytes, 0, bytes.length);
+ putDecimal(values, offset, decimal);
}
}
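
All three decimal updaters now funnel through putDecimal, which stores only
the unscaled value; the read schema's scale is reapplied when the value is
read back (row.getDecimal(0, precision, scale) in the test below). A
plain-JDK illustration of the three storage shapes:

    import java.math.BigDecimal;

    BigDecimal decimal = new BigDecimal("123.45");           // unscaled 12345, scale 2
    int asInt = decimal.unscaledValue().intValue();          // DECIMAL(p <= 9)
    long asLong = decimal.unscaledValue().longValue();       // DECIMAL(p <= 18)
    byte[] asBytes = decimal.unscaledValue().toByteArray();  // larger p, big-endian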
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/VectorizedColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/VectorizedColumnReader.java
index 2a2d33975a..0f2713f58b 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/VectorizedColumnReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/VectorizedColumnReader.java
@@ -18,6 +18,13 @@
package org.apache.paimon.format.parquet.reader;
+import org.apache.paimon.data.columnar.BooleanColumnVector;
+import org.apache.paimon.data.columnar.BytesColumnVector;
+import org.apache.paimon.data.columnar.ColumnVector;
+import org.apache.paimon.data.columnar.DoubleColumnVector;
+import org.apache.paimon.data.columnar.FloatColumnVector;
+import org.apache.paimon.data.columnar.IntColumnVector;
+import org.apache.paimon.data.columnar.LongColumnVector;
import org.apache.paimon.data.columnar.writable.WritableColumnVector;
import org.apache.paimon.data.columnar.writable.WritableIntVector;
import org.apache.paimon.types.DataType;
@@ -42,6 +49,7 @@ import org.apache.parquet.schema.PrimitiveType;
import java.io.IOException;
+import static org.apache.paimon.types.DataTypeRoot.FLOAT;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
/** Decoder to return values from a single column. */
@@ -108,8 +116,28 @@ public class VectorizedColumnReader {
}
private boolean isLazyDecodingSupported(
- PrimitiveType.PrimitiveTypeName typeName, DataType paimonType) {
- return true;
+ PrimitiveType.PrimitiveTypeName typeName, ColumnVector columnVector) {
+ boolean isSupported = false;
+ switch (typeName) {
+ case INT32:
+ isSupported = columnVector instanceof IntColumnVector;
+ break;
+ case INT64:
+ isSupported = columnVector instanceof LongColumnVector;
+ break;
+ case FLOAT:
+ isSupported = columnVector instanceof FloatColumnVector;
+ break;
+ case DOUBLE:
+ isSupported = columnVector instanceof DoubleColumnVector;
+ break;
+ case BOOLEAN:
+ isSupported = columnVector instanceof BooleanColumnVector;
+ break;
+ case BINARY:
+ isSupported = columnVector instanceof BytesColumnVector;
+ }
+ return isSupported;
}
/** Reads `total` rows from this columnReader into column. */
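
The rewritten isLazyDecodingSupported is what routes mismatched columns
through the new updaters: lazy decoding hands the raw dictionary to the
vector, so it is only safe when the vector's type matches the file's
physical type. A hypothetical mismatch to make it concrete: a decimal stored
as INT64 but read as DECIMAL(5) lands in a HeapIntVector, which is not a
LongColumnVector, so the INT64 case returns false and the dictionary ids are
decoded eagerly through the updater, converting each value on the way in.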
@@ -176,7 +204,7 @@ public class VectorizedColumnReader {
// the values to add microseconds precision.
if (column.hasDictionary()
|| (startRowId == pageFirstRowIndex
- && isLazyDecodingSupported(typeName, type))) {
+ && isLazyDecodingSupported(typeName, column)))
{
column.setDictionary(new ParquetDictionary(dictionary));
} else {
updater.decodeDictionaryIds(
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/reader/FileTypeNotMatchReadTypeTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/reader/FileTypeNotMatchReadTypeTest.java
new file mode 100644
index 0000000000..8a3f28ae67
--- /dev/null
+++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/reader/FileTypeNotMatchReadTypeTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.parquet.reader;
+
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.format.FormatReaderContext;
+import org.apache.paimon.format.parquet.ParquetReaderFactory;
+import org.apache.paimon.format.parquet.writer.ParquetRowDataBuilder;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.FileRecordReader;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.io.LocalOutputFile;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.util.Random;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test field type not match correctly with read type. */
+public class FileTypeNotMatchReadTypeTest {
+
+ private static final Random RANDOM = new Random();
+ @TempDir private Path tempDir;
+
+ @Test
+ public void testTimestamp() throws Exception {
+ String fileName = "test.parquet";
+ String fileWholePath = tempDir + "/" + fileName;
+ for (int i = 0; i < 100; i++) {
+ int writePrecision = RANDOM.nextInt(10);
+ int readPrecision = writePrecision == 0 ? 0 : RANDOM.nextInt(writePrecision);
+
+ // Precision 0-3 and 4-6 are both stored as long (INT64) in the file,
+ // but precision 0-3 uses TIMESTAMP_MILLIS while 4-6 uses TIMESTAMP_MICROS.
+ // So when writePrecision is greater than 3, bump readPrecision to at least 4.
+ if (readPrecision <= 3 && writePrecision > 3) {
+ readPrecision = 4;
+ }
+
+ RowType rowTypeWrite = RowType.of(DataTypes.TIMESTAMP(writePrecision));
+ RowType rowTypeRead = RowType.of(DataTypes.TIMESTAMP(readPrecision));
+
+ ParquetRowDataBuilder parquetRowDataBuilder =
+ new ParquetRowDataBuilder(
+ new LocalOutputFile(new File(fileWholePath).toPath()), rowTypeWrite);
+
+ ParquetWriter<InternalRow> parquetWriter = parquetRowDataBuilder.build();
+ Timestamp timestamp = Timestamp.now();
+ parquetWriter.write(GenericRow.of(timestamp));
+ parquetWriter.write(GenericRow.of(Timestamp.now()));
+ parquetWriter.close();
+
+ ParquetReaderFactory parquetReaderFactory =
+ new ParquetReaderFactory(new Options(), rowTypeRead, 100, null);
+
+ File file = new File(fileWholePath);
+ FileRecordReader<InternalRow> fileRecordReader =
+ parquetReaderFactory.createReader(
+ new FormatReaderContext(
+ LocalFileIO.create(),
+ new org.apache.paimon.fs.Path(tempDir.toString(), fileName),
+ file.length()));
+
+ InternalRow row = fileRecordReader.readBatch().next();
+ Timestamp getTimestamp = row.getTimestamp(0, readPrecision);
+ assertThat(timestamp.getMillisecond()).isEqualTo(getTimestamp.getMillisecond());
+ file.delete();
+ }
+ }
+
+ @Test
+ public void testDecimal() throws Exception {
+ String fileName = "test.parquet";
+ String fileWholePath = tempDir + "/" + fileName;
+ for (int i = 0; i < 100; i++) {
+ int writePrecision = 1 + RANDOM.nextInt(30);
+ int readPrecision = 1 + RANDOM.nextInt(writePrecision);
+
+ RowType rowTypeWrite = RowType.of(DataTypes.DECIMAL(writePrecision, 0));
+ RowType rowTypeRead = RowType.of(DataTypes.DECIMAL(readPrecision, 0));
+
+ ParquetRowDataBuilder parquetRowDataBuilder =
+ new ParquetRowDataBuilder(
+ new LocalOutputFile(new File(fileWholePath).toPath()), rowTypeWrite);
+
+ ParquetWriter<InternalRow> parquetWriter = parquetRowDataBuilder.build();
+ Decimal decimal =
+ Decimal.fromBigDecimal(new java.math.BigDecimal(1.0), writePrecision, 0);
+ parquetWriter.write(GenericRow.of(decimal));
+ parquetWriter.write(
+ GenericRow.of(
+ Decimal.fromBigDecimal(
+ new java.math.BigDecimal(2.0), writePrecision, 0)));
+ parquetWriter.close();
+
+ ParquetReaderFactory parquetReaderFactory =
+ new ParquetReaderFactory(new Options(), rowTypeRead, 100, null);
+
+ File file = new File(fileWholePath);
+ FileRecordReader<InternalRow> fileRecordReader =
+ parquetReaderFactory.createReader(
+ new FormatReaderContext(
+ LocalFileIO.create(),
+ new org.apache.paimon.fs.Path(tempDir.toString(), fileName),
+ file.length()));
+
+ InternalRow row = fileRecordReader.readBatch().next();
+ Decimal getDecimal = row.getDecimal(0, readPrecision, 0);
+ assertThat(decimal.toUnscaledLong()).isEqualTo(getDecimal.toUnscaledLong());
+ file.delete();
+ }
+ }
+}