This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c2a2fcdfe [parquet] Fix parquet timestamp read (#3662)
c2a2fcdfe is described below

commit c2a2fcdfe1faa1ccf4b63954a453e05e62f7e562
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Jul 3 12:09:59 2024 +0800

    [parquet] Fix parquet timestamp read (#3662)
---
 .../format/parquet/reader/ArrayColumnReader.java   | 518 ---------------------
 .../parquet/reader/BaseVectorizedColumnReader.java | 291 ------------
 .../format/parquet/reader/MapColumnReader.java     |  65 ---
 .../reader/NestedPrimitiveColumnReader.java        |  36 +-
 .../parquet/reader/ParquetDataColumnReader.java    |  17 +-
 .../reader/ParquetDataColumnReaderFactory.java     |  92 ++--
 .../format/parquet/ParquetReadWriteTest.java       |  24 +-
 7 files changed, 97 insertions(+), 946 deletions(-)

diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ArrayColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ArrayColumnReader.java
deleted file mode 100644
index 3a602783a..000000000
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ArrayColumnReader.java
+++ /dev/null
@@ -1,518 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.format.parquet.reader;
-
-import org.apache.paimon.data.Timestamp;
-import org.apache.paimon.data.columnar.VectorizedColumnBatch;
-import org.apache.paimon.data.columnar.heap.HeapArrayVector;
-import org.apache.paimon.data.columnar.heap.HeapBooleanVector;
-import org.apache.paimon.data.columnar.heap.HeapByteVector;
-import org.apache.paimon.data.columnar.heap.HeapBytesVector;
-import org.apache.paimon.data.columnar.heap.HeapDoubleVector;
-import org.apache.paimon.data.columnar.heap.HeapFloatVector;
-import org.apache.paimon.data.columnar.heap.HeapIntVector;
-import org.apache.paimon.data.columnar.heap.HeapLongVector;
-import org.apache.paimon.data.columnar.heap.HeapShortVector;
-import org.apache.paimon.data.columnar.heap.HeapTimestampVector;
-import org.apache.paimon.data.columnar.writable.WritableColumnVector;
-import org.apache.paimon.types.ArrayType;
-import org.apache.paimon.types.DataType;
-import org.apache.paimon.types.LocalZonedTimestampType;
-import org.apache.paimon.types.TimestampType;
-
-import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.page.PageReader;
-import org.apache.parquet.schema.PrimitiveType;
-import org.apache.parquet.schema.Type;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.apache.paimon.types.DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE;
-
-/** Array {@link ColumnReader}. TODO Currently ARRAY type only support non nested case. */
-public class ArrayColumnReader extends BaseVectorizedColumnReader {
-
-    // The value read in last time
-    private Object lastValue;
-
-    // flag to indicate if there is no data in parquet data page
-    private boolean eof = false;
-
-    // flag to indicate if it's the first time to read parquet data page with this instance
-    boolean isFirstRow = true;
-
-    public ArrayColumnReader(
-            ColumnDescriptor descriptor,
-            PageReader pageReader,
-            boolean isUtcTimestamp,
-            Type type,
-            DataType dataType)
-            throws IOException {
-        super(descriptor, pageReader, isUtcTimestamp, type, dataType);
-    }
-
-    @Override
-    public void readToVector(int readNumber, WritableColumnVector vector) {
-        HeapArrayVector lcv = (HeapArrayVector) vector;
-        // before readBatch, initial the size of offsets & lengths as the default value,
-        // the actual size will be assigned in setChildrenInfo() after reading complete.
-        lcv.setOffsets(new long[VectorizedColumnBatch.DEFAULT_SIZE]);
-        lcv.setLengths(new long[VectorizedColumnBatch.DEFAULT_SIZE]);
-
-        DataType elementType = ((ArrayType) dataType).getElementType();
-
-        // read the first row in parquet data page, this will be only happened once for this
-        // instance
-        if (isFirstRow) {
-            if (!fetchNextValue(elementType)) {
-                return;
-            }
-            isFirstRow = false;
-        }
-
-        // Because the length of ListColumnVector.child can't be known now,
-        // the valueList will save all data for ListColumnVector temporary.
-        List<Object> valueList = new ArrayList<>();
-
-        int index = collectDataFromParquetPage(readNumber, lcv, valueList, elementType);
-        // Convert valueList to array for the ListColumnVector.child
-        fillColumnVector(elementType, lcv, valueList, index);
-    }
-
-    /**
-     * Reads a single value from parquet page, puts it into lastValue. Returns a boolean indicating
-     * if there is more values to read (true).
-     *
-     * @param type the element type of array
-     * @return boolean
-     */
-    private boolean fetchNextValue(DataType type) {
-        int left = readPageIfNeed();
-        if (left > 0) {
-            // get the values of repetition and definitionLevel
-            readRepetitionAndDefinitionLevels();
-            // read the data if it isn't null
-            if (definitionLevel == maxDefLevel) {
-                if (isCurrentPageDictionaryEncoded) {
-                    lastValue = dataColumn.readValueDictionaryId();
-                } else {
-                    lastValue = readPrimitiveTypedRow(type);
-                }
-            } else {
-                lastValue = null;
-            }
-            return true;
-        } else {
-            eof = true;
-            return false;
-        }
-    }
-
-    private int readPageIfNeed() {
-        // Compute the number of values we want to read in this page.
-        int leftInPage = (int) (endOfPageValueCount - valuesRead);
-        if (leftInPage == 0) {
-            // no data left in current page, load data from new page
-            readPage();
-            leftInPage = (int) (endOfPageValueCount - valuesRead);
-        }
-        return leftInPage;
-    }
-
-    // Need to be in consistent with that VectorizedPrimitiveColumnReader#readBatchHelper
-    // TODO Reduce the duplicated code
-    private Object readPrimitiveTypedRow(DataType type) {
-        switch (type.getTypeRoot()) {
-            case CHAR:
-            case VARCHAR:
-            case BINARY:
-            case VARBINARY:
-                return dataColumn.readBytes();
-            case BOOLEAN:
-                return dataColumn.readBoolean();
-            case TIME_WITHOUT_TIME_ZONE:
-            case DATE:
-            case INTEGER:
-                return dataColumn.readInteger();
-            case TINYINT:
-                return dataColumn.readTinyInt();
-            case SMALLINT:
-                return dataColumn.readSmallInt();
-            case BIGINT:
-                return dataColumn.readLong();
-            case FLOAT:
-                return dataColumn.readFloat();
-            case DOUBLE:
-                return dataColumn.readDouble();
-            case DECIMAL:
-                switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
-                    case INT32:
-                        return dataColumn.readInteger();
-                    case INT64:
-                        return dataColumn.readLong();
-                    case BINARY:
-                    case FIXED_LEN_BYTE_ARRAY:
-                        return dataColumn.readBytes();
-                }
-            case TIMESTAMP_WITHOUT_TIME_ZONE:
-            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-                int precision;
-                if (type.getTypeRoot() == TIMESTAMP_WITHOUT_TIME_ZONE) {
-                    precision = ((TimestampType) type).getPrecision();
-                } else {
-                    precision = ((LocalZonedTimestampType) 
type).getPrecision();
-                }
-
-                if (precision <= 3) {
-                    return dataColumn.readMillsTimestamp();
-                } else if (precision <= 6) {
-                    return dataColumn.readMicrosTimestamp();
-                } else {
-                    throw new RuntimeException(
-                            "Unsupported precision of time type in the list: " 
+ precision);
-                }
-            default:
-                throw new RuntimeException("Unsupported type in the list: " + 
type);
-        }
-    }
-
-    private Object dictionaryDecodeValue(DataType type, Integer dictionaryValue) {
-        if (dictionaryValue == null) {
-            return null;
-        }
-
-        switch (type.getTypeRoot()) {
-            case CHAR:
-            case VARCHAR:
-            case BINARY:
-            case VARBINARY:
-                return dictionary.readBytes(dictionaryValue);
-            case DATE:
-            case TIME_WITHOUT_TIME_ZONE:
-            case INTEGER:
-                return dictionary.readInteger(dictionaryValue);
-            case BOOLEAN:
-                return dictionary.readBoolean(dictionaryValue) ? 1 : 0;
-            case DOUBLE:
-                return dictionary.readDouble(dictionaryValue);
-            case FLOAT:
-                return dictionary.readFloat(dictionaryValue);
-            case TINYINT:
-                return dictionary.readTinyInt(dictionaryValue);
-            case SMALLINT:
-                return dictionary.readSmallInt(dictionaryValue);
-            case BIGINT:
-                return dictionary.readLong(dictionaryValue);
-            case DECIMAL:
-                switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
-                    case INT32:
-                        return dictionary.readInteger(dictionaryValue);
-                    case INT64:
-                        return dictionary.readLong(dictionaryValue);
-                    case FIXED_LEN_BYTE_ARRAY:
-                    case BINARY:
-                        return dictionary.readBytes(dictionaryValue);
-                }
-            case TIMESTAMP_WITHOUT_TIME_ZONE:
-            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-                return dictionary.readTimestamp(dictionaryValue);
-            default:
-                throw new RuntimeException("Unsupported type in the list: " + 
type);
-        }
-    }
-
-    /**
-     * Collects data from a parquet page and returns the final row index where it stopped. The
-     * returned index can be equal to or less than total.
-     *
-     * @param total maximum number of rows to collect
-     * @param lcv column vector to do initial setup in data collection time
-     * @param valueList collection of values that will be fed into the vector later
-     * @param type the element type of array
-     * @return int
-     */
-    private int collectDataFromParquetPage(
-            int total, HeapArrayVector lcv, List<Object> valueList, DataType type) {
-        int index = 0;
-        /*
-         * Here is a nested loop for collecting all values from a parquet page.
-         * A column of array type can be considered as a list of lists, so the two loops are as below:
-         * 1. The outer loop iterates on rows (index is a row index, so points to a row in the batch), e.g.:
-         * [0, 2, 3]    <- index: 0
-         * [NULL, 3, 4] <- index: 1
-         *
-         * 2. The inner loop iterates on values within a row (sets all data from parquet data page
-         * for an element in ListColumnVector), so fetchNextValue returns values one-by-one:
-         * 0, 2, 3, NULL, 3, 4
-         *
-         * As described below, the repetition level (repetitionLevel != 0)
-         * can be used to decide when we'll start to read values for the next list.
-         */
-        while (!eof && index < total) {
-            // add element to ListColumnVector one by one
-            lcv.getOffsets()[index] = valueList.size();
-            /*
-             * Let's collect all values for a single list.
-             * Repetition level = 0 means that a new list started there in the 
parquet page,
-             * in that case, let's exit from the loop, and start to collect 
value for a new list.
-             */
-            do {
-                /*
-                 * Definition level = 0 when a NULL value was returned instead 
of a list
-                 * (this is not the same as a NULL value in of a list).
-                 */
-                if (definitionLevel == 0) {
-                    lcv.setNullAt(index);
-                }
-                valueList.add(
-                        isCurrentPageDictionaryEncoded
-                                ? dictionaryDecodeValue(type, (Integer) 
lastValue)
-                                : lastValue);
-            } while (fetchNextValue(type) && (repetitionLevel != 0));
-
-            lcv.getLengths()[index] = valueList.size() - 
lcv.getOffsets()[index];
-            index++;
-        }
-        return index;
-    }
-
-    /**
-     * The lengths & offsets will be initialized as default size (1024), it should be set to the
-     * actual size according to the element number.
-     */
-    private void setChildrenInfo(HeapArrayVector lcv, int itemNum, int elementNum) {
-        lcv.setSize(itemNum);
-        long[] lcvLength = new long[elementNum];
-        long[] lcvOffset = new long[elementNum];
-        System.arraycopy(lcv.getLengths(), 0, lcvLength, 0, elementNum);
-        System.arraycopy(lcv.getOffsets(), 0, lcvOffset, 0, elementNum);
-        lcv.setLengths(lcvLength);
-        lcv.setOffsets(lcvOffset);
-    }
-
-    private void fillColumnVector(
-            DataType type, HeapArrayVector lcv, List valueList, int elementNum) {
-        int total = valueList.size();
-        setChildrenInfo(lcv, total, elementNum);
-        switch (type.getTypeRoot()) {
-            case CHAR:
-            case VARCHAR:
-            case BINARY:
-            case VARBINARY:
-                HeapBytesVector bytesVector = new HeapBytesVector(total);
-                bytesVector.reset();
-                lcv.setChild(bytesVector);
-                for (int i = 0; i < valueList.size(); i++) {
-                    byte[] src = (byte[]) valueList.get(i);
-                    if (src == null) {
-                        ((HeapBytesVector) lcv.getChild()).setNullAt(i);
-                    } else {
-                        ((HeapBytesVector) lcv.getChild()).appendBytes(i, src, 
0, src.length);
-                    }
-                }
-                break;
-            case BOOLEAN:
-                HeapBooleanVector booleanVector = new HeapBooleanVector(total);
-                booleanVector.reset();
-                lcv.setChild(booleanVector);
-                for (int i = 0; i < valueList.size(); i++) {
-                    if (valueList.get(i) == null) {
-                        ((HeapBooleanVector) lcv.getChild()).setNullAt(i);
-                    } else {
-                        ((HeapBooleanVector) lcv.getChild()).vector[i] = 
(boolean) valueList.get(i);
-                    }
-                }
-                break;
-            case TINYINT:
-                HeapByteVector byteVector = new HeapByteVector(total);
-                byteVector.reset();
-                lcv.setChild(byteVector);
-                for (int i = 0; i < valueList.size(); i++) {
-                    if (valueList.get(i) == null) {
-                        ((HeapByteVector) lcv.getChild()).setNullAt(i);
-                    } else {
-                        ((HeapByteVector) lcv.getChild()).vector[i] =
-                                ((List<Integer>) valueList).get(i).byteValue();
-                    }
-                }
-                break;
-            case SMALLINT:
-                HeapShortVector shortVector = new HeapShortVector(total);
-                shortVector.reset();
-                lcv.setChild(shortVector);
-                for (int i = 0; i < valueList.size(); i++) {
-                    if (valueList.get(i) == null) {
-                        ((HeapShortVector) lcv.getChild()).setNullAt(i);
-                    } else {
-                        ((HeapShortVector) lcv.getChild()).vector[i] =
-                                ((List<Integer>) 
valueList).get(i).shortValue();
-                    }
-                }
-                break;
-            case INTEGER:
-            case DATE:
-            case TIME_WITHOUT_TIME_ZONE:
-                HeapIntVector intVector = new HeapIntVector(total);
-                intVector.reset();
-                lcv.setChild(intVector);
-                for (int i = 0; i < valueList.size(); i++) {
-                    if (valueList.get(i) == null) {
-                        ((HeapIntVector) lcv.getChild()).setNullAt(i);
-                    } else {
-                        ((HeapIntVector) lcv.getChild()).vector[i] =
-                                ((List<Integer>) valueList).get(i);
-                    }
-                }
-                break;
-            case FLOAT:
-                HeapFloatVector floatVector = new HeapFloatVector(total);
-                floatVector.reset();
-                lcv.setChild(floatVector);
-                for (int i = 0; i < valueList.size(); i++) {
-                    if (valueList.get(i) == null) {
-                        ((HeapFloatVector) lcv.getChild()).setNullAt(i);
-                    } else {
-                        ((HeapFloatVector) lcv.getChild()).vector[i] =
-                                ((List<Float>) valueList).get(i);
-                    }
-                }
-                break;
-            case BIGINT:
-                HeapLongVector longVector = new HeapLongVector(total);
-                longVector.reset();
-                lcv.setChild(longVector);
-                for (int i = 0; i < valueList.size(); i++) {
-                    if (valueList.get(i) == null) {
-                        ((HeapLongVector) lcv.getChild()).setNullAt(i);
-                    } else {
-                        ((HeapLongVector) lcv.getChild()).vector[i] =
-                                ((List<Long>) valueList).get(i);
-                    }
-                }
-                break;
-            case DOUBLE:
-                HeapDoubleVector doubleVector = new HeapDoubleVector(total);
-                doubleVector.reset();
-                lcv.setChild(doubleVector);
-                for (int i = 0; i < valueList.size(); i++) {
-                    if (valueList.get(i) == null) {
-                        ((HeapDoubleVector) lcv.getChild()).setNullAt(i);
-                    } else {
-                        ((HeapDoubleVector) lcv.getChild()).vector[i] =
-                                ((List<Double>) valueList).get(i);
-                    }
-                }
-                break;
-            case TIMESTAMP_WITHOUT_TIME_ZONE:
-            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-                if (descriptor.getPrimitiveType().getPrimitiveTypeName()
-                        == PrimitiveType.PrimitiveTypeName.INT64) {
-                    HeapTimestampVector heapTimestampVector = new 
HeapTimestampVector(total);
-                    heapTimestampVector.reset();
-                    lcv.setChild(new 
ParquetTimestampVector(heapTimestampVector));
-                    for (int i = 0; i < valueList.size(); i++) {
-                        if (valueList.get(i) == null) {
-                            ((HeapTimestampVector)
-                                            ((ParquetTimestampVector) 
lcv.getChild()).getVector())
-                                    .setNullAt(i);
-                        } else {
-                            ((HeapTimestampVector)
-                                            ((ParquetTimestampVector) 
lcv.getChild()).getVector())
-                                    .fill(((List<Timestamp>) 
valueList).get(i));
-                        }
-                    }
-                    break;
-                } else {
-                    HeapTimestampVector timestampVector = new 
HeapTimestampVector(total);
-                    timestampVector.reset();
-                    lcv.setChild(timestampVector);
-                    for (int i = 0; i < valueList.size(); i++) {
-                        if (valueList.get(i) == null) {
-                            ((HeapTimestampVector) 
lcv.getChild()).setNullAt(i);
-                        } else {
-                            ((HeapTimestampVector) lcv.getChild())
-                                    .setTimestamp(i, ((List<Timestamp>) 
valueList).get(i));
-                        }
-                    }
-                    break;
-                }
-            case DECIMAL:
-                switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
-                    case INT32:
-                        HeapIntVector heapIntVector = new HeapIntVector(total);
-                        heapIntVector.reset();
-                        lcv.setChild(new ParquetDecimalVector(heapIntVector));
-                        for (int i = 0; i < valueList.size(); i++) {
-                            if (valueList.get(i) == null) {
-                                ((HeapIntVector)
-                                                ((ParquetDecimalVector) 
lcv.getChild()).getVector())
-                                        .setNullAt(i);
-                            } else {
-                                ((HeapIntVector)
-                                                        
((ParquetDecimalVector) lcv.getChild())
-                                                                .getVector())
-                                                .vector[i] =
-                                        ((List<Integer>) valueList).get(i);
-                            }
-                        }
-                        break;
-                    case INT64:
-                        HeapLongVector heapLongVector = new 
HeapLongVector(total);
-                        heapLongVector.reset();
-                        lcv.setChild(new ParquetDecimalVector(heapLongVector));
-                        for (int i = 0; i < valueList.size(); i++) {
-                            if (valueList.get(i) == null) {
-                                ((HeapLongVector)
-                                                ((ParquetDecimalVector) 
lcv.getChild()).getVector())
-                                        .setNullAt(i);
-                            } else {
-                                ((HeapLongVector)
-                                                        
((ParquetDecimalVector) lcv.getChild())
-                                                                .getVector())
-                                                .vector[i] =
-                                        ((List<Long>) valueList).get(i);
-                            }
-                        }
-                        break;
-                    default:
-                        HeapBytesVector heapBytesVector = new 
HeapBytesVector(total);
-                        heapBytesVector.reset();
-                        lcv.setChild(new 
ParquetDecimalVector(heapBytesVector));
-                        for (int i = 0; i < valueList.size(); i++) {
-                            byte[] src = (byte[]) valueList.get(i);
-                            if (valueList.get(i) == null) {
-                                ((HeapBytesVector)
-                                                ((ParquetDecimalVector) 
lcv.getChild()).getVector())
-                                        .setNullAt(i);
-                            } else {
-                                ((HeapBytesVector)
-                                                ((ParquetDecimalVector) 
lcv.getChild()).getVector())
-                                        .appendBytes(i, src, 0, src.length);
-                            }
-                        }
-                        break;
-                }
-                break;
-            default:
-                throw new RuntimeException("Unsupported type in the list: " + 
type);
-        }
-    }
-}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/BaseVectorizedColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/BaseVectorizedColumnReader.java
deleted file mode 100644
index 0fb4c143c..000000000
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/BaseVectorizedColumnReader.java
+++ /dev/null
@@ -1,291 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.format.parquet.reader;
-
-import org.apache.paimon.data.columnar.writable.WritableColumnVector;
-import org.apache.paimon.types.DataType;
-
-import org.apache.parquet.bytes.ByteBufferInputStream;
-import org.apache.parquet.bytes.BytesInput;
-import org.apache.parquet.bytes.BytesUtils;
-import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.Encoding;
-import org.apache.parquet.column.page.DataPage;
-import org.apache.parquet.column.page.DataPageV1;
-import org.apache.parquet.column.page.DataPageV2;
-import org.apache.parquet.column.page.DictionaryPage;
-import org.apache.parquet.column.page.PageReader;
-import org.apache.parquet.column.values.ValuesReader;
-import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
-import org.apache.parquet.io.ParquetDecodingException;
-import org.apache.parquet.schema.Type;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.ByteArrayInputStream;
-import java.io.IOException;
-
-import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
-import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
-import static org.apache.parquet.column.ValuesType.VALUES;
-
-/**
- * It's column level Parquet reader which is used to read a batch of records for a column, part of
- * the code is referred from Apache Hive and Apache Parquet.
- */
-public abstract class BaseVectorizedColumnReader implements ColumnReader<WritableColumnVector> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(BaseVectorizedColumnReader.class);
-
-    protected boolean isUtcTimestamp;
-
-    /** Total number of values read. */
-    protected long valuesRead;
-
-    /**
-     * value that indicates the end of the current page. That is, if valuesRead ==
-     * endOfPageValueCount, we are at the end of the page.
-     */
-    protected long endOfPageValueCount;
-
-    /** The dictionary, if this column has dictionary encoding. */
-    protected final ParquetDataColumnReader dictionary;
-
-    /** If true, the current page is dictionary encoded. */
-    protected boolean isCurrentPageDictionaryEncoded;
-
-    /** Maximum definition level for this column. */
-    protected final int maxDefLevel;
-
-    protected int definitionLevel;
-    protected int repetitionLevel;
-
-    /** Repetition/Definition/Value readers. */
-    protected IntIterator repetitionLevelColumn;
-
-    protected IntIterator definitionLevelColumn;
-    protected ParquetDataColumnReader dataColumn;
-
-    /** Total values in the current page. */
-    protected int pageValueCount;
-
-    protected final PageReader pageReader;
-    protected final ColumnDescriptor descriptor;
-    protected final Type type;
-    protected final DataType dataType;
-
-    public BaseVectorizedColumnReader(
-            ColumnDescriptor descriptor,
-            PageReader pageReader,
-            boolean isUtcTimestamp,
-            Type parquetType,
-            DataType dataType)
-            throws IOException {
-        this.descriptor = descriptor;
-        this.type = parquetType;
-        this.pageReader = pageReader;
-        this.maxDefLevel = descriptor.getMaxDefinitionLevel();
-        this.isUtcTimestamp = isUtcTimestamp;
-        this.dataType = dataType;
-
-        DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
-        if (dictionaryPage != null) {
-            try {
-                this.dictionary =
-                        
ParquetDataColumnReaderFactory.getDataColumnReaderByTypeOnDictionary(
-                                parquetType.asPrimitiveType(),
-                                dictionaryPage
-                                        .getEncoding()
-                                        .initDictionary(descriptor, 
dictionaryPage),
-                                isUtcTimestamp);
-                this.isCurrentPageDictionaryEncoded = true;
-            } catch (IOException e) {
-                throw new IOException(
-                        String.format("Could not decode the dictionary for 
%s", descriptor), e);
-            }
-        } else {
-            this.dictionary = null;
-            this.isCurrentPageDictionaryEncoded = false;
-        }
-    }
-
-    protected void readRepetitionAndDefinitionLevels() {
-        repetitionLevel = repetitionLevelColumn.nextInt();
-        definitionLevel = definitionLevelColumn.nextInt();
-        valuesRead++;
-    }
-
-    protected void readPage() {
-        DataPage page = pageReader.readPage();
-
-        if (page == null) {
-            return;
-        }
-
-        page.accept(
-                new DataPage.Visitor<Void>() {
-                    @Override
-                    public Void visit(DataPageV1 dataPageV1) {
-                        readPageV1(dataPageV1);
-                        return null;
-                    }
-
-                    @Override
-                    public Void visit(DataPageV2 dataPageV2) {
-                        readPageV2(dataPageV2);
-                        return null;
-                    }
-                });
-    }
-
-    private void initDataReader(Encoding dataEncoding, ByteBufferInputStream 
in, int valueCount)
-            throws IOException {
-        this.pageValueCount = valueCount;
-        this.endOfPageValueCount = valuesRead + pageValueCount;
-        if (dataEncoding.usesDictionary()) {
-            this.dataColumn = null;
-            if (dictionary == null) {
-                throw new IOException(
-                        String.format(
-                                "Could not read page in col %s because the dictionary was missing for encoding %s.",
-                                descriptor, dataEncoding));
-            }
-            dataColumn =
-                    ParquetDataColumnReaderFactory.getDataColumnReaderByType(
-                            type.asPrimitiveType(),
-                            dataEncoding.getDictionaryBasedValuesReader(
-                                    descriptor, VALUES, dictionary.getDictionary()),
-                            isUtcTimestamp);
-            this.isCurrentPageDictionaryEncoded = true;
-        } else {
-            dataColumn =
-                    ParquetDataColumnReaderFactory.getDataColumnReaderByType(
-                            type.asPrimitiveType(),
-                            dataEncoding.getValuesReader(descriptor, VALUES),
-                            isUtcTimestamp);
-            this.isCurrentPageDictionaryEncoded = false;
-        }
-
-        try {
-            dataColumn.initFromPage(pageValueCount, in);
-        } catch (IOException e) {
-            throw new IOException(String.format("Could not read page in col %s.", descriptor), e);
-        }
-    }
-
-    private void readPageV1(DataPageV1 page) {
-        ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL);
-        ValuesReader dlReader = page.getDlEncoding().getValuesReader(descriptor, DEFINITION_LEVEL);
-        this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
-        this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
-        try {
-            BytesInput bytes = page.getBytes();
-            LOG.debug("Page size {}  bytes and {} records.", bytes.size(), pageValueCount);
-            ByteBufferInputStream in = bytes.toInputStream();
-            LOG.debug("Reading repetition levels at {}.", in.position());
-            rlReader.initFromPage(pageValueCount, in);
-            LOG.debug("Reading definition levels at {}.", in.position());
-            dlReader.initFromPage(pageValueCount, in);
-            LOG.debug("Reading data at {}.", in.position());
-            initDataReader(page.getValueEncoding(), in, page.getValueCount());
-        } catch (IOException e) {
-            throw new ParquetDecodingException(
-                    String.format("Could not read page %s in col %s.", page, descriptor), e);
-        }
-    }
-
-    private void readPageV2(DataPageV2 page) {
-        this.pageValueCount = page.getValueCount();
-        this.repetitionLevelColumn =
-                newRLEIterator(descriptor.getMaxRepetitionLevel(), page.getRepetitionLevels());
-        this.definitionLevelColumn =
-                newRLEIterator(descriptor.getMaxDefinitionLevel(), page.getDefinitionLevels());
-        try {
-            LOG.debug(
-                    "Page data size {} bytes and {} records.",
-                    page.getData().size(),
-                    pageValueCount);
-            initDataReader(
-                    page.getDataEncoding(), page.getData().toInputStream(), page.getValueCount());
-        } catch (IOException e) {
-            throw new ParquetDecodingException(
-                    String.format("Could not read page %s in col %s.", page, descriptor), e);
-        }
-    }
-
-    private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) {
-        try {
-            if (maxLevel == 0) {
-                return new NullIntIterator();
-            }
-            return new RLEIntIterator(
-                    new RunLengthBitPackingHybridDecoder(
-                            BytesUtils.getWidthFromMaxInt(maxLevel),
-                            new ByteArrayInputStream(bytes.toByteArray())));
-        } catch (IOException e) {
-            throw new ParquetDecodingException(
-                    String.format("Could not read levels in page for col %s.", descriptor), e);
-        }
-    }
-
-    /** Utility interface to abstract over different way to read ints with different encodings. */
-    interface IntIterator {
-        int nextInt();
-    }
-
-    /** Reading int from {@link ValuesReader}. */
-    protected static final class ValuesReaderIntIterator implements IntIterator {
-        ValuesReader delegate;
-
-        public ValuesReaderIntIterator(ValuesReader delegate) {
-            this.delegate = delegate;
-        }
-
-        @Override
-        public int nextInt() {
-            return delegate.readInteger();
-        }
-    }
-
-    /** Reading int from {@link RunLengthBitPackingHybridDecoder}. */
-    protected static final class RLEIntIterator implements IntIterator {
-        RunLengthBitPackingHybridDecoder delegate;
-
-        public RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) {
-            this.delegate = delegate;
-        }
-
-        @Override
-        public int nextInt() {
-            try {
-                return delegate.readInt();
-            } catch (IOException e) {
-                throw new ParquetDecodingException(e);
-            }
-        }
-    }
-
-    /** Reading zero always. */
-    protected static final class NullIntIterator implements IntIterator {
-        @Override
-        public int nextInt() {
-            return 0;
-        }
-    }
-}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/MapColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/MapColumnReader.java
deleted file mode 100644
index c865617f5..000000000
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/MapColumnReader.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.format.parquet.reader;
-
-import org.apache.paimon.data.columnar.ColumnVector;
-import org.apache.paimon.data.columnar.heap.HeapArrayVector;
-import org.apache.paimon.data.columnar.heap.HeapMapVector;
-import org.apache.paimon.data.columnar.writable.WritableColumnVector;
-
-import java.io.IOException;
-
-/** Map {@link ColumnReader}. */
-public class MapColumnReader implements ColumnReader<WritableColumnVector> {
-
-    private final ArrayColumnReader keyReader;
-    private final ArrayColumnReader valueReader;
-
-    public MapColumnReader(ArrayColumnReader keyReader, ArrayColumnReader valueReader) {
-        this.keyReader = keyReader;
-        this.valueReader = valueReader;
-    }
-
-    public void readBatch(int total, ColumnVector column) throws IOException {
-        HeapMapVector mapVector = (HeapMapVector) column;
-        // initialize 2 ListColumnVector for keys and values
-        HeapArrayVector keyArrayColumnVector = new HeapArrayVector(total);
-        HeapArrayVector valueArrayColumnVector = new HeapArrayVector(total);
-        // read the keys and values
-        keyReader.readToVector(total, keyArrayColumnVector);
-        valueReader.readToVector(total, valueArrayColumnVector);
-
-        // set the related attributes according to the keys and values
-        mapVector.setKeys(keyArrayColumnVector.getChild());
-        mapVector.setValues(valueArrayColumnVector.getChild());
-        mapVector.setOffsets(keyArrayColumnVector.getOffsets());
-        mapVector.setLengths(keyArrayColumnVector.getLengths());
-        mapVector.setSize(keyArrayColumnVector.getSize());
-        for (int i = 0; i < keyArrayColumnVector.getLen(); i++) {
-            if (keyArrayColumnVector.isNullAt(i)) {
-                mapVector.setNullAt(i);
-            }
-        }
-    }
-
-    @Override
-    public void readToVector(int readNumber, WritableColumnVector vector) throws IOException {
-        readBatch(readNumber, vector);
-    }
-}
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java
index dbbf2028c..0b0d89d4d 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java
@@ -31,6 +31,8 @@ import org.apache.paimon.data.columnar.heap.HeapTimestampVector;
 import org.apache.paimon.data.columnar.writable.WritableColumnVector;
 import org.apache.paimon.format.parquet.position.LevelDelegation;
 import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.TimestampType;
 import org.apache.paimon.utils.IntArrayList;
 
 import org.apache.parquet.bytes.ByteBufferInputStream;
@@ -128,7 +130,6 @@ public class NestedPrimitiveColumnReader implements ColumnReader<WritableColumnV
             try {
                 this.dictionary =
                         ParquetDataColumnReaderFactory.getDataColumnReaderByTypeOnDictionary(
-                                parquetType.asPrimitiveType(),
                                 dictionaryPage
                                         .getEncoding()
                                         .initDictionary(descriptor, dictionaryPage),
@@ -263,13 +264,24 @@ public class NestedPrimitiveColumnReader implements ColumnReader<WritableColumnV
                         return dataColumn.readBytes();
                 }
             case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return readTimestamp(((TimestampType) category).getPrecision());
             case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-                return dataColumn.readMillsTimestamp();
+                return readTimestamp(((LocalZonedTimestampType) category).getPrecision());
             default:
                 throw new RuntimeException("Unsupported type in the list: " + type);
         }
     }
 
+    private Timestamp readTimestamp(int precision) {
+        if (precision <= 3) {
+            return dataColumn.readMillsTimestamp();
+        } else if (precision <= 6) {
+            return dataColumn.readMicrosTimestamp();
+        } else {
+            return dataColumn.readNanosTimestamp();
+        }
+    }
+
     private Object dictionaryDecodeValue(DataType category, Integer dictionaryValue) {
         if (dictionaryValue == null) {
             return null;
@@ -308,13 +320,26 @@ public class NestedPrimitiveColumnReader implements ColumnReader<WritableColumnV
                         return dictionary.readBytes(dictionaryValue);
                 }
             case TIMESTAMP_WITHOUT_TIME_ZONE:
+                return dictionaryReadTimestamp(
+                        ((TimestampType) category).getPrecision(), dictionaryValue);
             case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-                return dictionary.readTimestamp(dictionaryValue);
+                return dictionaryReadTimestamp(
+                        ((LocalZonedTimestampType) category).getPrecision(), dictionaryValue);
             default:
                 throw new RuntimeException("Unsupported type in the list: " + type);
         }
     }
 
+    private Timestamp dictionaryReadTimestamp(int precision, int dictionaryValue) {
+        if (precision <= 3) {
+            return dictionary.readMillsTimestamp(dictionaryValue);
+        } else if (precision <= 6) {
+            return dictionary.readMicrosTimestamp(dictionaryValue);
+        } else {
+            return dictionary.readNanosTimestamp(dictionaryValue);
+        }
+    }
+
     private WritableColumnVector fillColumnVector(int total, List valueList) {
         switch (dataType.getTypeRoot()) {
             case CHAR:
@@ -499,7 +524,6 @@ public class NestedPrimitiveColumnReader implements ColumnReader<WritableColumnV
             }
             dataColumn =
                     ParquetDataColumnReaderFactory.getDataColumnReaderByType(
-                            type.asPrimitiveType(),
                             dataEncoding.getDictionaryBasedValuesReader(
                                     descriptor, VALUES, dictionary.getDictionary()),
                             isUtcTimestamp);
@@ -507,9 +531,7 @@ public class NestedPrimitiveColumnReader implements ColumnReader<WritableColumnV
         } else {
             dataColumn =
                     ParquetDataColumnReaderFactory.getDataColumnReaderByType(
-                            type.asPrimitiveType(),
-                            dataEncoding.getValuesReader(descriptor, VALUES),
-                            isUtcTimestamp);
+                            dataEncoding.getValuesReader(descriptor, VALUES), isUtcTimestamp);
             this.isCurrentPageDictionaryEncoded = false;
         }
 
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDataColumnReader.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDataColumnReader.java
index 0d3c220a1..53ade64ab 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDataColumnReader.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDataColumnReader.java
@@ -72,6 +72,9 @@ public interface ParquetDataColumnReader {
     /** @return the next Timestamp with microsecond precision. */
     Timestamp readMicrosTimestamp();
 
+    /** @return the next Timestamp with nanos precision. */
+    Timestamp readNanosTimestamp();
+
     /** @return the underlying dictionary if current reader is dictionary encoded */
     Dictionary getDictionary();
 
@@ -127,5 +130,17 @@ public interface ParquetDataColumnReader {
      * @param id in dictionary
      * @return the TimestampData from the dictionary by id
      */
-    Timestamp readTimestamp(int id);
+    Timestamp readMillsTimestamp(int id);
+
+    /**
+     * @param id in dictionary
+     * @return the TimestampData from the dictionary by id
+     */
+    Timestamp readMicrosTimestamp(int id);
+
+    /**
+     * @param id in dictionary
+     * @return the TimestampData from the dictionary by id
+     */
+    Timestamp readNanosTimestamp(int id);
 }
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDataColumnReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDataColumnReaderFactory.java
index 382704f1c..c3e05e0ae 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDataColumnReaderFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDataColumnReaderFactory.java
@@ -24,7 +24,6 @@ import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.column.Dictionary;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.io.api.Binary;
-import org.apache.parquet.schema.PrimitiveType;
 
 import javax.annotation.Nullable;
 
@@ -47,18 +46,22 @@ public final class ParquetDataColumnReaderFactory {
      * The default data column reader for existing Parquet page reader which works for both
      * dictionary or non dictionary types, Mirror from dictionary encoding path.
      */
-    public static class DefaultParquetDataColumnReader implements ParquetDataColumnReader {
-        protected final ValuesReader valuesReader;
-        protected final Dictionary dict;
+    private static class DefaultParquetDataColumnReader implements ParquetDataColumnReader {
 
-        public DefaultParquetDataColumnReader(ValuesReader valuesReader) {
+        private final ValuesReader valuesReader;
+        private final Dictionary dict;
+        private final boolean isUtcTimestamp;
+
+        public DefaultParquetDataColumnReader(ValuesReader valuesReader, boolean isUtcTimestamp) {
             this.valuesReader = checkNotNull(valuesReader);
             this.dict = null;
+            this.isUtcTimestamp = isUtcTimestamp;
         }
 
-        public DefaultParquetDataColumnReader(Dictionary dict) {
+        public DefaultParquetDataColumnReader(Dictionary dict, boolean isUtcTimestamp) {
             this.valuesReader = null;
             this.dict = checkNotNull(dict);
+            this.isUtcTimestamp = isUtcTimestamp;
         }
 
         @Override
@@ -117,8 +120,23 @@ public final class ParquetDataColumnReaderFactory {
         }
 
         @Override
-        public Timestamp readTimestamp(int id) {
-            throw new RuntimeException("Unsupported operation");
+        public Timestamp readNanosTimestamp() {
+            return int96TimestampConvert(valuesReader.readBytes());
+        }
+
+        @Override
+        public Timestamp readMillsTimestamp(int id) {
+            return Timestamp.fromEpochMillis(readLong(id));
+        }
+
+        @Override
+        public Timestamp readMicrosTimestamp(int id) {
+            return Timestamp.fromMicros(readLong(id));
+        }
+
+        @Override
+        public Timestamp readNanosTimestamp(int id) {
+            return int96TimestampConvert(dict.decodeToBinary(id));
         }
 
         @Override
@@ -166,31 +184,12 @@ public final class ParquetDataColumnReaderFactory {
             return valuesReader.readValueDictionaryId();
         }
 
-        public void skip() {
-            valuesReader.skip();
-        }
-
         @Override
         public Dictionary getDictionary() {
             return dict;
         }
-    }
-
-    /** The reader who reads from the underlying Timestamp value. */
-    public static class TypesFromInt96PageReader extends DefaultParquetDataColumnReader {
-        private final boolean isUtcTimestamp;
-
-        public TypesFromInt96PageReader(ValuesReader realReader, boolean isUtcTimestamp) {
-            super(realReader);
-            this.isUtcTimestamp = isUtcTimestamp;
-        }
 
-        public TypesFromInt96PageReader(Dictionary dict, boolean isUtcTimestamp) {
-            super(dict);
-            this.isUtcTimestamp = isUtcTimestamp;
-        }
-
-        private Timestamp convert(Binary binary) {
+        private Timestamp int96TimestampConvert(Binary binary) {
             ByteBuffer buf = binary.toByteBuffer();
             buf.order(ByteOrder.LITTLE_ENDIAN);
             long timeOfDayNanos = buf.getLong();
@@ -198,48 +197,25 @@ public final class ParquetDataColumnReaderFactory {
             return TimestampColumnReader.int96ToTimestamp(
                     isUtcTimestamp, timeOfDayNanos, julianDay);
         }
-
-        @Override
-        public Timestamp readTimestamp(int id) {
-            return convert(dict.decodeToBinary(id));
-        }
-
-        @Override
-        public Timestamp readMillsTimestamp() {
-            return convert(valuesReader.readBytes());
-        }
-
-        @Override
-        public Timestamp readMicrosTimestamp() {
-            return convert(valuesReader.readBytes());
-        }
     }
 
     private static ParquetDataColumnReader getDataColumnReaderByTypeHelper(
             boolean isDictionary,
-            PrimitiveType parquetType,
             @Nullable Dictionary dictionary,
             @Nullable ValuesReader valuesReader,
             boolean isUtcTimestamp) {
-        if (parquetType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96) {
-            return isDictionary
-                    ? new TypesFromInt96PageReader(dictionary, isUtcTimestamp)
-                    : new TypesFromInt96PageReader(valuesReader, isUtcTimestamp);
-        } else {
-            return isDictionary
-                    ? new DefaultParquetDataColumnReader(dictionary)
-                    : new DefaultParquetDataColumnReader(valuesReader);
-        }
+        return isDictionary
+                ? new DefaultParquetDataColumnReader(dictionary, isUtcTimestamp)
+                : new DefaultParquetDataColumnReader(valuesReader, isUtcTimestamp);
     }
 
     public static ParquetDataColumnReader getDataColumnReaderByTypeOnDictionary(
-            PrimitiveType parquetType, Dictionary realReader, boolean isUtcTimestamp) {
-        return getDataColumnReaderByTypeHelper(true, parquetType, realReader, null, isUtcTimestamp);
+            Dictionary realReader, boolean isUtcTimestamp) {
+        return getDataColumnReaderByTypeHelper(true, realReader, null, isUtcTimestamp);
     }
 
     public static ParquetDataColumnReader getDataColumnReaderByType(
-            PrimitiveType parquetType, ValuesReader realReader, boolean isUtcTimestamp) {
-        return getDataColumnReaderByTypeHelper(
-                false, parquetType, null, realReader, isUtcTimestamp);
+            ValuesReader realReader, boolean isUtcTimestamp) {
+        return getDataColumnReaderByTypeHelper(false, null, realReader, isUtcTimestamp);
     }
 }
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
index 56aa83875..03e1ec072 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
@@ -81,6 +81,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -135,7 +136,9 @@ public class ParquetReadWriteTest {
                             new MultisetType(new VarCharType(VarCharType.MAX_LENGTH)),
                             RowType.builder()
                                     .fields(new VarCharType(VarCharType.MAX_LENGTH), new IntType())
-                                    .build())
+                                    .build(),
+                            new MapType(
+                                    new TimestampType(6), new VarCharType(VarCharType.MAX_LENGTH)))
                     .build();
 
     private static final RowType NESTED_ARRAY_MAP_TYPE =
@@ -469,6 +472,9 @@ public class ParquetReadWriteTest {
         Path path = new Path(folder.getPath(), UUID.randomUUID().toString());
         Options conf = new Options();
         conf.setInteger("parquet.block.size", rowGroupSize);
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            conf.set("parquet.enable.dictionary", "false");
+        }
         ParquetWriterFactory factory =
                 new ParquetWriterFactory(new RowDataParquetBuilder(rowType, conf));
         String[] candidates = new String[] {"snappy", "zstd", "gzip"};
@@ -611,17 +617,22 @@ public class ParquetReadWriteTest {
             return new GenericRow(ROW_TYPE.getFieldCount());
         }
 
+        BinaryString str = BinaryString.fromString("" + v);
+
         Map<BinaryString, BinaryString> f30 = new HashMap<>();
-        f30.put(BinaryString.fromString("" + v), BinaryString.fromString("" + v));
+        f30.put(str, str);
 
         Map<Integer, Boolean> f31 = new HashMap<>();
         f31.put(v, v % 2 == 0);
 
         Map<BinaryString, Integer> f32 = new HashMap<>();
-        f32.put(BinaryString.fromString("" + v), v);
+        f32.put(str, v);
+
+        Map<Timestamp, BinaryString> f34 = new HashMap<>();
+        f34.put(toMicros(v), str);
 
         return GenericRow.of(
-                BinaryString.fromString("" + v),
+                str,
                 v % 2 == 0,
                 v.byteValue(),
                 v.shortValue(),
@@ -638,7 +649,7 @@ public class ParquetReadWriteTest {
                 Decimal.fromBigDecimal(BigDecimal.valueOf(v), 5, 0),
                 Decimal.fromBigDecimal(BigDecimal.valueOf(v), 15, 0),
                 Decimal.fromBigDecimal(BigDecimal.valueOf(v), 20, 0),
-                new GenericArray(new Object[] {BinaryString.fromString("" + v), null}),
+                new GenericArray(new Object[] {str, null}),
                 new GenericArray(new Object[] {v % 2 == 0, null}),
                 new GenericArray(new Object[] {v.byteValue(), null}),
                 new GenericArray(new Object[] {v.shortValue(), null}),
@@ -670,7 +681,8 @@ public class ParquetReadWriteTest {
                 new GenericMap(f30),
                 new GenericMap(f31),
                 new GenericMap(f32),
-                GenericRow.of(BinaryString.fromString("" + v), v));
+                GenericRow.of(str, v),
+                new GenericMap(f34));
     }
 
     private Timestamp toMills(Integer v) {

Reply via email to