This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.1
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 403c6b705c28825d570a487b64ff9c6eb4374a64
Author: YeJunHao <[email protected]>
AuthorDate: Fri May 9 16:17:55 2025 +0800

    [parquet] Fix timestamp type and decimal type, if the file schema is not correctly match the schema in metadata (#5582)
---
 .../newreader/ParquetVectorUpdaterFactory.java     | 165 ++++++++++++++++-----
 .../parquet/newreader/VectorizedColumnReader.java  |  34 ++++-
 .../parquet/reader/ParquetSplitReaderUtil.java     |  26 ----
 3 files changed, 162 insertions(+), 63 deletions(-)

diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/ParquetVectorUpdaterFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/ParquetVectorUpdaterFactory.java
index 465d8824d4..e420c450bc 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/ParquetVectorUpdaterFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/ParquetVectorUpdaterFactory.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.format.parquet.newreader;
 
 import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.columnar.heap.HeapBytesVector;
 import org.apache.paimon.data.columnar.heap.HeapIntVector;
 import org.apache.paimon.data.columnar.heap.HeapLongVector;
 import org.apache.paimon.data.columnar.writable.WritableBooleanVector;
@@ -191,9 +192,14 @@ public class ParquetVectorUpdaterFactory {
             return c -> {
                 if (c.getPrimitiveType().getPrimitiveTypeName()
                         == PrimitiveType.PrimitiveTypeName.INT64) {
-                    return new LongUpdater();
+                    return new LongTimestampUpdater(timestampType.getPrecision());
+                } else if (c.getPrimitiveType().getPrimitiveTypeName()
+                        == PrimitiveType.PrimitiveTypeName.INT96) {
+                    return new TimestampUpdater(timestampType.getPrecision());
+                } else {
+                    throw new UnsupportedOperationException(
+                            "Only support timestamp with int64 and int96 in parquet file yet");
                 }
-                return new TimestampUpdater();
             };
         }
 
@@ -204,7 +210,7 @@ public class ParquetVectorUpdaterFactory {
                         == PrimitiveType.PrimitiveTypeName.INT64) {
                     return new LongUpdater();
                 }
-                return new TimestampUpdater();
+                return new TimestampUpdater(localZonedTimestampType.getPrecision());
             };
         }
 
@@ -391,25 +397,86 @@ public class ParquetVectorUpdaterFactory {
         }
     }
 
-    private static class TimestampUpdater implements ParquetVectorUpdater<WritableTimestampVector> {
+    private abstract static class AbstractTimestampUpdater
+            implements ParquetVectorUpdater<WritableColumnVector> {
+
+        protected final int precision;
+
+        AbstractTimestampUpdater(int precision) {
+            this.precision = precision;
+        }
+
+        @Override
+        public void readValues(
+                int total,
+                int offset,
+                WritableColumnVector values,
+                VectorizedValuesReader valuesReader) {
+            for (int i = 0; i < total; i++) {
+                readValue(offset + i, values, valuesReader);
+            }
+        }
+    }
+
+    private static class LongTimestampUpdater extends AbstractTimestampUpdater {
+
+        public LongTimestampUpdater(int precision) {
+            super(precision);
+        }
+
+        @Override
+        public void skipValues(int total, VectorizedValuesReader valuesReader) {
+            valuesReader.skipLongs(total);
+        }
+
+        @Override
+        public void readValue(
+                int offset, WritableColumnVector values, VectorizedValuesReader valuesReader) {
+            long value = valuesReader.readLong();
+            putTimestamp(values, offset, value);
+        }
+
+        @Override
+        public void decodeSingleDictionaryId(
+                int offset,
+                WritableColumnVector values,
+                WritableIntVector dictionaryIds,
+                Dictionary dictionary) {
+            long value = dictionary.decodeToLong(dictionaryIds.getInt(offset));
+            putTimestamp(values, offset, value);
+        }
+
+        private void putTimestamp(WritableColumnVector vector, int offset, long timestamp) {
+            if (vector instanceof WritableTimestampVector) {
+                ((WritableTimestampVector) vector)
+                        .setTimestamp(offset, Timestamp.fromEpochMillis(timestamp));
+            } else {
+                ((WritableLongVector) vector).setLong(offset, timestamp);
+            }
+        }
+    }
+
+    private static class TimestampUpdater extends AbstractTimestampUpdater {
 
         public static final int JULIAN_EPOCH_OFFSET_DAYS = 2_440_588;
         public static final long MILLIS_IN_DAY = TimeUnit.DAYS.toMillis(1);
         public static final long NANOS_PER_MILLISECOND = TimeUnit.MILLISECONDS.toNanos(1);
         public static final long NANOS_PER_SECOND = TimeUnit.SECONDS.toNanos(1);
 
+        public TimestampUpdater(int precision) {
+            super(precision);
+        }
+
         @Override
         public void readValues(
                 int total,
                 int offset,
-                WritableTimestampVector values,
+                WritableColumnVector values,
                 VectorizedValuesReader valuesReader) {
-
             for (int i = 0; i < total; i++) {
-                values.setTimestamp(
-                        offset + i,
-                        int96ToTimestamp(
-                                true, valuesReader.readLong(), valuesReader.readInteger()));
+                Timestamp timestamp =
+                        int96ToTimestamp(true, valuesReader.readLong(), valuesReader.readInteger());
+                putTimestamp(values, offset + i, timestamp);
             }
         }
 
@@ -420,8 +487,9 @@ public class ParquetVectorUpdaterFactory {
 
         @Override
         public void readValue(
-                int offset, WritableTimestampVector values, VectorizedValuesReader valuesReader) {
-            values.setTimestamp(
+                int offset, WritableColumnVector values, VectorizedValuesReader valuesReader) {
+            putTimestamp(
+                    values,
                     offset,
                     int96ToTimestamp(true, valuesReader.readLong(), valuesReader.readInteger()));
         }
@@ -429,11 +497,28 @@ public class ParquetVectorUpdaterFactory {
         @Override
         public void decodeSingleDictionaryId(
                 int offset,
-                WritableTimestampVector values,
+                WritableColumnVector values,
                 WritableIntVector dictionaryIds,
                 Dictionary dictionary) {
-            values.setTimestamp(
-                    offset, decodeInt96ToTimestamp(true, dictionary, dictionaryIds.getInt(offset)));
+            putTimestamp(
+                    values,
+                    offset,
+                    decodeInt96ToTimestamp(true, dictionary, dictionaryIds.getInt(offset)));
+        }
+
+        private void putTimestamp(WritableColumnVector vector, int offset, Timestamp timestamp) {
+            if (vector instanceof WritableTimestampVector) {
+                ((WritableTimestampVector) vector).setTimestamp(offset, timestamp);
+            } else {
+                if (precision <= 3) {
+                    ((WritableLongVector) vector).setLong(offset, timestamp.getMillisecond());
+                } else if (precision <= 6) {
+                    ((WritableLongVector) vector).setLong(offset, timestamp.toMicros());
+                } else {
+                    throw new UnsupportedOperationException(
+                            "Unsupported timestamp precision: " + precision);
+                }
+            }
         }
 
         public static Timestamp decodeInt96ToTimestamp(
@@ -615,9 +700,21 @@ public class ParquetVectorUpdaterFactory {
                 readValue(offset + i, values, valuesReader);
             }
         }
+
+        protected void putDecimal(WritableColumnVector values, int offset, BigDecimal decimal) {
+            int precision = paimonType.getPrecision();
+            if (ParquetSchemaConverter.is32BitDecimal(precision)) {
+                ((HeapIntVector) values).setInt(offset, decimal.unscaledValue().intValue());
+            } else if (ParquetSchemaConverter.is64BitDecimal(precision)) {
+                ((HeapLongVector) values).setLong(offset, decimal.unscaledValue().longValue());
+            } else {
+                byte[] bytes = decimal.unscaledValue().toByteArray();
+                ((WritableBytesVector) values).putByteArray(offset, bytes, 0, bytes.length);
+            }
+        }
     }
 
-    private static class IntegerToDecimalUpdater extends DecimalUpdater<WritableIntVector> {
+    private static class IntegerToDecimalUpdater extends DecimalUpdater<WritableColumnVector> {
         private final int parquetScale;
 
         IntegerToDecimalUpdater(ColumnDescriptor descriptor, DecimalType paimonType) {
@@ -638,25 +735,25 @@ public class ParquetVectorUpdaterFactory {
 
         @Override
         public void readValue(
-                int offset, WritableIntVector values, VectorizedValuesReader valuesReader) {
+                int offset, WritableColumnVector values, VectorizedValuesReader valuesReader) {
             BigDecimal decimal = BigDecimal.valueOf(valuesReader.readInteger(), parquetScale);
-            values.setInt(offset, decimal.unscaledValue().intValue());
+            putDecimal(values, offset, decimal);
         }
 
         @Override
         public void decodeSingleDictionaryId(
                 int offset,
-                WritableIntVector values,
+                WritableColumnVector values,
                 WritableIntVector dictionaryIds,
                 Dictionary dictionary) {
             BigDecimal decimal =
                     BigDecimal.valueOf(
                             
dictionary.decodeToInt(dictionaryIds.getInt(offset)), parquetScale);
-            values.setInt(offset, decimal.unscaledValue().intValue());
+            putDecimal(values, offset, decimal);
         }
     }
 
-    private static class LongToDecimalUpdater extends DecimalUpdater<WritableLongVector> {
+    private static class LongToDecimalUpdater extends DecimalUpdater<WritableColumnVector> {
         private final int parquetScale;
 
         LongToDecimalUpdater(ColumnDescriptor descriptor, DecimalType paimonType) {
@@ -677,32 +774,34 @@ public class ParquetVectorUpdaterFactory {
 
         @Override
         public void readValue(
-                int offset, WritableLongVector values, VectorizedValuesReader valuesReader) {
+                int offset, WritableColumnVector values, VectorizedValuesReader valuesReader) {
             BigDecimal decimal = BigDecimal.valueOf(valuesReader.readLong(), parquetScale);
-            values.setLong(offset, decimal.unscaledValue().longValue());
+            putDecimal(values, offset, decimal);
         }
 
         @Override
         public void decodeSingleDictionaryId(
                 int offset,
-                WritableLongVector values,
+                WritableColumnVector values,
                 WritableIntVector dictionaryIds,
                 Dictionary dictionary) {
             BigDecimal decimal =
                     BigDecimal.valueOf(
                             
dictionary.decodeToLong(dictionaryIds.getInt(offset)), parquetScale);
-            values.setLong(offset, decimal.unscaledValue().longValue());
+            putDecimal(values, offset, decimal);
         }
     }
 
-    private static class BinaryToDecimalUpdater extends DecimalUpdater<WritableBytesVector> {
+    private static class BinaryToDecimalUpdater extends DecimalUpdater<WritableColumnVector> {
         private final int parquetScale;
+        private final WritableBytesVector bytesVector;
 
         BinaryToDecimalUpdater(ColumnDescriptor descriptor, DecimalType paimonType) {
             super(paimonType);
             LogicalTypeAnnotation typeAnnotation =
                     descriptor.getPrimitiveType().getLogicalTypeAnnotation();
             this.parquetScale = ((DecimalLogicalTypeAnnotation) typeAnnotation).getScale();
+            this.bytesVector = new HeapBytesVector(1);
         }
 
         @Override
@@ -712,18 +811,17 @@ public class ParquetVectorUpdaterFactory {
 
         @Override
         public void readValue(
-                int offset, WritableBytesVector values, VectorizedValuesReader valuesReader) {
-            valuesReader.readBinary(1, values, offset);
-            BigInteger value = new BigInteger(values.getBytes(offset).getBytes());
+                int offset, WritableColumnVector values, VectorizedValuesReader valuesReader) {
+            valuesReader.readBinary(1, bytesVector, offset);
+            BigInteger value = new BigInteger(bytesVector.getBytes(offset).getBytes());
             BigDecimal decimal = new BigDecimal(value, parquetScale);
-            byte[] bytes = decimal.unscaledValue().toByteArray();
-            values.putByteArray(offset, bytes, 0, bytes.length);
+            putDecimal(values, offset, decimal);
         }
 
         @Override
         public void decodeSingleDictionaryId(
                 int offset,
-                WritableBytesVector values,
+                WritableColumnVector values,
                 WritableIntVector dictionaryIds,
                 Dictionary dictionary) {
             BigInteger value =
@@ -732,8 +830,7 @@ public class ParquetVectorUpdaterFactory {
                                     
.decodeToBinary(dictionaryIds.getInt(offset))
                                     .getBytesUnsafe());
             BigDecimal decimal = new BigDecimal(value, parquetScale);
-            byte[] bytes = decimal.unscaledValue().toByteArray();
-            values.putByteArray(offset, bytes, 0, bytes.length);
+            putDecimal(values, offset, decimal);
         }
     }
 
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/VectorizedColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/VectorizedColumnReader.java
index 166a5ce935..c12b58b4e7 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/VectorizedColumnReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/newreader/VectorizedColumnReader.java
@@ -18,6 +18,13 @@
 
 package org.apache.paimon.format.parquet.newreader;
 
+import org.apache.paimon.data.columnar.BooleanColumnVector;
+import org.apache.paimon.data.columnar.BytesColumnVector;
+import org.apache.paimon.data.columnar.ColumnVector;
+import org.apache.paimon.data.columnar.DoubleColumnVector;
+import org.apache.paimon.data.columnar.FloatColumnVector;
+import org.apache.paimon.data.columnar.IntColumnVector;
+import org.apache.paimon.data.columnar.LongColumnVector;
 import org.apache.paimon.data.columnar.writable.WritableColumnVector;
 import org.apache.paimon.data.columnar.writable.WritableIntVector;
 import org.apache.paimon.format.parquet.reader.ParquetDictionary;
@@ -43,6 +50,7 @@ import org.apache.parquet.schema.PrimitiveType;
 
 import java.io.IOException;
 
+import static org.apache.paimon.types.DataTypeRoot.FLOAT;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BOOLEAN;
 
 /* This file is based on source code from the Spark Project (http://spark.apache.org/), licensed by the Apache
@@ -113,8 +121,28 @@ public class VectorizedColumnReader {
     }
 
     private boolean isLazyDecodingSupported(
-            PrimitiveType.PrimitiveTypeName typeName, DataType paimonType) {
-        return true;
+            PrimitiveType.PrimitiveTypeName typeName, ColumnVector columnVector) {
+        boolean isSupported = false;
+        switch (typeName) {
+            case INT32:
+                isSupported = columnVector instanceof IntColumnVector;
+                break;
+            case INT64:
+                isSupported = columnVector instanceof LongColumnVector;
+                break;
+            case FLOAT:
+                isSupported = columnVector instanceof FloatColumnVector;
+                break;
+            case DOUBLE:
+                isSupported = columnVector instanceof DoubleColumnVector;
+                break;
+            case BOOLEAN:
+                isSupported = columnVector instanceof BooleanColumnVector;
+                break;
+            case BINARY:
+                isSupported = columnVector instanceof BytesColumnVector;
+        }
+        return isSupported;
     }
 
     /** Reads `total` rows from this columnReader into column. */
@@ -181,7 +209,7 @@ public class VectorizedColumnReader {
                 // the values to add microseconds precision.
                 if (column.hasDictionary()
                         || (startRowId == pageFirstRowIndex
-                                && isLazyDecodingSupported(typeName, type))) {
+                                && isLazyDecodingSupported(typeName, column))) {
                     column.setDictionary(new ParquetDictionary(dictionary));
                 } else {
                     updater.decodeDictionaryIds(
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java
index ece0ff9a7f..c8d149a8b6 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java
@@ -61,7 +61,6 @@ import org.apache.parquet.io.PrimitiveColumnIO;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.InvalidSchemaException;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
-import org.apache.parquet.schema.LogicalTypeAnnotation.DecimalLogicalTypeAnnotation;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
 
@@ -225,10 +224,6 @@ public class ParquetSplitReaderUtil {
             case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                 int precision = DataTypeChecks.getPrecision(fieldType);
                 if (precision > 6) {
-                    checkArgument(
-                            typeName == PrimitiveType.PrimitiveTypeName.INT96,
-                            "Unexpected type: %s",
-                            typeName);
                     return new HeapTimestampVector(batchSize);
                 } else {
                     return new HeapLongVector(batchSize);
@@ -236,31 +231,10 @@ public class ParquetSplitReaderUtil {
             case DECIMAL:
                 DecimalType decimalType = (DecimalType) fieldType;
                 if (ParquetSchemaConverter.is32BitDecimal(decimalType.getPrecision())) {
-                    checkArgument(
-                            (typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
-                                            || typeName == PrimitiveType.PrimitiveTypeName.INT32)
-                                    && primitiveType.getLogicalTypeAnnotation()
-                                            instanceof DecimalLogicalTypeAnnotation,
-                            "Unexpected type: %s",
-                            typeName);
                     return new HeapIntVector(batchSize);
                 } else if (ParquetSchemaConverter.is64BitDecimal(decimalType.getPrecision())) {
-                    checkArgument(
-                            (typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
-                                            || typeName == PrimitiveType.PrimitiveTypeName.INT64)
-                                    && primitiveType.getLogicalTypeAnnotation()
-                                            instanceof DecimalLogicalTypeAnnotation,
-                            "Unexpected type: %s",
-                            typeName);
                     return new HeapLongVector(batchSize);
                 } else {
-                    checkArgument(
-                            (typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
-                                            || typeName == PrimitiveType.PrimitiveTypeName.BINARY)
-                                    && primitiveType.getLogicalTypeAnnotation()
-                                            instanceof DecimalLogicalTypeAnnotation,
-                            "Unexpected type: %s",
-                            typeName);
                     return new HeapBytesVector(batchSize);
                 }
             case ARRAY:

Reply via email to