This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d83c5caf81 [parquet] Fix timestamp type and decimal type, if the file 
schema is not correctly match the schema in metadata (#5582)
d83c5caf81 is described below

commit d83c5caf81c4fdf9bd8d33e21ff0819e197581b7
Author: YeJunHao <[email protected]>
AuthorDate: Fri May 9 16:17:55 2025 +0800

    [parquet] Fix timestamp type and decimal type, if the file schema is not 
correctly match the schema in metadata (#5582)
---
 .../parquet/reader/ParquetSplitReaderUtil.java     |  26 ----
 .../reader/ParquetVectorUpdaterFactory.java        | 165 ++++++++++++++++-----
 .../parquet/reader/VectorizedColumnReader.java     |  34 ++++-
 .../reader/FileTypeNotMatchReadTypeTest.java       | 139 +++++++++++++++++
 4 files changed, 301 insertions(+), 63 deletions(-)

diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java
index 1590b4b67e..afe8546470 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java
@@ -60,7 +60,6 @@ import org.apache.parquet.io.PrimitiveColumnIO;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.InvalidSchemaException;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
-import 
org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
 
@@ -149,10 +148,6 @@ public class ParquetSplitReaderUtil {
             case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                 int precision = DataTypeChecks.getPrecision(fieldType);
                 if (precision > 6) {
-                    checkArgument(
-                            typeName == PrimitiveType.PrimitiveTypeName.INT96,
-                            "Unexpected type: %s",
-                            typeName);
                     return new HeapTimestampVector(batchSize);
                 } else {
                     return new HeapLongVector(batchSize);
@@ -160,31 +155,10 @@ public class ParquetSplitReaderUtil {
             case DECIMAL:
                 DecimalType decimalType = (DecimalType) fieldType;
                 if 
(ParquetSchemaConverter.is32BitDecimal(decimalType.getPrecision())) {
-                    checkArgument(
-                            (typeName == 
PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
-                                            || typeName == 
PrimitiveType.PrimitiveTypeName.INT32)
-                                    && primitiveType.getLogicalTypeAnnotation()
-                                            instanceof 
DecimalLogicalTypeAnnotation,
-                            "Unexpected type: %s",
-                            typeName);
                     return new HeapIntVector(batchSize);
                 } else if 
(ParquetSchemaConverter.is64BitDecimal(decimalType.getPrecision())) {
-                    checkArgument(
-                            (typeName == 
PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
-                                            || typeName == 
PrimitiveType.PrimitiveTypeName.INT64)
-                                    && primitiveType.getLogicalTypeAnnotation()
-                                            instanceof 
DecimalLogicalTypeAnnotation,
-                            "Unexpected type: %s",
-                            typeName);
                     return new HeapLongVector(batchSize);
                 } else {
-                    checkArgument(
-                            (typeName == 
PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
-                                            || typeName == 
PrimitiveType.PrimitiveTypeName.BINARY)
-                                    && primitiveType.getLogicalTypeAnnotation()
-                                            instanceof 
DecimalLogicalTypeAnnotation,
-                            "Unexpected type: %s",
-                            typeName);
                     return new HeapBytesVector(batchSize);
                 }
             case ARRAY:
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetVectorUpdaterFactory.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetVectorUpdaterFactory.java
index 6fa07c3e6d..94f03f66a9 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetVectorUpdaterFactory.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetVectorUpdaterFactory.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.format.parquet.reader;
 
 import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.columnar.heap.HeapBytesVector;
 import org.apache.paimon.data.columnar.heap.HeapIntVector;
 import org.apache.paimon.data.columnar.heap.HeapLongVector;
 import org.apache.paimon.data.columnar.writable.WritableBooleanVector;
@@ -187,9 +188,14 @@ public class ParquetVectorUpdaterFactory {
             return c -> {
                 if (c.getPrimitiveType().getPrimitiveTypeName()
                         == PrimitiveType.PrimitiveTypeName.INT64) {
-                    return new LongUpdater();
+                    return new 
LongTimestampUpdater(timestampType.getPrecision());
+                } else if (c.getPrimitiveType().getPrimitiveTypeName()
+                        == PrimitiveType.PrimitiveTypeName.INT96) {
+                    return new TimestampUpdater(timestampType.getPrecision());
+                } else {
+                    throw new UnsupportedOperationException(
+                            "Only support timestamp with int64 and int96 in 
parquet file yet");
                 }
-                return new TimestampUpdater();
             };
         }
 
@@ -200,7 +206,7 @@ public class ParquetVectorUpdaterFactory {
                         == PrimitiveType.PrimitiveTypeName.INT64) {
                     return new LongUpdater();
                 }
-                return new TimestampUpdater();
+                return new 
TimestampUpdater(localZonedTimestampType.getPrecision());
             };
         }
 
@@ -387,25 +393,86 @@ public class ParquetVectorUpdaterFactory {
         }
     }
 
-    private static class TimestampUpdater implements 
ParquetVectorUpdater<WritableTimestampVector> {
+    private abstract static class AbstractTimestampUpdater
+            implements ParquetVectorUpdater<WritableColumnVector> {
+
+        protected final int precision;
+
+        AbstractTimestampUpdater(int precision) {
+            this.precision = precision;
+        }
+
+        @Override
+        public void readValues(
+                int total,
+                int offset,
+                WritableColumnVector values,
+                VectorizedValuesReader valuesReader) {
+            for (int i = 0; i < total; i++) {
+                readValue(offset + i, values, valuesReader);
+            }
+        }
+    }
+
+    private static class LongTimestampUpdater extends AbstractTimestampUpdater 
{
+
+        public LongTimestampUpdater(int precision) {
+            super(precision);
+        }
+
+        @Override
+        public void skipValues(int total, VectorizedValuesReader valuesReader) 
{
+            valuesReader.skipLongs(total);
+        }
+
+        @Override
+        public void readValue(
+                int offset, WritableColumnVector values, 
VectorizedValuesReader valuesReader) {
+            long value = valuesReader.readLong();
+            putTimestamp(values, offset, value);
+        }
+
+        @Override
+        public void decodeSingleDictionaryId(
+                int offset,
+                WritableColumnVector values,
+                WritableIntVector dictionaryIds,
+                Dictionary dictionary) {
+            long value = dictionary.decodeToLong(dictionaryIds.getInt(offset));
+            putTimestamp(values, offset, value);
+        }
+
+        private void putTimestamp(WritableColumnVector vector, int offset, 
long timestamp) {
+            if (vector instanceof WritableTimestampVector) {
+                ((WritableTimestampVector) vector)
+                        .setTimestamp(offset, 
Timestamp.fromEpochMillis(timestamp));
+            } else {
+                ((WritableLongVector) vector).setLong(offset, timestamp);
+            }
+        }
+    }
+
+    private static class TimestampUpdater extends AbstractTimestampUpdater {
 
         public static final int JULIAN_EPOCH_OFFSET_DAYS = 2_440_588;
         public static final long MILLIS_IN_DAY = TimeUnit.DAYS.toMillis(1);
         public static final long NANOS_PER_MILLISECOND = 
TimeUnit.MILLISECONDS.toNanos(1);
         public static final long NANOS_PER_SECOND = 
TimeUnit.SECONDS.toNanos(1);
 
+        public TimestampUpdater(int precision) {
+            super(precision);
+        }
+
         @Override
         public void readValues(
                 int total,
                 int offset,
-                WritableTimestampVector values,
+                WritableColumnVector values,
                 VectorizedValuesReader valuesReader) {
-
             for (int i = 0; i < total; i++) {
-                values.setTimestamp(
-                        offset + i,
-                        int96ToTimestamp(
-                                true, valuesReader.readLong(), 
valuesReader.readInteger()));
+                Timestamp timestamp =
+                        int96ToTimestamp(true, valuesReader.readLong(), 
valuesReader.readInteger());
+                putTimestamp(values, offset + i, timestamp);
             }
         }
 
@@ -416,8 +483,9 @@ public class ParquetVectorUpdaterFactory {
 
         @Override
         public void readValue(
-                int offset, WritableTimestampVector values, 
VectorizedValuesReader valuesReader) {
-            values.setTimestamp(
+                int offset, WritableColumnVector values, 
VectorizedValuesReader valuesReader) {
+            putTimestamp(
+                    values,
                     offset,
                     int96ToTimestamp(true, valuesReader.readLong(), 
valuesReader.readInteger()));
         }
@@ -425,11 +493,28 @@ public class ParquetVectorUpdaterFactory {
         @Override
         public void decodeSingleDictionaryId(
                 int offset,
-                WritableTimestampVector values,
+                WritableColumnVector values,
                 WritableIntVector dictionaryIds,
                 Dictionary dictionary) {
-            values.setTimestamp(
-                    offset, decodeInt96ToTimestamp(true, dictionary, 
dictionaryIds.getInt(offset)));
+            putTimestamp(
+                    values,
+                    offset,
+                    decodeInt96ToTimestamp(true, dictionary, 
dictionaryIds.getInt(offset)));
+        }
+
+        private void putTimestamp(WritableColumnVector vector, int offset, 
Timestamp timestamp) {
+            if (vector instanceof WritableTimestampVector) {
+                ((WritableTimestampVector) vector).setTimestamp(offset, 
timestamp);
+            } else {
+                if (precision <= 3) {
+                    ((WritableLongVector) vector).setLong(offset, 
timestamp.getMillisecond());
+                } else if (precision <= 6) {
+                    ((WritableLongVector) vector).setLong(offset, 
timestamp.toMicros());
+                } else {
+                    throw new UnsupportedOperationException(
+                            "Unsupported timestamp precision: " + precision);
+                }
+            }
         }
 
         public static Timestamp decodeInt96ToTimestamp(
@@ -611,9 +696,21 @@ public class ParquetVectorUpdaterFactory {
                 readValue(offset + i, values, valuesReader);
             }
         }
+
+        protected void putDecimal(WritableColumnVector values, int offset, 
BigDecimal decimal) {
+            int precision = paimonType.getPrecision();
+            if (ParquetSchemaConverter.is32BitDecimal(precision)) {
+                ((HeapIntVector) values).setInt(offset, 
decimal.unscaledValue().intValue());
+            } else if (ParquetSchemaConverter.is64BitDecimal(precision)) {
+                ((HeapLongVector) values).setLong(offset, 
decimal.unscaledValue().longValue());
+            } else {
+                byte[] bytes = decimal.unscaledValue().toByteArray();
+                ((WritableBytesVector) values).putByteArray(offset, bytes, 0, 
bytes.length);
+            }
+        }
     }
 
-    private static class IntegerToDecimalUpdater extends 
DecimalUpdater<WritableIntVector> {
+    private static class IntegerToDecimalUpdater extends 
DecimalUpdater<WritableColumnVector> {
         private final int parquetScale;
 
         IntegerToDecimalUpdater(ColumnDescriptor descriptor, DecimalType 
paimonType) {
@@ -634,25 +731,25 @@ public class ParquetVectorUpdaterFactory {
 
         @Override
         public void readValue(
-                int offset, WritableIntVector values, VectorizedValuesReader 
valuesReader) {
+                int offset, WritableColumnVector values, 
VectorizedValuesReader valuesReader) {
             BigDecimal decimal = 
BigDecimal.valueOf(valuesReader.readInteger(), parquetScale);
-            values.setInt(offset, decimal.unscaledValue().intValue());
+            putDecimal(values, offset, decimal);
         }
 
         @Override
         public void decodeSingleDictionaryId(
                 int offset,
-                WritableIntVector values,
+                WritableColumnVector values,
                 WritableIntVector dictionaryIds,
                 Dictionary dictionary) {
             BigDecimal decimal =
                     BigDecimal.valueOf(
                             
dictionary.decodeToInt(dictionaryIds.getInt(offset)), parquetScale);
-            values.setInt(offset, decimal.unscaledValue().intValue());
+            putDecimal(values, offset, decimal);
         }
     }
 
-    private static class LongToDecimalUpdater extends 
DecimalUpdater<WritableLongVector> {
+    private static class LongToDecimalUpdater extends 
DecimalUpdater<WritableColumnVector> {
         private final int parquetScale;
 
         LongToDecimalUpdater(ColumnDescriptor descriptor, DecimalType 
paimonType) {
@@ -673,32 +770,34 @@ public class ParquetVectorUpdaterFactory {
 
         @Override
         public void readValue(
-                int offset, WritableLongVector values, VectorizedValuesReader 
valuesReader) {
+                int offset, WritableColumnVector values, 
VectorizedValuesReader valuesReader) {
             BigDecimal decimal = BigDecimal.valueOf(valuesReader.readLong(), 
parquetScale);
-            values.setLong(offset, decimal.unscaledValue().longValue());
+            putDecimal(values, offset, decimal);
         }
 
         @Override
         public void decodeSingleDictionaryId(
                 int offset,
-                WritableLongVector values,
+                WritableColumnVector values,
                 WritableIntVector dictionaryIds,
                 Dictionary dictionary) {
             BigDecimal decimal =
                     BigDecimal.valueOf(
                             
dictionary.decodeToLong(dictionaryIds.getInt(offset)), parquetScale);
-            values.setLong(offset, decimal.unscaledValue().longValue());
+            putDecimal(values, offset, decimal);
         }
     }
 
-    private static class BinaryToDecimalUpdater extends 
DecimalUpdater<WritableBytesVector> {
+    private static class BinaryToDecimalUpdater extends 
DecimalUpdater<WritableColumnVector> {
         private final int parquetScale;
+        private final WritableBytesVector bytesVector;
 
         BinaryToDecimalUpdater(ColumnDescriptor descriptor, DecimalType 
paimonType) {
             super(paimonType);
             LogicalTypeAnnotation typeAnnotation =
                     descriptor.getPrimitiveType().getLogicalTypeAnnotation();
             this.parquetScale = ((DecimalLogicalTypeAnnotation) 
typeAnnotation).getScale();
+            this.bytesVector = new HeapBytesVector(1);
         }
 
         @Override
@@ -708,18 +807,17 @@ public class ParquetVectorUpdaterFactory {
 
         @Override
         public void readValue(
-                int offset, WritableBytesVector values, VectorizedValuesReader 
valuesReader) {
-            valuesReader.readBinary(1, values, offset);
-            BigInteger value = new 
BigInteger(values.getBytes(offset).getBytes());
+                int offset, WritableColumnVector values, 
VectorizedValuesReader valuesReader) {
+            valuesReader.readBinary(1, bytesVector, offset);
+            BigInteger value = new 
BigInteger(bytesVector.getBytes(offset).getBytes());
             BigDecimal decimal = new BigDecimal(value, parquetScale);
-            byte[] bytes = decimal.unscaledValue().toByteArray();
-            values.putByteArray(offset, bytes, 0, bytes.length);
+            putDecimal(values, offset, decimal);
         }
 
         @Override
         public void decodeSingleDictionaryId(
                 int offset,
-                WritableBytesVector values,
+                WritableColumnVector values,
                 WritableIntVector dictionaryIds,
                 Dictionary dictionary) {
             BigInteger value =
@@ -728,8 +826,7 @@ public class ParquetVectorUpdaterFactory {
                                     
.decodeToBinary(dictionaryIds.getInt(offset))
                                     .getBytesUnsafe());
             BigDecimal decimal = new BigDecimal(value, parquetScale);
-            byte[] bytes = decimal.unscaledValue().toByteArray();
-            values.putByteArray(offset, bytes, 0, bytes.length);
+            putDecimal(values, offset, decimal);
         }
     }
 
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/VectorizedColumnReader.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/VectorizedColumnReader.java
index 2a2d33975a..0f2713f58b 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/VectorizedColumnReader.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/VectorizedColumnReader.java
@@ -18,6 +18,13 @@
 
 package org.apache.paimon.format.parquet.reader;
 
+import org.apache.paimon.data.columnar.BooleanColumnVector;
+import org.apache.paimon.data.columnar.BytesColumnVector;
+import org.apache.paimon.data.columnar.ColumnVector;
+import org.apache.paimon.data.columnar.DoubleColumnVector;
+import org.apache.paimon.data.columnar.FloatColumnVector;
+import org.apache.paimon.data.columnar.IntColumnVector;
+import org.apache.paimon.data.columnar.LongColumnVector;
 import org.apache.paimon.data.columnar.writable.WritableColumnVector;
 import org.apache.paimon.data.columnar.writable.WritableIntVector;
 import org.apache.paimon.types.DataType;
@@ -42,6 +49,7 @@ import org.apache.parquet.schema.PrimitiveType;
 
 import java.io.IOException;
 
+import static org.apache.paimon.types.DataTypeRoot.FLOAT;
 import static 
org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
 
 /** Decoder to return values from a single column. */
@@ -108,8 +116,28 @@ public class VectorizedColumnReader {
     }
 
     private boolean isLazyDecodingSupported(
-            PrimitiveType.PrimitiveTypeName typeName, DataType paimonType) {
-        return true;
+            PrimitiveType.PrimitiveTypeName typeName, ColumnVector 
columnVector) {
+        boolean isSupported = false;
+        switch (typeName) {
+            case INT32:
+                isSupported = columnVector instanceof IntColumnVector;
+                break;
+            case INT64:
+                isSupported = columnVector instanceof LongColumnVector;
+                break;
+            case FLOAT:
+                isSupported = columnVector instanceof FloatColumnVector;
+                break;
+            case DOUBLE:
+                isSupported = columnVector instanceof DoubleColumnVector;
+                break;
+            case BOOLEAN:
+                isSupported = columnVector instanceof BooleanColumnVector;
+                break;
+            case BINARY:
+                isSupported = columnVector instanceof BytesColumnVector;
+        }
+        return isSupported;
     }
 
     /** Reads `total` rows from this columnReader into column. */
@@ -176,7 +204,7 @@ public class VectorizedColumnReader {
                 // the values to add microseconds precision.
                 if (column.hasDictionary()
                         || (startRowId == pageFirstRowIndex
-                                && isLazyDecodingSupported(typeName, type))) {
+                                && isLazyDecodingSupported(typeName, column))) 
{
                     column.setDictionary(new ParquetDictionary(dictionary));
                 } else {
                     updater.decodeDictionaryIds(
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/reader/FileTypeNotMatchReadTypeTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/reader/FileTypeNotMatchReadTypeTest.java
new file mode 100644
index 0000000000..8a3f28ae67
--- /dev/null
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/reader/FileTypeNotMatchReadTypeTest.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.parquet.reader;
+
+import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.format.FormatReaderContext;
+import org.apache.paimon.format.parquet.ParquetReaderFactory;
+import org.apache.paimon.format.parquet.writer.ParquetRowDataBuilder;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.FileRecordReader;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.io.LocalOutputFile;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.util.Random;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests reading a file whose field type does not exactly match the read type. */
+public class FileTypeNotMatchReadTypeTest {
+
+    private static final Random RANDOM = new Random();
+    @TempDir private Path tempDir;
+
+    @Test
+    public void testTimestamp() throws Exception {
+        String fileName = "test.parquet";
+        String fileWholePath = tempDir + "/" + fileName;
+        for (int i = 0; i < 100; i++) {
+            int writePrecision = RANDOM.nextInt(10);
+            int readPrecision = writePrecision == 0 ? 0 : 
RANDOM.nextInt(writePrecision);
+
+            // precision 0-3 and 4-6 are both long type (INT64) in file.
+            // but precision 0-3 is TIMESTAMP_MILLIS and 4-6 is TIMESTAMP_MICROS in file.
+            // so we need to set readPrecision to 4 if writePrecision is 4-6.
+            if (readPrecision <= 3 && writePrecision > 3) {
+                readPrecision = 4;
+            }
+
+            RowType rowTypeWrite = 
RowType.of(DataTypes.TIMESTAMP(writePrecision));
+            RowType rowTypeRead = 
RowType.of(DataTypes.TIMESTAMP(readPrecision));
+
+            ParquetRowDataBuilder parquetRowDataBuilder =
+                    new ParquetRowDataBuilder(
+                            new LocalOutputFile(new 
File(fileWholePath).toPath()), rowTypeWrite);
+
+            ParquetWriter<InternalRow> parquetWriter = 
parquetRowDataBuilder.build();
+            Timestamp timestamp = Timestamp.now();
+            parquetWriter.write(GenericRow.of(timestamp));
+            parquetWriter.write(GenericRow.of(Timestamp.now()));
+            parquetWriter.close();
+
+            ParquetReaderFactory parquetReaderFactory =
+                    new ParquetReaderFactory(new Options(), rowTypeRead, 100, 
null);
+
+            File file = new File(fileWholePath);
+            FileRecordReader<InternalRow> fileRecordReader =
+                    parquetReaderFactory.createReader(
+                            new FormatReaderContext(
+                                    LocalFileIO.create(),
+                                    new 
org.apache.paimon.fs.Path(tempDir.toString(), fileName),
+                                    file.length()));
+
+            InternalRow row = fileRecordReader.readBatch().next();
+            Timestamp getTimestamp = row.getTimestamp(0, readPrecision);
+            
assertThat(timestamp.getMillisecond()).isEqualTo(getTimestamp.getMillisecond());
+            file.delete();
+        }
+    }
+
+    @Test
+    public void testDecimal() throws Exception {
+        String fileName = "test.parquet";
+        String fileWholePath = tempDir + "/" + fileName;
+        for (int i = 0; i < 100; i++) {
+            int writePrecision = 1 + RANDOM.nextInt(30);
+            int readPrecision = 1 + RANDOM.nextInt(writePrecision);
+
+            RowType rowTypeWrite = 
RowType.of(DataTypes.DECIMAL(writePrecision, 0));
+            RowType rowTypeRead = RowType.of(DataTypes.DECIMAL(readPrecision, 
0));
+
+            ParquetRowDataBuilder parquetRowDataBuilder =
+                    new ParquetRowDataBuilder(
+                            new LocalOutputFile(new 
File(fileWholePath).toPath()), rowTypeWrite);
+
+            ParquetWriter<InternalRow> parquetWriter = 
parquetRowDataBuilder.build();
+            Decimal decimal =
+                    Decimal.fromBigDecimal(new java.math.BigDecimal(1.0), 
writePrecision, 0);
+            parquetWriter.write(GenericRow.of(decimal));
+            parquetWriter.write(
+                    GenericRow.of(
+                            Decimal.fromBigDecimal(
+                                    new java.math.BigDecimal(2.0), 
writePrecision, 0)));
+            parquetWriter.close();
+
+            ParquetReaderFactory parquetReaderFactory =
+                    new ParquetReaderFactory(new Options(), rowTypeRead, 100, 
null);
+
+            File file = new File(fileWholePath);
+            FileRecordReader<InternalRow> fileRecordReader =
+                    parquetReaderFactory.createReader(
+                            new FormatReaderContext(
+                                    LocalFileIO.create(),
+                                    new 
org.apache.paimon.fs.Path(tempDir.toString(), fileName),
+                                    file.length()));
+
+            InternalRow row = fileRecordReader.readBatch().next();
+            Decimal getDecimal = row.getDecimal(0, readPrecision, 0);
+            
assertThat(decimal.toUnscaledLong()).isEqualTo(getDecimal.toUnscaledLong());
+            file.delete();
+        }
+    }
+}

Reply via email to