This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ddd41a417 [parquet] support read parquet nested columns. (#3656)
ddd41a417 is described below

commit ddd41a417a7b2636e0a0fef762f92745ff5f6d10
Author: Wenchao Wu <[email protected]>
AuthorDate: Wed Jul 3 10:37:48 2024 +0800

    [parquet] support read parquet nested columns. (#3656)
---
 .../paimon/data/columnar/heap/HeapRowVector.java   |   6 +-
 .../org/apache/paimon/utils/BooleanArrayList.java  |  65 +++
 .../java/org/apache/paimon/utils/IntArrayList.java |  89 +++
 .../org/apache/paimon/utils/LongArrayList.java     |  79 +++
 .../format/parquet/ParquetReaderFactory.java       |  21 +-
 .../parquet/position/CollectionPosition.java       |  54 ++
 .../format/parquet/position/LevelDelegation.java   |  38 ++
 .../format/parquet/position/RowPosition.java       |  40 ++
 .../format/parquet/reader/NestedColumnReader.java  | 255 +++++++++
 .../format/parquet/reader/NestedPositionUtil.java  | 208 +++++++
 .../reader/NestedPrimitiveColumnReader.java        | 622 +++++++++++++++++++++
 .../parquet/reader/ParquetDecimalVector.java       | 161 +++++-
 .../parquet/reader/ParquetSplitReaderUtil.java     | 263 +++++++--
 .../paimon/format/parquet/type/ParquetField.java   |  66 +++
 .../format/parquet/type/ParquetGroupField.java     |  47 ++
 .../format/parquet/type/ParquetPrimitiveField.java |  51 ++
 .../format/parquet/ParquetReadWriteTest.java       | 329 ++++++++++-
 17 files changed, 2332 insertions(+), 62 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapRowVector.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapRowVector.java
index bfa16ce96..37d619bde 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapRowVector.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/heap/HeapRowVector.java
@@ -27,7 +27,7 @@ import 
org.apache.paimon.data.columnar.writable.WritableColumnVector;
 public class HeapRowVector extends AbstractHeapVector
         implements WritableColumnVector, RowColumnVector {
 
-    private final WritableColumnVector[] fields;
+    private WritableColumnVector[] fields;
 
     public HeapRowVector(int len, WritableColumnVector... fields) {
         super(len);
@@ -57,4 +57,8 @@ public class HeapRowVector extends AbstractHeapVector
             field.reset();
         }
     }
+
+    public void setFields(WritableColumnVector[] fields) {
+        this.fields = fields;
+    }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/BooleanArrayList.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/BooleanArrayList.java
new file mode 100644
index 000000000..cbb16b703
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/BooleanArrayList.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.util.Arrays;
+
+/** Minimal implementation of an array-backed list of booleans. */
+public class BooleanArrayList {
+    private int size;
+
+    private boolean[] array;
+
+    public BooleanArrayList(int capacity) {
+        this.size = 0;
+        this.array = new boolean[capacity];
+    }
+
+    public int size() {
+        return size;
+    }
+
+    public boolean add(boolean element) {
+        grow(size + 1);
+        array[size++] = element;
+        return true;
+    }
+
+    public void clear() {
+        size = 0;
+    }
+
+    public boolean isEmpty() {
+        return (size == 0);
+    }
+
+    public boolean[] toArray() {
+        return Arrays.copyOf(array, size);
+    }
+
+    private void grow(int length) {
+        if (length > array.length) {
+            final int newLength =
+                    (int) Math.max(Math.min(2L * array.length, 
Integer.MAX_VALUE - 8), length);
+            final boolean[] t = new boolean[newLength];
+            System.arraycopy(array, 0, t, 0, size);
+            array = t;
+        }
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/IntArrayList.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/IntArrayList.java
new file mode 100644
index 000000000..575fae02a
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/IntArrayList.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.util.Arrays;
+import java.util.NoSuchElementException;
+
+/** Minimal implementation of an array-backed list of ints. */
+public class IntArrayList {
+
+    private int size;
+
+    private int[] array;
+
+    public IntArrayList(final int capacity) {
+        this.size = 0;
+        this.array = new int[capacity];
+    }
+
+    public int size() {
+        return size;
+    }
+
+    public boolean add(final int number) {
+        grow(size + 1);
+        array[size++] = number;
+        return true;
+    }
+
+    public int removeLast() {
+        if (size == 0) {
+            throw new NoSuchElementException();
+        }
+        --size;
+        return array[size];
+    }
+
+    public void clear() {
+        size = 0;
+    }
+
+    public boolean isEmpty() {
+        return size == 0;
+    }
+
+    private void grow(final int length) {
+        if (length > array.length) {
+            final int newLength =
+                    (int) Math.max(Math.min(2L * array.length, 
Integer.MAX_VALUE - 8), length);
+            final int[] t = new int[newLength];
+            System.arraycopy(array, 0, t, 0, size);
+            array = t;
+        }
+    }
+
+    public int[] toArray() {
+        return Arrays.copyOf(array, size);
+    }
+
+    public static final IntArrayList EMPTY =
+            new IntArrayList(0) {
+
+                @Override
+                public boolean add(int number) {
+                    throw new UnsupportedOperationException();
+                }
+
+                @Override
+                public int removeLast() {
+                    throw new UnsupportedOperationException();
+                }
+            };
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/LongArrayList.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/LongArrayList.java
new file mode 100644
index 000000000..e94f46897
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/LongArrayList.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.util.Arrays;
+
+/** Minimal implementation of an array-backed list of longs. */
+public class LongArrayList {
+
+    private int size;
+
+    private long[] array;
+
+    public LongArrayList(int capacity) {
+        this.size = 0;
+        this.array = new long[capacity];
+    }
+
+    public int size() {
+        return size;
+    }
+
+    public boolean add(long number) {
+        grow(size + 1);
+        array[size++] = number;
+        return true;
+    }
+
+    public long removeLong(int index) {
+        if (index >= size) {
+            throw new IndexOutOfBoundsException(
+                    "Index (" + index + ") is greater than or equal to list 
size (" + size + ")");
+        }
+        final long old = array[index];
+        size--;
+        if (index != size) {
+            System.arraycopy(array, index + 1, array, index, size - index);
+        }
+        return old;
+    }
+
+    public void clear() {
+        size = 0;
+    }
+
+    public boolean isEmpty() {
+        return (size == 0);
+    }
+
+    public long[] toArray() {
+        return Arrays.copyOf(array, size);
+    }
+
+    private void grow(int length) {
+        if (length > array.length) {
+            final int newLength =
+                    (int) Math.max(Math.min(2L * array.length, 
Integer.MAX_VALUE - 8), length);
+            final long[] t = new long[newLength];
+            System.arraycopy(array, 0, t, 0, size);
+            array = t;
+        }
+    }
+}
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
index afaf8a501..b0715bb53 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
@@ -28,6 +28,7 @@ import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.format.parquet.reader.ColumnReader;
 import org.apache.paimon.format.parquet.reader.ParquetDecimalVector;
 import org.apache.paimon.format.parquet.reader.ParquetTimestampVector;
+import org.apache.paimon.format.parquet.type.ParquetField;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.reader.RecordReader;
@@ -42,6 +43,8 @@ import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetInputFormat;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Type;
@@ -57,6 +60,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import static 
org.apache.paimon.format.parquet.reader.ParquetSplitReaderUtil.buildFieldsList;
 import static 
org.apache.paimon.format.parquet.reader.ParquetSplitReaderUtil.createColumnReader;
 import static 
org.apache.paimon.format.parquet.reader.ParquetSplitReaderUtil.createWritableColumnVector;
 import static 
org.apache.parquet.hadoop.UnmaterializableRecordCounter.BAD_RECORD_THRESHOLD_CONF_KEY;
@@ -72,6 +76,8 @@ public class ParquetReaderFactory implements 
FormatReaderFactory {
     private static final String ALLOCATION_SIZE = 
"parquet.read.allocation.size";
 
     private final Options conf;
+
+    private final RowType projectedType;
     private final String[] projectedFields;
     private final DataType[] projectedTypes;
     private final int batchSize;
@@ -81,6 +87,7 @@ public class ParquetReaderFactory implements 
FormatReaderFactory {
     public ParquetReaderFactory(
             Options conf, RowType projectedType, int batchSize, 
FilterCompat.Filter filter) {
         this.conf = conf;
+        this.projectedType = projectedType;
         this.projectedFields = projectedType.getFieldNames().toArray(new 
String[0]);
         this.projectedTypes = projectedType.getFieldTypes().toArray(new 
DataType[0]);
         this.batchSize = batchSize;
@@ -106,7 +113,12 @@ public class ParquetReaderFactory implements 
FormatReaderFactory {
         Pool<ParquetReaderBatch> poolOfBatches =
                 createPoolOfBatches(context.filePath(), requestedSchema);
 
-        return new ParquetReader(reader, requestedSchema, 
reader.getRecordCount(), poolOfBatches);
+        MessageColumnIO columnIO = new 
ColumnIOFactory().getColumnIO(requestedSchema);
+        List<ParquetField> fields =
+                buildFieldsList(projectedType.getFields(), 
projectedType.getFieldNames(), columnIO);
+
+        return new ParquetReader(
+                reader, requestedSchema, reader.getRecordCount(), 
poolOfBatches, fields);
     }
 
     private void setReadOptions(ParquetReadOptions.Builder builder) {
@@ -270,11 +282,14 @@ public class ParquetReaderFactory implements 
FormatReaderFactory {
         @SuppressWarnings("rawtypes")
         private ColumnReader[] columnReaders;
 
+        private final List<ParquetField> fields;
+
         private ParquetReader(
                 ParquetFileReader reader,
                 MessageType requestedSchema,
                 long totalRowCount,
-                Pool<ParquetReaderBatch> pool) {
+                Pool<ParquetReaderBatch> pool,
+                List<ParquetField> fields) {
             this.reader = reader;
             this.requestedSchema = requestedSchema;
             this.totalRowCount = totalRowCount;
@@ -283,6 +298,7 @@ public class ParquetReaderFactory implements 
FormatReaderFactory {
             this.totalCountLoadedSoFar = 0;
             this.currentRowPosition = 0;
             this.nextRowPosition = 0;
+            this.fields = fields;
         }
 
         @Nullable
@@ -348,6 +364,7 @@ public class ParquetReaderFactory implements 
FormatReaderFactory {
                                     types.get(i),
                                     requestedSchema.getColumns(),
                                     rowGroup,
+                                    fields.get(i),
                                     0);
                 }
             }
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/position/CollectionPosition.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/position/CollectionPosition.java
new file mode 100644
index 000000000..e72a4280f
--- /dev/null
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/position/CollectionPosition.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.parquet.position;
+
+import javax.annotation.Nullable;
+
+/** To represent collection's position in repeated type. */
+public class CollectionPosition {
+    @Nullable private final boolean[] isNull;
+    private final long[] offsets;
+
+    private final long[] length;
+
+    private final int valueCount;
+
+    public CollectionPosition(boolean[] isNull, long[] offsets, long[] length, 
int valueCount) {
+        this.isNull = isNull;
+        this.offsets = offsets;
+        this.length = length;
+        this.valueCount = valueCount;
+    }
+
+    public boolean[] getIsNull() {
+        return isNull;
+    }
+
+    public long[] getOffsets() {
+        return offsets;
+    }
+
+    public long[] getLength() {
+        return length;
+    }
+
+    public int getValueCount() {
+        return valueCount;
+    }
+}
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/position/LevelDelegation.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/position/LevelDelegation.java
new file mode 100644
index 000000000..25bbedc86
--- /dev/null
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/position/LevelDelegation.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.parquet.position;
+
+/** To delegate repetition level and definition level. */
+public class LevelDelegation {
+    private final int[] repetitionLevel;
+    private final int[] definitionLevel;
+
+    public LevelDelegation(int[] repetitionLevel, int[] definitionLevel) {
+        this.repetitionLevel = repetitionLevel;
+        this.definitionLevel = definitionLevel;
+    }
+
+    public int[] getRepetitionLevel() {
+        return repetitionLevel;
+    }
+
+    public int[] getDefinitionLevel() {
+        return definitionLevel;
+    }
+}
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/position/RowPosition.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/position/RowPosition.java
new file mode 100644
index 000000000..fb6378349
--- /dev/null
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/position/RowPosition.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.parquet.position;
+
+import javax.annotation.Nullable;
+
+/** To represent struct's position in repeated type. */
+public class RowPosition {
+    @Nullable private final boolean[] isNull;
+    private final int positionsCount;
+
+    public RowPosition(boolean[] isNull, int positionsCount) {
+        this.isNull = isNull;
+        this.positionsCount = positionsCount;
+    }
+
+    public boolean[] getIsNull() {
+        return isNull;
+    }
+
+    public int getPositionsCount() {
+        return positionsCount;
+    }
+}
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java
new file mode 100644
index 000000000..e39005800
--- /dev/null
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.parquet.reader;
+
+import org.apache.paimon.data.columnar.ColumnVector;
+import org.apache.paimon.data.columnar.heap.AbstractHeapVector;
+import org.apache.paimon.data.columnar.heap.HeapArrayVector;
+import org.apache.paimon.data.columnar.heap.HeapMapVector;
+import org.apache.paimon.data.columnar.heap.HeapRowVector;
+import org.apache.paimon.data.columnar.writable.WritableColumnVector;
+import org.apache.paimon.format.parquet.position.CollectionPosition;
+import org.apache.paimon.format.parquet.position.LevelDelegation;
+import org.apache.paimon.format.parquet.position.RowPosition;
+import org.apache.paimon.format.parquet.type.ParquetField;
+import org.apache.paimon.format.parquet.type.ParquetGroupField;
+import org.apache.paimon.format.parquet.type.ParquetPrimitiveField;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.MultisetType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Pair;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.PageReadStore;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This ColumnReader mainly used to read `Group` type in parquet such as 
`Map`, `Array`, `Row`. The
+ * method about how to resolve nested struct mainly refer to : <a
+ * 
href="https://github.com/julienledem/redelm/wiki/The-striping-and-assembly-algorithms-from-the-Dremel-paper";>The
+ * striping and assembly algorithms from the Dremel paper</a>.
+ *
+ * <p>Brief explanation of reading repetition and definition levels: 
Repetition level equal to 0
+ * means that this is the beginning of a new row. Other value means that we 
should add data to the
+ * current row.
+ *
+ * <p>For example, if we have the following data: repetition levels: 
0,1,1,0,0,1,[0] (last 0 is
+ * implicit, normally will be the end of the page) values: a,b,c,d,e,f will 
consist of the sets of:
+ * (a, b, c), (d), (e, f). <br>
+ *
+ * <p>Definition levels contains 3 situations: level = maxDefLevel means value 
exist and is not null
+ * level = maxDefLevel - 1 means value is null level < maxDefLevel - 1 means 
value doesn't exist For
+ * non-nullable (REQUIRED) fields the (level = maxDefLevel - 1) condition 
means non-existing value
+ * as well. <br>
+ *
+ * <p>Quick example (maxDefLevel is 2): Read 3 rows out of: repetition levels: 
0,1,0,1,1,0,0,...
+ * definition levels: 2,1,0,2,1,2,... values: a,b,c,d,e,f,... Resulting 
buffer: a,n, ,d,n,f that
+ * result is (a,n),(d,n),(f) where n means null
+ */
+public class NestedColumnReader implements ColumnReader<WritableColumnVector> {
+
+    private final Map<ColumnDescriptor, NestedPrimitiveColumnReader> 
columnReaders;
+    private final boolean isUtcTimestamp;
+
+    private final PageReadStore pages;
+
+    private final ParquetField field;
+
+    public NestedColumnReader(boolean isUtcTimestamp, PageReadStore pages, 
ParquetField field) {
+        this.isUtcTimestamp = isUtcTimestamp;
+        this.pages = pages;
+        this.field = field;
+        this.columnReaders = new HashMap<>();
+    }
+
+    @Override
+    public void readToVector(int readNumber, WritableColumnVector vector) 
throws IOException {
+        readData(field, readNumber, vector, false);
+    }
+
+    private Pair<LevelDelegation, WritableColumnVector> readData(
+            ParquetField field, int readNumber, ColumnVector vector, boolean 
inside)
+            throws IOException {
+        if (field.getType() instanceof RowType) {
+            return readRow((ParquetGroupField) field, readNumber, vector, 
inside);
+        } else if (field.getType() instanceof MapType || field.getType() 
instanceof MultisetType) {
+            return readMap((ParquetGroupField) field, readNumber, vector, 
inside);
+        } else if (field.getType() instanceof ArrayType) {
+            return readArray((ParquetGroupField) field, readNumber, vector, 
inside);
+        } else {
+            return readPrimitive((ParquetPrimitiveField) field, readNumber, 
vector);
+        }
+    }
+
+    private Pair<LevelDelegation, WritableColumnVector> readRow(
+            ParquetGroupField field, int readNumber, ColumnVector vector, 
boolean inside)
+            throws IOException {
+        HeapRowVector heapRowVector = (HeapRowVector) vector;
+        LevelDelegation levelDelegation = null;
+        List<ParquetField> children = field.getChildren();
+        WritableColumnVector[] childrenVectors = heapRowVector.getFields();
+        WritableColumnVector[] finalChildrenVectors =
+                new WritableColumnVector[childrenVectors.length];
+        for (int i = 0; i < children.size(); i++) {
+            Pair<LevelDelegation, WritableColumnVector> tuple =
+                    readData(children.get(i), readNumber, childrenVectors[i], 
true);
+            levelDelegation = tuple.getLeft();
+            finalChildrenVectors[i] = tuple.getRight();
+        }
+        if (levelDelegation == null) {
+            throw new RuntimeException(
+                    String.format("Row field does not have any children: %s.", 
field));
+        }
+
+        RowPosition rowPosition =
+                NestedPositionUtil.calculateRowOffsets(
+                        field,
+                        levelDelegation.getDefinitionLevel(),
+                        levelDelegation.getRepetitionLevel());
+
+        // If row was inside the structure, then we need to renew the vector 
to reset the
+        // capacity.
+        if (inside) {
+            heapRowVector =
+                    new HeapRowVector(rowPosition.getPositionsCount(), 
finalChildrenVectors);
+        } else {
+            heapRowVector.setFields(finalChildrenVectors);
+        }
+
+        if (rowPosition.getIsNull() != null) {
+            setFieldNullFalg(rowPosition.getIsNull(), heapRowVector);
+        }
+        return Pair.of(levelDelegation, heapRowVector);
+    }
+
+    private Pair<LevelDelegation, WritableColumnVector> readMap(
+            ParquetGroupField field, int readNumber, ColumnVector vector, 
boolean inside)
+            throws IOException {
+        HeapMapVector mapVector = (HeapMapVector) vector;
+        mapVector.reset();
+        List<ParquetField> children = field.getChildren();
+        Preconditions.checkArgument(
+                children.size() == 2,
+                "Maps must have two type parameters, found %s",
+                children.size());
+        Pair<LevelDelegation, WritableColumnVector> keyTuple =
+                readData(children.get(0), readNumber, 
mapVector.getKeyColumnVector(), true);
+        Pair<LevelDelegation, WritableColumnVector> valueTuple =
+                readData(children.get(1), readNumber, 
mapVector.getValueColumnVector(), true);
+
+        LevelDelegation levelDelegation = keyTuple.getLeft();
+
+        CollectionPosition collectionPosition =
+                NestedPositionUtil.calculateCollectionOffsets(
+                        field,
+                        levelDelegation.getDefinitionLevel(),
+                        levelDelegation.getRepetitionLevel());
+
+        // If map was inside the structure, then we need to renew the vector 
to reset the
+        // capacity.
+        if (inside) {
+            mapVector =
+                    new HeapMapVector(
+                            collectionPosition.getValueCount(),
+                            keyTuple.getRight(),
+                            valueTuple.getRight());
+        } else {
+            mapVector.setKeys(keyTuple.getRight());
+            mapVector.setValues(valueTuple.getRight());
+        }
+
+        if (collectionPosition.getIsNull() != null) {
+            setFieldNullFalg(collectionPosition.getIsNull(), mapVector);
+        }
+
+        mapVector.setLengths(collectionPosition.getLength());
+        mapVector.setOffsets(collectionPosition.getOffsets());
+
+        return Pair.of(levelDelegation, mapVector);
+    }
+
+    private Pair<LevelDelegation, WritableColumnVector> readArray(
+            ParquetGroupField field, int readNumber, ColumnVector vector, 
boolean inside)
+            throws IOException {
+        HeapArrayVector arrayVector = (HeapArrayVector) vector;
+        arrayVector.reset();
+        List<ParquetField> children = field.getChildren();
+        Preconditions.checkArgument(
+                children.size() == 1,
+                "Arrays must have a single type parameter, found %s",
+                children.size());
+        Pair<LevelDelegation, WritableColumnVector> tuple =
+                readData(children.get(0), readNumber, arrayVector.getChild(), 
true);
+
+        LevelDelegation levelDelegation = tuple.getLeft();
+        CollectionPosition collectionPosition =
+                NestedPositionUtil.calculateCollectionOffsets(
+                        field,
+                        levelDelegation.getDefinitionLevel(),
+                        levelDelegation.getRepetitionLevel());
+
+        // If array was inside the structure, then we need to renew the vector 
to reset the
+        // capacity.
+        if (inside) {
+            arrayVector = new 
HeapArrayVector(collectionPosition.getValueCount(), tuple.getRight());
+        } else {
+            arrayVector.setChild(tuple.getRight());
+        }
+
+        if (collectionPosition.getIsNull() != null) {
+            setFieldNullFalg(collectionPosition.getIsNull(), arrayVector);
+        }
+        arrayVector.setLengths(collectionPosition.getLength());
+        arrayVector.setOffsets(collectionPosition.getOffsets());
+        return Pair.of(levelDelegation, arrayVector);
+    }
+
+    private Pair<LevelDelegation, WritableColumnVector> readPrimitive(
+            ParquetPrimitiveField field, int readNumber, ColumnVector vector) 
throws IOException {
+        ColumnDescriptor descriptor = field.getDescriptor();
+        NestedPrimitiveColumnReader reader = columnReaders.get(descriptor);
+        if (reader == null) {
+            reader =
+                    new NestedPrimitiveColumnReader(
+                            descriptor,
+                            pages.getPageReader(descriptor),
+                            isUtcTimestamp,
+                            descriptor.getPrimitiveType(),
+                            field.getType());
+            columnReaders.put(descriptor, reader);
+        }
+        WritableColumnVector writableColumnVector =
+                reader.readAndNewVector(readNumber, (WritableColumnVector) 
vector);
+        return Pair.of(reader.getLevelDelegation(), writableColumnVector);
+    }
+
+    private static void setFieldNullFalg(boolean[] nullFlags, 
AbstractHeapVector vector) {
+        for (int index = 0; index < vector.getLen() && index < 
nullFlags.length; index++) {
+            if (nullFlags[index]) {
+                vector.setNullAt(index);
+            }
+        }
+    }
+}
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java
new file mode 100644
index 000000000..5f0757c23
--- /dev/null
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.parquet.reader;
+
+import org.apache.paimon.format.parquet.position.CollectionPosition;
+import org.apache.paimon.format.parquet.position.RowPosition;
+import org.apache.paimon.format.parquet.type.ParquetField;
+import org.apache.paimon.utils.BooleanArrayList;
+import org.apache.paimon.utils.LongArrayList;
+
+import static java.lang.String.format;
+
+/** Utils to calculate nested type position. */
+public class NestedPositionUtil {
+
+    /**
+     * Calculate row offsets according to column's max repetition level, 
definition level, value's
+     * repetition level and definition level. Each row has three situation:
+     * <li>Row is not defined,because it's optional parent fields is null, 
this is decided by its
+     *     parent's repetition level
+     * <li>Row is null
+     * <li>Row is defined and not empty.
+     *
+     * @param field field that contains the row column message include max 
repetition level and
+     *     definition level.
+     * @param fieldRepetitionLevels int array with each value's repetition 
level.
+     * @param fieldDefinitionLevels int array with each value's definition 
level.
+     * @return {@link RowPosition} contains collections row count and isNull 
array.
+     */
+    public static RowPosition calculateRowOffsets(
+            ParquetField field, int[] fieldDefinitionLevels, int[] 
fieldRepetitionLevels) {
+        int rowDefinitionLevel = field.getDefinitionLevel();
+        int rowRepetitionLevel = field.getRepetitionLevel();
+        int nullValuesCount = 0;
+        BooleanArrayList nullRowFlags = new BooleanArrayList(0);
+        for (int i = 0; i < fieldDefinitionLevels.length; i++) {
+            if (fieldRepetitionLevels[i] > rowRepetitionLevel) {
+                throw new IllegalStateException(
+                        format(
+                                "In parquet's row type field repetition level 
should not larger than row's repetition level. "
+                                        + "Row repetition level is %s, row 
field repetition level is %s.",
+                                rowRepetitionLevel, fieldRepetitionLevels[i]));
+            }
+
+            if (fieldDefinitionLevels[i] >= rowDefinitionLevel) {
+                // current row is defined and not empty
+                nullRowFlags.add(false);
+            } else {
+                // current row is null
+                nullRowFlags.add(true);
+                nullValuesCount++;
+            }
+        }
+        if (nullValuesCount == 0) {
+            return new RowPosition(null, fieldDefinitionLevels.length);
+        }
+        return new RowPosition(nullRowFlags.toArray(), nullRowFlags.size());
+    }
+
+    /**
+     * Calculate the collection's offsets according to column's max repetition 
level, definition
+     * level, value's repetition level and definition level. Each collection 
(Array or Map) has four
+     * situation:
+     * <li>Collection is not defined, because optional parent fields is null, 
this is decided by its
+     *     parent's repetition level
+     * <li>Collection is null
+     * <li>Collection is defined but empty
+     * <li>Collection is defined and not empty. In this case offset value is 
increased by the number
+     *     of elements in that collection
+     *
+     * @param field field that contains array/map column message include max 
repetition level and
+     *     definition level.
+     * @param definitionLevels int array with each value's repetition level.
+     * @param repetitionLevels int array with each value's definition level.
+     * @return {@link CollectionPosition} contains collections offset array, 
length array and isNull
+     *     array.
+     */
+    public static CollectionPosition calculateCollectionOffsets(
+            ParquetField field, int[] definitionLevels, int[] 
repetitionLevels) {
+        int collectionDefinitionLevel = field.getDefinitionLevel();
+        int collectionRepetitionLevel = field.getRepetitionLevel() + 1;
+        int offset = 0;
+        int valueCount = 0;
+        LongArrayList offsets = new LongArrayList(0);
+        offsets.add(offset);
+        BooleanArrayList emptyCollectionFlags = new BooleanArrayList(0);
+        BooleanArrayList nullCollectionFlags = new BooleanArrayList(0);
+        int nullValuesCount = 0;
+        for (int i = 0;
+                i < definitionLevels.length;
+                i = getNextCollectionStartIndex(repetitionLevels, 
collectionRepetitionLevel, i)) {
+            valueCount++;
+            if (definitionLevels[i] >= collectionDefinitionLevel - 1) {
+                boolean isNull =
+                        isOptionalFieldValueNull(definitionLevels[i], 
collectionDefinitionLevel);
+                nullCollectionFlags.add(isNull);
+                nullValuesCount += isNull ? 1 : 0;
+                // definitionLevels[i] > collectionDefinitionLevel  => 
Collection is defined and not
+                // empty
+                // definitionLevels[i] == collectionDefinitionLevel => 
Collection is defined but
+                // empty
+                if (definitionLevels[i] > collectionDefinitionLevel) {
+                    emptyCollectionFlags.add(false);
+                    offset += getCollectionSize(repetitionLevels, 
collectionRepetitionLevel, i + 1);
+                } else if (definitionLevels[i] == collectionDefinitionLevel) {
+                    offset++;
+                    emptyCollectionFlags.add(true);
+                } else {
+                    offset++;
+                    emptyCollectionFlags.add(false);
+                }
+                offsets.add(offset);
+            } else {
+                // when definitionLevels[i] < collectionDefinitionLevel - 1, 
it means the collection
+                // is
+                // not defined, but we need to regard it as null to avoid 
getting value wrong.
+                nullCollectionFlags.add(true);
+                nullValuesCount++;
+                offsets.add(++offset);
+                emptyCollectionFlags.add(false);
+            }
+        }
+        long[] offsetsArray = offsets.toArray();
+        long[] length = 
calculateLengthByOffsets(emptyCollectionFlags.toArray(), offsetsArray);
+        if (nullValuesCount == 0) {
+            return new CollectionPosition(null, offsetsArray, length, 
valueCount);
+        }
+        return new CollectionPosition(
+                nullCollectionFlags.toArray(), offsetsArray, length, 
valueCount);
+    }
+
+    public static boolean isOptionalFieldValueNull(int definitionLevel, int 
maxDefinitionLevel) {
+        return definitionLevel == maxDefinitionLevel - 1;
+    }
+
+    public static long[] calculateLengthByOffsets(
+            boolean[] collectionIsEmpty, long[] arrayOffsets) {
+        LongArrayList lengthList = new LongArrayList(arrayOffsets.length);
+        for (int i = 0; i < arrayOffsets.length - 1; i++) {
+            long offset = arrayOffsets[i];
+            long length = arrayOffsets[i + 1] - offset;
+            if (length < 0) {
+                throw new IllegalArgumentException(
+                        format(
+                                "Offset is not monotonically ascending. 
offsets[%s]=%s, offsets[%s]=%s",
+                                i, arrayOffsets[i], i + 1, arrayOffsets[i + 
1]));
+            }
+            if (collectionIsEmpty[i]) {
+                length = 0;
+            }
+            lengthList.add(length);
+        }
+        return lengthList.toArray();
+    }
+
+    private static int getNextCollectionStartIndex(
+            int[] repetitionLevels, int maxRepetitionLevel, int elementIndex) {
+        do {
+            elementIndex++;
+        } while (hasMoreElements(repetitionLevels, elementIndex)
+                && isNotCollectionBeginningMarker(
+                        repetitionLevels, maxRepetitionLevel, elementIndex));
+        return elementIndex;
+    }
+
+    /** This method is only called for non-empty collections. */
+    private static int getCollectionSize(
+            int[] repetitionLevels, int maxRepetitionLevel, int nextIndex) {
+        int size = 1;
+        while (hasMoreElements(repetitionLevels, nextIndex)
+                && isNotCollectionBeginningMarker(
+                        repetitionLevels, maxRepetitionLevel, nextIndex)) {
+            // Collection elements cannot only be primitive, but also can have 
nested structure
+            // Counting only elements which belong to current collection, 
skipping inner elements of
+            // nested collections/structs
+            if (repetitionLevels[nextIndex] <= maxRepetitionLevel) {
+                size++;
+            }
+            nextIndex++;
+        }
+        return size;
+    }
+
+    private static boolean isNotCollectionBeginningMarker(
+            int[] repetitionLevels, int maxRepetitionLevel, int nextIndex) {
+        return repetitionLevels[nextIndex] >= maxRepetitionLevel;
+    }
+
+    private static boolean hasMoreElements(int[] repetitionLevels, int 
nextIndex) {
+        return nextIndex < repetitionLevels.length;
+    }
+}
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java
new file mode 100644
index 000000000..dbbf2028c
--- /dev/null
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java
@@ -0,0 +1,622 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.parquet.reader;
+
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.data.columnar.heap.HeapBooleanVector;
+import org.apache.paimon.data.columnar.heap.HeapByteVector;
+import org.apache.paimon.data.columnar.heap.HeapBytesVector;
+import org.apache.paimon.data.columnar.heap.HeapDoubleVector;
+import org.apache.paimon.data.columnar.heap.HeapFloatVector;
+import org.apache.paimon.data.columnar.heap.HeapIntVector;
+import org.apache.paimon.data.columnar.heap.HeapLongVector;
+import org.apache.paimon.data.columnar.heap.HeapShortVector;
+import org.apache.paimon.data.columnar.heap.HeapTimestampVector;
+import org.apache.paimon.data.columnar.writable.WritableColumnVector;
+import org.apache.paimon.format.parquet.position.LevelDelegation;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.utils.IntArrayList;
+
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.VALUES;
+
+/** Reader to read nested primitive column. */
+public class NestedPrimitiveColumnReader implements 
ColumnReader<WritableColumnVector> {
+    private static final Logger LOG = 
LoggerFactory.getLogger(NestedPrimitiveColumnReader.class);
+
+    private final IntArrayList repetitionLevelList = new IntArrayList(0);
+    private final IntArrayList definitionLevelList = new IntArrayList(0);
+
+    private final PageReader pageReader;
+    private final ColumnDescriptor descriptor;
+    private final Type type;
+    private final DataType dataType;
+    /** The dictionary, if this column has dictionary encoding. */
+    private final ParquetDataColumnReader dictionary;
+    /** Maximum definition level for this column. */
+    private final int maxDefLevel;
+
+    private boolean isUtcTimestamp;
+
+    /** Total number of values read. */
+    private long valuesRead;
+
+    /**
+     * value that indicates the end of the current page. That is, if 
valuesRead ==
+     * endOfPageValueCount, we are at the end of the page.
+     */
+    private long endOfPageValueCount;
+
+    /** If true, the current page is dictionary encoded. */
+    private boolean isCurrentPageDictionaryEncoded;
+
+    private int definitionLevel;
+    private int repetitionLevel;
+
+    /** Repetition/Definition/Value readers. */
+    private IntIterator repetitionLevelColumn;
+
+    private IntIterator definitionLevelColumn;
+    private ParquetDataColumnReader dataColumn;
+
+    /** Total values in the current page. */
+    private int pageValueCount;
+
+    // flag to indicate if there is no data in parquet data page
+    private boolean eof = false;
+
+    private boolean isFirstRow = true;
+
+    private Object lastValue;
+
+    public NestedPrimitiveColumnReader(
+            ColumnDescriptor descriptor,
+            PageReader pageReader,
+            boolean isUtcTimestamp,
+            Type parquetType,
+            DataType dataType)
+            throws IOException {
+        this.descriptor = descriptor;
+        this.type = parquetType;
+        this.pageReader = pageReader;
+        this.maxDefLevel = descriptor.getMaxDefinitionLevel();
+        this.isUtcTimestamp = isUtcTimestamp;
+        this.dataType = dataType;
+
+        DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
+        if (dictionaryPage != null) {
+            try {
+                this.dictionary =
+                        
ParquetDataColumnReaderFactory.getDataColumnReaderByTypeOnDictionary(
+                                parquetType.asPrimitiveType(),
+                                dictionaryPage
+                                        .getEncoding()
+                                        .initDictionary(descriptor, 
dictionaryPage),
+                                isUtcTimestamp);
+                this.isCurrentPageDictionaryEncoded = true;
+            } catch (IOException e) {
+                throw new IOException(
+                        String.format("Could not decode the dictionary for 
%s", descriptor), e);
+            }
+        } else {
+            this.dictionary = null;
+            this.isCurrentPageDictionaryEncoded = false;
+        }
+    }
+
+    // This won't call, will actually call readAndNewVector
+    @Override
+    public void readToVector(int readNumber, WritableColumnVector vector) 
throws IOException {
+        throw new UnsupportedOperationException("This function should no be 
called.");
+    }
+
+    public WritableColumnVector readAndNewVector(int readNumber, 
WritableColumnVector vector)
+            throws IOException {
+        if (isFirstRow) {
+            if (!readValue()) {
+                return vector;
+            }
+            isFirstRow = false;
+        }
+
+        // index to set value.
+        int index = 0;
+        int valueIndex = 0;
+        List<Object> valueList = new ArrayList<>();
+
+        // repeated type need two loops to read data.
+        while (!eof && index < readNumber) {
+            do {
+                valueList.add(lastValue);
+                valueIndex++;
+            } while (readValue() && (repetitionLevel != 0));
+            index++;
+        }
+
+        return fillColumnVector(valueIndex, valueList);
+    }
+
+    public LevelDelegation getLevelDelegation() {
+        int[] repetition = repetitionLevelList.toArray();
+        int[] definition = definitionLevelList.toArray();
+        repetitionLevelList.clear();
+        definitionLevelList.clear();
+        repetitionLevelList.add(repetitionLevel);
+        definitionLevelList.add(definitionLevel);
+        return new LevelDelegation(repetition, definition);
+    }
+
+    private boolean readValue() throws IOException {
+        int left = readPageIfNeed();
+        if (left > 0) {
+            // get the values of repetition and definitionLevel
+            readAndSaveRepetitionAndDefinitionLevels();
+            // read the data if it isn't null
+            if (definitionLevel == maxDefLevel) {
+                if (isCurrentPageDictionaryEncoded) {
+                    int dictionaryId = dataColumn.readValueDictionaryId();
+                    lastValue = dictionaryDecodeValue(dataType, dictionaryId);
+                } else {
+                    lastValue = readPrimitiveTypedRow(dataType);
+                }
+            } else {
+                lastValue = null;
+            }
+            return true;
+        } else {
+            eof = true;
+            return false;
+        }
+    }
+
+    private void readAndSaveRepetitionAndDefinitionLevels() {
+        // get the values of repetition and definitionLevel
+        repetitionLevel = repetitionLevelColumn.nextInt();
+        definitionLevel = definitionLevelColumn.nextInt();
+        valuesRead++;
+        repetitionLevelList.add(repetitionLevel);
+        definitionLevelList.add(definitionLevel);
+    }
+
+    private int readPageIfNeed() throws IOException {
+        // Compute the number of values we want to read in this page.
+        int leftInPage = (int) (endOfPageValueCount - valuesRead);
+        if (leftInPage == 0) {
+            // no data left in current page, load data from new page
+            readPage();
+            leftInPage = (int) (endOfPageValueCount - valuesRead);
+        }
+        return leftInPage;
+    }
+
+    private Object readPrimitiveTypedRow(DataType category) {
+        switch (category.getTypeRoot()) {
+            case CHAR:
+            case VARCHAR:
+            case BINARY:
+            case VARBINARY:
+                return dataColumn.readBytes();
+            case BOOLEAN:
+                return dataColumn.readBoolean();
+            case TIME_WITHOUT_TIME_ZONE:
+            case DATE:
+            case INTEGER:
+                return dataColumn.readInteger();
+            case TINYINT:
+                return dataColumn.readTinyInt();
+            case SMALLINT:
+                return dataColumn.readSmallInt();
+            case BIGINT:
+                return dataColumn.readLong();
+            case FLOAT:
+                return dataColumn.readFloat();
+            case DOUBLE:
+                return dataColumn.readDouble();
+            case DECIMAL:
+                switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
+                    case INT32:
+                        return dataColumn.readInteger();
+                    case INT64:
+                        return dataColumn.readLong();
+                    case BINARY:
+                    case FIXED_LEN_BYTE_ARRAY:
+                        return dataColumn.readBytes();
+                }
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return dataColumn.readMillsTimestamp();
+            default:
+                throw new RuntimeException("Unsupported type in the list: " + 
type);
+        }
+    }
+
+    private Object dictionaryDecodeValue(DataType category, Integer 
dictionaryValue) {
+        if (dictionaryValue == null) {
+            return null;
+        }
+
+        switch (category.getTypeRoot()) {
+            case CHAR:
+            case VARCHAR:
+            case BINARY:
+            case VARBINARY:
+                return dictionary.readBytes(dictionaryValue);
+            case DATE:
+            case TIME_WITHOUT_TIME_ZONE:
+            case INTEGER:
+                return dictionary.readInteger(dictionaryValue);
+            case BOOLEAN:
+                return dictionary.readBoolean(dictionaryValue) ? 1 : 0;
+            case DOUBLE:
+                return dictionary.readDouble(dictionaryValue);
+            case FLOAT:
+                return dictionary.readFloat(dictionaryValue);
+            case TINYINT:
+                return dictionary.readTinyInt(dictionaryValue);
+            case SMALLINT:
+                return dictionary.readSmallInt(dictionaryValue);
+            case BIGINT:
+                return dictionary.readLong(dictionaryValue);
+            case DECIMAL:
+                switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
+                    case INT32:
+                        return dictionary.readInteger(dictionaryValue);
+                    case INT64:
+                        return dictionary.readLong(dictionaryValue);
+                    case FIXED_LEN_BYTE_ARRAY:
+                    case BINARY:
+                        return dictionary.readBytes(dictionaryValue);
+                }
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                return dictionary.readTimestamp(dictionaryValue);
+            default:
+                throw new RuntimeException("Unsupported type in the list: " + 
type);
+        }
+    }
+
+    private WritableColumnVector fillColumnVector(int total, List valueList) {
+        switch (dataType.getTypeRoot()) {
+            case CHAR:
+            case VARCHAR:
+            case BINARY:
+            case VARBINARY:
+                HeapBytesVector heapBytesVector = new HeapBytesVector(total);
+                for (int i = 0; i < valueList.size(); i++) {
+                    byte[] src = ((List<byte[]>) valueList).get(i);
+                    if (src == null) {
+                        heapBytesVector.setNullAt(i);
+                    } else {
+                        heapBytesVector.appendBytes(i, src, 0, src.length);
+                    }
+                }
+                return heapBytesVector;
+            case BOOLEAN:
+                HeapBooleanVector heapBooleanVector = new 
HeapBooleanVector(total);
+                for (int i = 0; i < valueList.size(); i++) {
+                    if (valueList.get(i) == null) {
+                        heapBooleanVector.setNullAt(i);
+                    } else {
+                        heapBooleanVector.vector[i] = ((List<Boolean>) 
valueList).get(i);
+                    }
+                }
+                return heapBooleanVector;
+            case TINYINT:
+                HeapByteVector heapByteVector = new HeapByteVector(total);
+                for (int i = 0; i < valueList.size(); i++) {
+                    if (valueList.get(i) == null) {
+                        heapByteVector.setNullAt(i);
+                    } else {
+                        heapByteVector.vector[i] =
+                                (byte) ((List<Integer>) 
valueList).get(i).intValue();
+                    }
+                }
+                return heapByteVector;
+            case SMALLINT:
+                HeapShortVector heapShortVector = new HeapShortVector(total);
+                for (int i = 0; i < valueList.size(); i++) {
+                    if (valueList.get(i) == null) {
+                        heapShortVector.setNullAt(i);
+                    } else {
+                        heapShortVector.vector[i] =
+                                (short) ((List<Integer>) 
valueList).get(i).intValue();
+                    }
+                }
+                return heapShortVector;
+            case INTEGER:
+            case DATE:
+            case TIME_WITHOUT_TIME_ZONE:
+                HeapIntVector heapIntVector = new HeapIntVector(total);
+                for (int i = 0; i < valueList.size(); i++) {
+                    if (valueList.get(i) == null) {
+                        heapIntVector.setNullAt(i);
+                    } else {
+                        heapIntVector.vector[i] = ((List<Integer>) 
valueList).get(i);
+                    }
+                }
+                return heapIntVector;
+            case FLOAT:
+                HeapFloatVector heapFloatVector = new HeapFloatVector(total);
+                for (int i = 0; i < valueList.size(); i++) {
+                    if (valueList.get(i) == null) {
+                        heapFloatVector.setNullAt(i);
+                    } else {
+                        heapFloatVector.vector[i] = ((List<Float>) 
valueList).get(i);
+                    }
+                }
+                return heapFloatVector;
+            case BIGINT:
+                HeapLongVector heapLongVector = new HeapLongVector(total);
+                for (int i = 0; i < valueList.size(); i++) {
+                    if (valueList.get(i) == null) {
+                        heapLongVector.setNullAt(i);
+                    } else {
+                        heapLongVector.vector[i] = ((List<Long>) 
valueList).get(i);
+                    }
+                }
+                return heapLongVector;
+            case DOUBLE:
+                HeapDoubleVector heapDoubleVector = new 
HeapDoubleVector(total);
+                for (int i = 0; i < valueList.size(); i++) {
+                    if (valueList.get(i) == null) {
+                        heapDoubleVector.setNullAt(i);
+                    } else {
+                        heapDoubleVector.vector[i] = ((List<Double>) 
valueList).get(i);
+                    }
+                }
+                return heapDoubleVector;
+            case TIMESTAMP_WITHOUT_TIME_ZONE:
+            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+                HeapTimestampVector heapTimestampVector = new 
HeapTimestampVector(total);
+                for (int i = 0; i < valueList.size(); i++) {
+                    if (valueList.get(i) == null) {
+                        heapTimestampVector.setNullAt(i);
+                    } else {
+                        heapTimestampVector.setTimestamp(i, ((List<Timestamp>) 
valueList).get(i));
+                    }
+                }
+                return heapTimestampVector;
+            case DECIMAL:
+                PrimitiveType.PrimitiveTypeName primitiveTypeName =
+                        descriptor.getPrimitiveType().getPrimitiveTypeName();
+                switch (primitiveTypeName) {
+                    case INT32:
+                        HeapIntVector phiv = new HeapIntVector(total);
+                        for (int i = 0; i < valueList.size(); i++) {
+                            if (valueList.get(i) == null) {
+                                phiv.setNullAt(i);
+                            } else {
+                                phiv.vector[i] = ((List<Integer>) 
valueList).get(i);
+                            }
+                        }
+                        return new ParquetDecimalVector(phiv);
+                    case INT64:
+                        HeapLongVector phlv = new HeapLongVector(total);
+                        for (int i = 0; i < valueList.size(); i++) {
+                            if (valueList.get(i) == null) {
+                                phlv.setNullAt(i);
+                            } else {
+                                phlv.vector[i] = ((List<Long>) 
valueList).get(i);
+                            }
+                        }
+                        return new ParquetDecimalVector(phlv);
+                    default:
+                        HeapBytesVector phbv = getHeapBytesVector(total, 
valueList);
+                        return new ParquetDecimalVector(phbv);
+                }
+            default:
+                throw new RuntimeException("Unsupported type in the list: " + 
type);
+        }
+    }
+
+    private static HeapBytesVector getHeapBytesVector(int total, List 
valueList) {
+        HeapBytesVector phbv = new HeapBytesVector(total);
+        for (int i = 0; i < valueList.size(); i++) {
+            byte[] src = ((List<byte[]>) valueList).get(i);
+            if (valueList.get(i) == null) {
+                phbv.setNullAt(i);
+            } else {
+                phbv.appendBytes(i, src, 0, src.length);
+            }
+        }
+        return phbv;
+    }
+
+    protected void readPage() {
+        DataPage page = pageReader.readPage();
+
+        if (page == null) {
+            return;
+        }
+
+        page.accept(
+                new DataPage.Visitor<Void>() {
+                    @Override
+                    public Void visit(DataPageV1 dataPageV1) {
+                        readPageV1(dataPageV1);
+                        return null;
+                    }
+
+                    @Override
+                    public Void visit(DataPageV2 dataPageV2) {
+                        readPageV2(dataPageV2);
+                        return null;
+                    }
+                });
+    }
+
+    private void initDataReader(Encoding dataEncoding, ByteBufferInputStream 
in, int valueCount)
+            throws IOException {
+        this.pageValueCount = valueCount;
+        this.endOfPageValueCount = valuesRead + pageValueCount;
+        if (dataEncoding.usesDictionary()) {
+            this.dataColumn = null;
+            if (dictionary == null) {
+                throw new IOException(
+                        String.format(
+                                "Could not read page in col %s because the 
dictionary was missing for encoding %s.",
+                                descriptor, dataEncoding));
+            }
+            dataColumn =
+                    ParquetDataColumnReaderFactory.getDataColumnReaderByType(
+                            type.asPrimitiveType(),
+                            dataEncoding.getDictionaryBasedValuesReader(
+                                    descriptor, VALUES, 
dictionary.getDictionary()),
+                            isUtcTimestamp);
+            this.isCurrentPageDictionaryEncoded = true;
+        } else {
+            dataColumn =
+                    ParquetDataColumnReaderFactory.getDataColumnReaderByType(
+                            type.asPrimitiveType(),
+                            dataEncoding.getValuesReader(descriptor, VALUES),
+                            isUtcTimestamp);
+            this.isCurrentPageDictionaryEncoded = false;
+        }
+
+        try {
+            dataColumn.initFromPage(pageValueCount, in);
+        } catch (IOException e) {
+            throw new IOException(String.format("Could not read page in col 
%s.", descriptor), e);
+        }
+    }
+
+    private void readPageV1(DataPageV1 page) {
+        ValuesReader rlReader = 
page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL);
+        ValuesReader dlReader = 
page.getDlEncoding().getValuesReader(descriptor, DEFINITION_LEVEL);
+        this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
+        this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
+        try {
+            BytesInput bytes = page.getBytes();
+            LOG.debug("Page size {}  bytes and {} records.", bytes.size(), 
pageValueCount);
+            ByteBufferInputStream in = bytes.toInputStream();
+            LOG.debug("Reading repetition levels at {}.", in.position());
+            rlReader.initFromPage(pageValueCount, in);
+            LOG.debug("Reading definition levels at {}.", in.position());
+            dlReader.initFromPage(pageValueCount, in);
+            LOG.debug("Reading data at {}.", in.position());
+            initDataReader(page.getValueEncoding(), in, page.getValueCount());
+        } catch (IOException e) {
+            throw new ParquetDecodingException(
+                    String.format("Could not read page %s in col %s.", page, 
descriptor), e);
+        }
+    }
+
+    private void readPageV2(DataPageV2 page) {
+        this.pageValueCount = page.getValueCount();
+        this.repetitionLevelColumn =
+                newRLEIterator(descriptor.getMaxRepetitionLevel(), 
page.getRepetitionLevels());
+        this.definitionLevelColumn =
+                newRLEIterator(descriptor.getMaxDefinitionLevel(), 
page.getDefinitionLevels());
+        try {
+            LOG.debug(
+                    "Page data size {} bytes and {} records.",
+                    page.getData().size(),
+                    pageValueCount);
+            initDataReader(
+                    page.getDataEncoding(), page.getData().toInputStream(), 
page.getValueCount());
+        } catch (IOException e) {
+            throw new ParquetDecodingException(
+                    String.format("Could not read page %s in col %s.", page, 
descriptor), e);
+        }
+    }
+
+    private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) {
+        try {
+            if (maxLevel == 0) {
+                return new NullIntIterator();
+            }
+            return new RLEIntIterator(
+                    new RunLengthBitPackingHybridDecoder(
+                            BytesUtils.getWidthFromMaxInt(maxLevel),
+                            new ByteArrayInputStream(bytes.toByteArray())));
+        } catch (IOException e) {
+            throw new ParquetDecodingException(
+                    String.format("Could not read levels in page for col %s.", 
descriptor), e);
+        }
+    }
+
+    /** Utility interface to abstract over different way to read ints with 
different encodings. */
+    interface IntIterator {
+        int nextInt();
+    }
+
+    /** Reading int from {@link ValuesReader}. */
+    protected static final class ValuesReaderIntIterator implements 
IntIterator {
+        ValuesReader delegate;
+
+        public ValuesReaderIntIterator(ValuesReader delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        public int nextInt() {
+            return delegate.readInteger();
+        }
+    }
+
+    /** Reading int from {@link RunLengthBitPackingHybridDecoder}. */
+    protected static final class RLEIntIterator implements IntIterator {
+        RunLengthBitPackingHybridDecoder delegate;
+
+        public RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        public int nextInt() {
+            try {
+                return delegate.readInt();
+            } catch (IOException e) {
+                throw new ParquetDecodingException(e);
+            }
+        }
+    }
+
+    /** Reading zero always. */
+    protected static final class NullIntIterator implements IntIterator {
+        @Override
+        public int nextInt() {
+            return 0;
+        }
+    }
+}
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDecimalVector.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDecimalVector.java
index d1ab8d660..28d308bac 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDecimalVector.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetDecimalVector.java
@@ -22,8 +22,13 @@ import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.columnar.BytesColumnVector;
 import org.apache.paimon.data.columnar.ColumnVector;
 import org.apache.paimon.data.columnar.DecimalColumnVector;
+import org.apache.paimon.data.columnar.Dictionary;
 import org.apache.paimon.data.columnar.IntColumnVector;
 import org.apache.paimon.data.columnar.LongColumnVector;
+import org.apache.paimon.data.columnar.writable.WritableBytesVector;
+import org.apache.paimon.data.columnar.writable.WritableColumnVector;
+import org.apache.paimon.data.columnar.writable.WritableIntVector;
+import org.apache.paimon.data.columnar.writable.WritableLongVector;
 import org.apache.paimon.format.parquet.ParquetSchemaConverter;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
@@ -32,7 +37,8 @@ import static 
org.apache.paimon.utils.Preconditions.checkArgument;
  * Parquet write decimal as int32 and int64 and binary, this class wrap the 
real vector to provide
  * {@link DecimalColumnVector} interface.
  */
-public class ParquetDecimalVector implements DecimalColumnVector {
+public class ParquetDecimalVector
+        implements DecimalColumnVector, WritableLongVector, WritableIntVector, 
WritableBytesVector {
 
     private final ColumnVector vector;
 
@@ -66,4 +72,157 @@ public class ParquetDecimalVector implements 
DecimalColumnVector {
     public boolean isNullAt(int i) {
         return vector.isNullAt(i);
     }
+
+    @Override
+    public void reset() {
+        if (vector instanceof WritableColumnVector) {
+            ((WritableColumnVector) vector).reset();
+        }
+    }
+
+    @Override
+    public void setNullAt(int rowId) {
+        if (vector instanceof WritableColumnVector) {
+            ((WritableColumnVector) vector).setNullAt(rowId);
+        }
+    }
+
+    @Override
+    public void setNulls(int rowId, int count) {
+        if (vector instanceof WritableColumnVector) {
+            ((WritableColumnVector) vector).setNulls(rowId, count);
+        }
+    }
+
+    @Override
+    public void fillWithNulls() {
+        if (vector instanceof WritableColumnVector) {
+            ((WritableColumnVector) vector).fillWithNulls();
+        }
+    }
+
+    @Override
+    public void setDictionary(Dictionary dictionary) {
+        if (vector instanceof WritableColumnVector) {
+            ((WritableColumnVector) vector).setDictionary(dictionary);
+        }
+    }
+
+    @Override
+    public boolean hasDictionary() {
+        if (vector instanceof WritableColumnVector) {
+            return ((WritableColumnVector) vector).hasDictionary();
+        }
+        return false;
+    }
+
+    @Override
+    public WritableIntVector reserveDictionaryIds(int capacity) {
+        if (vector instanceof WritableColumnVector) {
+            return ((WritableColumnVector) 
vector).reserveDictionaryIds(capacity);
+        }
+        throw new RuntimeException("Child vector must be instance of 
WritableColumnVector");
+    }
+
+    @Override
+    public WritableIntVector getDictionaryIds() {
+        if (vector instanceof WritableColumnVector) {
+            return ((WritableColumnVector) vector).getDictionaryIds();
+        }
+        throw new RuntimeException("Child vector must be instance of 
WritableColumnVector");
+    }
+
+    @Override
+    public Bytes getBytes(int i) {
+        if (vector instanceof WritableBytesVector) {
+            return ((WritableBytesVector) vector).getBytes(i);
+        }
+        throw new RuntimeException("Child vector must be instance of 
WritableColumnVector");
+    }
+
+    @Override
+    public void appendBytes(int rowId, byte[] value, int offset, int length) {
+        if (vector instanceof WritableBytesVector) {
+            ((WritableBytesVector) vector).appendBytes(rowId, value, offset, 
length);
+        }
+    }
+
+    @Override
+    public void fill(byte[] value) {
+        if (vector instanceof WritableBytesVector) {
+            ((WritableBytesVector) vector).fill(value);
+        }
+    }
+
+    @Override
+    public int getInt(int i) {
+        if (vector instanceof WritableIntVector) {
+            return ((WritableIntVector) vector).getInt(i);
+        }
+        throw new RuntimeException("Child vector must be instance of 
WritableColumnVector");
+    }
+
+    @Override
+    public void setInt(int rowId, int value) {
+        if (vector instanceof WritableIntVector) {
+            ((WritableIntVector) vector).setInt(rowId, value);
+        }
+    }
+
+    @Override
+    public void setIntsFromBinary(int rowId, int count, byte[] src, int 
srcIndex) {
+        if (vector instanceof WritableIntVector) {
+            ((WritableIntVector) vector).setIntsFromBinary(rowId, count, src, 
srcIndex);
+        }
+    }
+
+    @Override
+    public void setInts(int rowId, int count, int value) {
+        if (vector instanceof WritableIntVector) {
+            ((WritableIntVector) vector).setInts(rowId, count, value);
+        }
+    }
+
+    @Override
+    public void setInts(int rowId, int count, int[] src, int srcIndex) {
+        if (vector instanceof WritableIntVector) {
+            ((WritableIntVector) vector).setInts(rowId, count, src, srcIndex);
+        }
+    }
+
+    @Override
+    public void fill(int value) {
+        if (vector instanceof WritableIntVector) {
+            ((WritableIntVector) vector).fill(value);
+        }
+    }
+
+    @Override
+    public long getLong(int i) {
+        if (vector instanceof WritableLongVector) {
+            return ((WritableLongVector) vector).getLong(i);
+        }
+        throw new RuntimeException("Child vector must be instance of 
WritableColumnVector");
+    }
+
+    @Override
+    public void setLong(int rowId, long value) {
+        if (vector instanceof WritableLongVector) {
+            ((WritableLongVector) vector).setLong(rowId, value);
+        }
+    }
+
+    @Override
+    public void setLongsFromBinary(int rowId, int count, byte[] src, int 
srcIndex) {
+        if (vector instanceof WritableLongVector) {
+            ((WritableLongVector) vector).setLongsFromBinary(rowId, count, 
src, srcIndex);
+        }
+    }
+
+    @Override
+    public void fill(long value) {
+        if (vector instanceof WritableLongVector) {
+            ((WritableLongVector) vector).fill(value);
+        }
+    }
 }
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java
index 59af1f391..8a362961d 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/ParquetSplitReaderUtil.java
@@ -32,7 +32,11 @@ import org.apache.paimon.data.columnar.heap.HeapShortVector;
 import org.apache.paimon.data.columnar.heap.HeapTimestampVector;
 import org.apache.paimon.data.columnar.writable.WritableColumnVector;
 import org.apache.paimon.format.parquet.ParquetSchemaConverter;
+import org.apache.paimon.format.parquet.type.ParquetField;
+import org.apache.paimon.format.parquet.type.ParquetGroupField;
+import org.apache.paimon.format.parquet.type.ParquetPrimitiveField;
 import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypeChecks;
 import org.apache.paimon.types.DecimalType;
@@ -40,12 +44,20 @@ import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.MapType;
 import org.apache.paimon.types.MultisetType;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
 
 import org.apache.parquet.ParquetRuntimeException;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.io.ColumnIO;
+import org.apache.parquet.io.GroupColumnIO;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.PrimitiveColumnIO;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.InvalidSchemaException;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
@@ -53,8 +65,11 @@ import org.apache.parquet.schema.Type;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Objects;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.parquet.schema.Type.Repetition.REPEATED;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
 
 /** Util for generating {@link ColumnReader}. */
 public class ParquetSplitReaderUtil {
@@ -65,6 +80,7 @@ public class ParquetSplitReaderUtil {
             Type type,
             List<ColumnDescriptor> columnDescriptors,
             PageReadStore pages,
+            ParquetField field,
             int depth)
             throws IOException {
         List<ColumnDescriptor> descriptors =
@@ -126,60 +142,10 @@ public class ParquetSplitReaderUtil {
                                 ((DecimalType) fieldType).getPrecision());
                 }
             case ARRAY:
-                return new ArrayColumnReader(
-                        descriptors.get(0),
-                        pages.getPageReader(descriptors.get(0)),
-                        true,
-                        descriptors.get(0).getPrimitiveType(),
-                        fieldType);
             case MAP:
-                MapType mapType = (MapType) fieldType;
-                ArrayColumnReader mapKeyReader =
-                        new ArrayColumnReader(
-                                descriptors.get(0),
-                                pages.getPageReader(descriptors.get(0)),
-                                true,
-                                descriptors.get(0).getPrimitiveType(),
-                                new ArrayType(mapType.getKeyType()));
-                ArrayColumnReader mapValueReader =
-                        new ArrayColumnReader(
-                                descriptors.get(1),
-                                pages.getPageReader(descriptors.get(1)),
-                                true,
-                                descriptors.get(1).getPrimitiveType(),
-                                new ArrayType(mapType.getValueType()));
-                return new MapColumnReader(mapKeyReader, mapValueReader);
             case MULTISET:
-                MultisetType multisetType = (MultisetType) fieldType;
-                ArrayColumnReader multisetKeyReader =
-                        new ArrayColumnReader(
-                                descriptors.get(0),
-                                pages.getPageReader(descriptors.get(0)),
-                                true,
-                                descriptors.get(0).getPrimitiveType(),
-                                new ArrayType(multisetType.getElementType()));
-                ArrayColumnReader multisetValueReader =
-                        new ArrayColumnReader(
-                                descriptors.get(1),
-                                pages.getPageReader(descriptors.get(1)),
-                                true,
-                                descriptors.get(1).getPrimitiveType(),
-                                new ArrayType(new IntType(false)));
-                return new MapColumnReader(multisetKeyReader, 
multisetValueReader);
             case ROW:
-                RowType rowType = (RowType) fieldType;
-                GroupType groupType = type.asGroupType();
-                List<ColumnReader> fieldReaders = new ArrayList<>();
-                for (int i = 0; i < rowType.getFieldCount(); i++) {
-                    fieldReaders.add(
-                            createColumnReader(
-                                    rowType.getTypeAt(i),
-                                    groupType.getType(i),
-                                    descriptors,
-                                    pages,
-                                    depth + 1));
-                }
-                return new RowColumnReader(fieldReaders);
+                return new NestedColumnReader(true, pages, field);
             default:
                 throw new UnsupportedOperationException(fieldType + " is not 
supported now.");
         }
@@ -300,7 +266,18 @@ public class ParquetSplitReaderUtil {
                                 depth));
             case MAP:
                 MapType mapType = (MapType) fieldType;
+                LogicalTypeAnnotation mapTypeAnnotation = 
type.getLogicalTypeAnnotation();
                 GroupType mapRepeatedType = 
type.asGroupType().getType(0).asGroupType();
+                if 
(mapTypeAnnotation.equals(LogicalTypeAnnotation.listType())) {
+                    mapRepeatedType = mapRepeatedType.getType(0).asGroupType();
+                    depth++;
+                    if (mapRepeatedType
+                            .getLogicalTypeAnnotation()
+                            .equals(LogicalTypeAnnotation.mapType())) {
+                        mapRepeatedType = 
mapRepeatedType.getType(0).asGroupType();
+                        depth++;
+                    }
+                }
                 return new HeapMapVector(
                         batchSize,
                         createWritableColumnVector(
@@ -317,7 +294,18 @@ public class ParquetSplitReaderUtil {
                                 depth + 2));
             case MULTISET:
                 MultisetType multisetType = (MultisetType) fieldType;
+                LogicalTypeAnnotation multisetTypeAnnotation = 
type.getLogicalTypeAnnotation();
                 GroupType multisetRepeatedType = 
type.asGroupType().getType(0).asGroupType();
+                if 
(multisetTypeAnnotation.equals(LogicalTypeAnnotation.listType())) {
+                    multisetRepeatedType = 
multisetRepeatedType.getType(0).asGroupType();
+                    depth++;
+                    if (multisetRepeatedType
+                            .getLogicalTypeAnnotation()
+                            .equals(LogicalTypeAnnotation.mapType())) {
+                        multisetRepeatedType = 
multisetRepeatedType.getType(0).asGroupType();
+                        depth++;
+                    }
+                }
                 return new HeapMapVector(
                         batchSize,
                         createWritableColumnVector(
@@ -335,6 +323,12 @@ public class ParquetSplitReaderUtil {
             case ROW:
                 RowType rowType = (RowType) fieldType;
                 GroupType groupType = type.asGroupType();
+                if 
(LogicalTypeAnnotation.listType().equals(groupType.getLogicalTypeAnnotation())) 
{
+                    // this means there was two outside struct, need to get 
group twice.
+                    groupType = groupType.getType(0).asGroupType();
+                    groupType = groupType.getType(0).asGroupType();
+                    depth = depth + 2;
+                }
                 WritableColumnVector[] columnVectors =
                         new WritableColumnVector[rowType.getFieldCount()];
                 for (int i = 0; i < columnVectors.length; i++) {
@@ -371,4 +365,171 @@ public class ParquetSplitReaderUtil {
         }
         return res;
     }
+
+    public static List<ParquetField> buildFieldsList(
+            List<DataField> childrens, List<String> fieldNames, 
MessageColumnIO columnIO) {
+        List<ParquetField> list = new ArrayList<>();
+        for (int i = 0; i < childrens.size(); i++) {
+            list.add(
+                    constructField(
+                            childrens.get(i), lookupColumnByName(columnIO, 
fieldNames.get(i))));
+        }
+        return list;
+    }
+
+    private static ParquetField constructField(DataField dataField, ColumnIO 
columnIO) {
+        boolean required = columnIO.getType().getRepetition() == REQUIRED;
+        int repetitionLevel = columnIO.getRepetitionLevel();
+        int definitionLevel = columnIO.getDefinitionLevel();
+        DataType type = dataField.type();
+        String filedName = dataField.name();
+        if (type instanceof RowType) {
+            GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO;
+            RowType rowType = (RowType) type;
+            ImmutableList.Builder<ParquetField> fieldsBuilder = 
ImmutableList.builder();
+            List<String> fieldNames = rowType.getFieldNames();
+            List<DataField> childrens = rowType.getFields();
+            for (int i = 0; i < childrens.size(); i++) {
+                fieldsBuilder.add(
+                        constructField(
+                                childrens.get(i),
+                                lookupColumnByName(groupColumnIO, 
fieldNames.get(i))));
+            }
+
+            return new ParquetGroupField(
+                    type, repetitionLevel, definitionLevel, required, 
fieldsBuilder.build());
+        }
+
+        if (type instanceof MapType) {
+            GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO;
+            GroupColumnIO keyValueColumnIO = 
getMapKeyValueColumn(groupColumnIO);
+            MapType mapType = (MapType) type;
+            ParquetField keyField =
+                    constructField(
+                            new DataField(0, "", mapType.getKeyType()),
+                            keyValueColumnIO.getChild(0));
+            ParquetField valueField =
+                    constructField(
+                            new DataField(0, "", mapType.getValueType()),
+                            keyValueColumnIO.getChild(1));
+            return new ParquetGroupField(
+                    type,
+                    repetitionLevel,
+                    definitionLevel,
+                    required,
+                    ImmutableList.of(keyField, valueField));
+        }
+
+        if (type instanceof MultisetType) {
+            GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO;
+            GroupColumnIO keyValueColumnIO = 
getMapKeyValueColumn(groupColumnIO);
+            MultisetType multisetType = (MultisetType) type;
+            ParquetField keyField =
+                    constructField(
+                            new DataField(0, "", 
multisetType.getElementType()),
+                            keyValueColumnIO.getChild(0));
+            ParquetField valueField =
+                    constructField(
+                            new DataField(0, "", new IntType()), 
keyValueColumnIO.getChild(1));
+            return new ParquetGroupField(
+                    type,
+                    repetitionLevel,
+                    definitionLevel,
+                    required,
+                    ImmutableList.of(keyField, valueField));
+        }
+
+        if (type instanceof ArrayType) {
+            ArrayType arrayType = (ArrayType) type;
+            ColumnIO elementTypeColumnIO;
+            if (columnIO instanceof GroupColumnIO) {
+                GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO;
+                if (!StringUtils.isNullOrWhitespaceOnly(filedName)) {
+                    while (!Objects.equals(groupColumnIO.getName(), 
filedName)) {
+                        groupColumnIO = (GroupColumnIO) 
groupColumnIO.getChild(0);
+                    }
+                    elementTypeColumnIO = groupColumnIO;
+                } else {
+                    if (arrayType.getElementType() instanceof RowType) {
+                        elementTypeColumnIO = groupColumnIO;
+                    } else {
+                        elementTypeColumnIO = groupColumnIO.getChild(0);
+                    }
+                }
+            } else if (columnIO instanceof PrimitiveColumnIO) {
+                elementTypeColumnIO = columnIO;
+            } else {
+                throw new RuntimeException(String.format("Unknown ColumnIO, 
%s", columnIO));
+            }
+
+            ParquetField field =
+                    constructField(
+                            new DataField(0, "", arrayType.getElementType()),
+                            getArrayElementColumn(elementTypeColumnIO));
+            if (repetitionLevel == field.getRepetitionLevel()) {
+                repetitionLevel = columnIO.getParent().getRepetitionLevel();
+            }
+            return new ParquetGroupField(
+                    type, repetitionLevel, definitionLevel, required, 
ImmutableList.of(field));
+        }
+
+        PrimitiveColumnIO primitiveColumnIO = (PrimitiveColumnIO) columnIO;
+        return new ParquetPrimitiveField(
+                type, required, primitiveColumnIO.getColumnDescriptor(), 
primitiveColumnIO.getId());
+    }
+
+    /**
+     * Parquet's column names are case in sensitive. So when we look up 
columns we first check for
+     * exact match, and if that can not find we look for a case-insensitive 
match.
+     */
+    public static ColumnIO lookupColumnByName(GroupColumnIO groupColumnIO, 
String columnName) {
+        ColumnIO columnIO = groupColumnIO.getChild(columnName);
+
+        if (columnIO != null) {
+            return columnIO;
+        }
+
+        for (int i = 0; i < groupColumnIO.getChildrenCount(); i++) {
+            if 
(groupColumnIO.getChild(i).getName().equalsIgnoreCase(columnName)) {
+                return groupColumnIO.getChild(i);
+            }
+        }
+
+        throw new RuntimeException("Can not find column io for parquet 
reader.");
+    }
+
+    public static GroupColumnIO getMapKeyValueColumn(GroupColumnIO 
groupColumnIO) {
+        while (groupColumnIO.getChildrenCount() == 1) {
+            groupColumnIO = (GroupColumnIO) groupColumnIO.getChild(0);
+        }
+        return groupColumnIO;
+    }
+
+    public static ColumnIO getArrayElementColumn(ColumnIO columnIO) {
+        while (columnIO instanceof GroupColumnIO && 
!columnIO.getType().isRepetition(REPEATED)) {
+            columnIO = ((GroupColumnIO) columnIO).getChild(0);
+        }
+
+        /* Compatible with array has a standard 3-level structure:
+         *  optional group my_list (LIST) {
+         *     repeated group element {
+         *        required binary str (UTF8);
+         *     };
+         *  }
+         */
+        if (columnIO instanceof GroupColumnIO
+                && columnIO.getType().getLogicalTypeAnnotation() == null
+                && ((GroupColumnIO) columnIO).getChildrenCount() == 1
+                && !columnIO.getName().equals("array")
+                && !columnIO.getName().equals(columnIO.getParent().getName() + 
"_tuple")) {
+            return ((GroupColumnIO) columnIO).getChild(0);
+        }
+
+        /* Compatible with array for 2-level arrays where a repeated field is 
not a group:
+         *   optional group my_list (LIST) {
+         *      repeated int32 element;
+         *   }
+         */
+        return columnIO;
+    }
 }
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/ParquetField.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/ParquetField.java
new file mode 100644
index 000000000..94fe6b91d
--- /dev/null
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/ParquetField.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.parquet.type;
+
+import org.apache.paimon.types.DataType;
+
+/** Field that represent parquet's field type. */
+public abstract class ParquetField {
+    private final DataType type;
+    private final int repetitionLevel;
+    private final int definitionLevel;
+    private final boolean required;
+
+    public ParquetField(DataType type, int repetitionLevel, int 
definitionLevel, boolean required) {
+        this.type = type;
+        this.repetitionLevel = repetitionLevel;
+        this.definitionLevel = definitionLevel;
+        this.required = required;
+    }
+
+    public DataType getType() {
+        return type;
+    }
+
+    public int getRepetitionLevel() {
+        return repetitionLevel;
+    }
+
+    public int getDefinitionLevel() {
+        return definitionLevel;
+    }
+
+    public boolean isRequired() {
+        return required;
+    }
+
+    @Override
+    public String toString() {
+        return "Field{"
+                + "type="
+                + type
+                + ", repetitionLevel="
+                + repetitionLevel
+                + ", definitionLevel="
+                + definitionLevel
+                + ", required="
+                + required
+                + '}';
+    }
+}
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/ParquetGroupField.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/ParquetGroupField.java
new file mode 100644
index 000000000..95f0dd1f8
--- /dev/null
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/ParquetGroupField.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.parquet.type;
+
+import org.apache.paimon.types.DataType;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
+import static java.util.Objects.requireNonNull;
+
+/** Field that represent parquet's Group Field. */
+public class ParquetGroupField extends ParquetField {
+
+    private final List<ParquetField> children;
+
+    public ParquetGroupField(
+            DataType type,
+            int repetitionLevel,
+            int definitionLevel,
+            boolean required,
+            List<ParquetField> children) {
+        super(type, repetitionLevel, definitionLevel, required);
+        this.children = ImmutableList.copyOf(requireNonNull(children, 
"children is null"));
+    }
+
+    public List<ParquetField> getChildren() {
+        return children;
+    }
+}
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/ParquetPrimitiveField.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/ParquetPrimitiveField.java
new file mode 100644
index 000000000..148596d15
--- /dev/null
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/type/ParquetPrimitiveField.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.parquet.type;
+
+import org.apache.paimon.types.DataType;
+
+import org.apache.parquet.column.ColumnDescriptor;
+
+import static java.util.Objects.requireNonNull;
+
+/** Field that represent parquet's primitive field. */
+public class ParquetPrimitiveField extends ParquetField {
+
+    private final ColumnDescriptor descriptor;
+    private final int id;
+
+    public ParquetPrimitiveField(
+            DataType type, boolean required, ColumnDescriptor descriptor, int 
id) {
+        super(
+                type,
+                descriptor.getMaxRepetitionLevel(),
+                descriptor.getMaxDefinitionLevel(),
+                required);
+        this.descriptor = requireNonNull(descriptor, "descriptor is required");
+        this.id = id;
+    }
+
+    public ColumnDescriptor getDescriptor() {
+        return descriptor;
+    }
+
+    public int getId() {
+        return id;
+    }
+}
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
index 00a7a0081..56aa83875 100644
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.Decimal;
 import org.apache.paimon.data.GenericArray;
 import org.apache.paimon.data.GenericMap;
 import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalMap;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.format.FormatReaderContext;
@@ -50,11 +51,21 @@ import org.apache.paimon.types.TimestampType;
 import org.apache.paimon.types.TinyIntType;
 import org.apache.paimon.types.VarCharType;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.filter2.predicate.ParquetFilters;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.schema.MessageType;
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.RepeatedTest;
 import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.File;
@@ -127,6 +138,31 @@ public class ParquetReadWriteTest {
                                     .build())
                     .build();
 
+    private static final RowType NESTED_ARRAY_MAP_TYPE =
+            RowType.of(
+                    new IntType(),
+                    new ArrayType(true, new IntType()),
+                    new ArrayType(true, new ArrayType(true, new IntType())),
+                    new ArrayType(
+                            true,
+                            new MapType(
+                                    true,
+                                    new VarCharType(VarCharType.MAX_LENGTH),
+                                    new VarCharType(VarCharType.MAX_LENGTH))),
+                    new ArrayType(true, RowType.builder().field("a", new 
IntType()).build()),
+                    RowType.of(
+                            new ArrayType(
+                                    true,
+                                    RowType.builder()
+                                            .field(
+                                                    "b",
+                                                    new ArrayType(
+                                                            true,
+                                                            new 
ArrayType(true, new IntType())))
+                                            .field("c", new IntType())
+                                            .build()),
+                            new IntType()));
+
     @TempDir public File folder;
 
     public static Collection<Integer> parameters() {
@@ -226,7 +262,7 @@ public class ParquetReadWriteTest {
             records.add(newRow(v));
         }
 
-        Path testPath = createTempParquetFile(folder, records, rowGroupSize);
+        Path testPath = createTempParquetFileByPaimon(folder, records, 
rowGroupSize, ROW_TYPE);
         // test reader
         DataType[] fieldTypes = new DataType[] {new DoubleType(), new 
TinyIntType(), new IntType()};
         ParquetReaderFactory format =
@@ -265,7 +301,7 @@ public class ParquetReadWriteTest {
             records.add(newRow(v));
         }
 
-        Path testPath = createTempParquetFile(folder, records, rowGroupSize);
+        Path testPath = createTempParquetFileByPaimon(folder, records, 
rowGroupSize, ROW_TYPE);
 
         // test reader
         DataType[] fieldTypes =
@@ -311,7 +347,7 @@ public class ParquetReadWriteTest {
             records.add(newRow(v));
         }
 
-        Path testPath = createTempParquetFile(folder, records, rowGroupSize);
+        Path testPath = createTempParquetFileByPaimon(folder, records, 
rowGroupSize, ROW_TYPE);
 
         DataType[] fieldTypes = new DataType[] {new DoubleType()};
         ParquetReaderFactory format =
@@ -353,7 +389,7 @@ public class ParquetReadWriteTest {
             records.add(newRow(v));
         }
 
-        Path testPath = createTempParquetFile(folder, records, rowGroupSize);
+        Path testPath = createTempParquetFileByPaimon(folder, records, 
rowGroupSize, ROW_TYPE);
 
         DataType[] fieldTypes = new DataType[] {new IntType()};
         // Build filter: f4 > randomStart
@@ -394,22 +430,47 @@ public class ParquetReadWriteTest {
         }
     }
 
+    @ParameterizedTest
+    @CsvSource({"10, paimon", "1000, paimon", "10, origin", "1000, origin"})
+    public void testNestedRead(int rowGroupSize, String writerType) throws 
Exception {
+        List<InternalRow> rows = prepareNestedData(1283);
+        Path path;
+        if ("paimon".equals(writerType)) {
+            path = createTempParquetFileByPaimon(folder, rows, rowGroupSize, 
NESTED_ARRAY_MAP_TYPE);
+        } else if ("origin".equals(writerType)) {
+            path = createNestedDataByOriginWriter(1283, folder, rowGroupSize);
+        } else {
+            throw new RuntimeException("Unknown writer type.");
+        }
+        ParquetReaderFactory format =
+                new ParquetReaderFactory(
+                        new Options(), NESTED_ARRAY_MAP_TYPE, 500, 
FilterCompat.NOOP);
+        RecordReader<InternalRow> reader =
+                format.createReader(
+                        new FormatReaderContext(
+                                new LocalFileIO(), path, new 
LocalFileIO().getFileSize(path)));
+        List<InternalRow> results = new ArrayList<>(1283);
+        reader.forEachRemaining(results::add);
+        compareNestedRow(rows, results);
+    }
+
     private void innerTestTypes(File folder, List<Integer> records, int 
rowGroupSize)
             throws IOException {
         List<InternalRow> rows = 
records.stream().map(this::newRow).collect(Collectors.toList());
-        Path testPath = createTempParquetFile(folder, rows, rowGroupSize);
+        Path testPath = createTempParquetFileByPaimon(folder, rows, 
rowGroupSize, ROW_TYPE);
         int len = testReadingFile(subList(records, 0), testPath);
         assertThat(len).isEqualTo(records.size());
     }
 
-    private Path createTempParquetFile(File folder, List<InternalRow> rows, 
int rowGroupSize)
+    private Path createTempParquetFileByPaimon(
+            File folder, List<InternalRow> rows, int rowGroupSize, RowType 
rowType)
             throws IOException {
         // write data
         Path path = new Path(folder.getPath(), UUID.randomUUID().toString());
         Options conf = new Options();
         conf.setInteger("parquet.block.size", rowGroupSize);
         ParquetWriterFactory factory =
-                new ParquetWriterFactory(new RowDataParquetBuilder(ROW_TYPE, 
conf));
+                new ParquetWriterFactory(new RowDataParquetBuilder(rowType, 
conf));
         String[] candidates = new String[] {"snappy", "zstd", "gzip"};
         String compress = candidates[new Random().nextInt(3)];
         FormatWriter writer =
@@ -633,4 +694,258 @@ public class ParquetReadWriteTest {
     private static <T> List<T> subList(List<T> list, int i) {
         return list.subList(i, list.size());
     }
+
+    private List<InternalRow> prepareNestedData(int rowNum) {
+        List<InternalRow> rows = new ArrayList<>(rowNum);
+
+        for (int i = 0; i < rowNum; i++) {
+            Integer v = i;
+            Map<BinaryString, BinaryString> mp1 = new HashMap<>();
+            mp1.put(null, BinaryString.fromString("val_" + i));
+            Map<BinaryString, BinaryString> mp2 = new HashMap<>();
+            mp2.put(BinaryString.fromString("key_" + i), null);
+            mp2.put(BinaryString.fromString("key@" + i), 
BinaryString.fromString("val@" + i));
+
+            rows.add(
+                    GenericRow.of(
+                            v,
+                            new GenericArray(new Object[] {v, v + 1}),
+                            new GenericArray(
+                                    new Object[] {
+                                        new GenericArray(new Object[] {i, i + 
1, null}),
+                                        new GenericArray(new Object[] {i, i + 
2, null}),
+                                        new GenericArray(new Object[] {}),
+                                        null
+                                    }),
+                            new GenericArray(
+                                    new GenericMap[] {
+                                        null, new GenericMap(mp1), new 
GenericMap(mp2)
+                                    }),
+                            new GenericArray(
+                                    new GenericRow[] {GenericRow.of(i), 
GenericRow.of(i + 1)}),
+                            GenericRow.of(
+                                    new GenericArray(
+                                            new GenericRow[] {
+                                                GenericRow.of(
+                                                        new GenericArray(
+                                                                new Object[] {
+                                                                    new 
GenericArray(
+                                                                            
new Object[] {
+                                                                               
 i, i + 1, null
+                                                                            }),
+                                                                    new 
GenericArray(
+                                                                            
new Object[] {
+                                                                               
 i, i + 2, null
+                                                                            }),
+                                                                    new 
GenericArray(
+                                                                            
new Object[] {}),
+                                                                    null
+                                                                }),
+                                                        i)
+                                            }),
+                                    i)));
+        }
+        return rows;
+    }
+
+    private Path createNestedDataByOriginWriter(int rowNum, File tmpDir, int 
rowGroupSize) {
+        Path path = new Path(tmpDir.getPath(), UUID.randomUUID().toString());
+        Configuration conf = new Configuration();
+        conf.setInt("parquet.block.size", rowGroupSize);
+        MessageType schema =
+                ParquetSchemaConverter.convertToParquetMessageType(
+                        "paimon-parquet", NESTED_ARRAY_MAP_TYPE);
+        try (ParquetWriter<Group> writer =
+                ExampleParquetWriter.builder(
+                                HadoopOutputFile.fromPath(
+                                        new 
org.apache.hadoop.fs.Path(path.toString()), conf))
+                        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+                        .withConf(new Configuration())
+                        .withType(schema)
+                        .build()) {
+            SimpleGroupFactory simpleGroupFactory = new 
SimpleGroupFactory(schema);
+            for (int i = 0; i < rowNum; i++) {
+                Group row = simpleGroupFactory.newGroup();
+                // add int
+                row.append("f0", i);
+
+                // add array<int>
+                Group f1 = row.addGroup("f1");
+                createParquetArrayGroup(f1, i, i + 1);
+
+                // add array<array<int>>
+                Group f2 = row.addGroup("f2");
+                createParquetDoubleNestedArray(f2, i);
+
+                // add array<map>
+                Group f3 = row.addGroup("f3");
+                f3.addGroup(0);
+                Group mapList = f3.addGroup(0);
+                Group map1 = mapList.addGroup(0);
+                createParquetMapGroup(map1, null, "val_" + i);
+                Group map2 = mapList.addGroup(0);
+                createParquetMapGroup(map2, "key_" + i, null);
+                createParquetMapGroup(map2, "key@" + i, "val@" + i);
+
+                // add array<row>
+                Group f4 = row.addGroup("f4");
+                Group rowList = f4.addGroup(0);
+                Group row1 = rowList.addGroup(0);
+                row1.add(0, i);
+                Group row2 = rowList.addGroup(0);
+                row2.add(0, i + 1);
+                f4.addGroup(0);
+
+                // add ROW<`f0` ARRAY<ROW<`b` ARRAY<ARRAY<INT>>, `c` INT>>, 
`f1` INT>>
+                Group f5 = row.addGroup("f5");
+                Group arrayRow = f5.addGroup(0);
+                Group insideRow = arrayRow.addGroup(0).addGroup(0);
+                Group insideArray = insideRow.addGroup(0);
+                createParquetDoubleNestedArray(insideArray, i);
+                insideRow.add(1, i);
+                arrayRow.addGroup(0);
+                f5.add(1, i);
+                writer.write(row);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException("Create nested data by parquet origin 
writer failed.");
+        }
+        return path;
+    }
+
+    private void createParquetDoubleNestedArray(Group group, int i) {
+        Group outside = group.addGroup(0);
+        Group inside = outside.addGroup(0);
+        createParquetArrayGroup(inside, i, i + 1);
+        Group inside2 = outside.addGroup(0);
+        createParquetArrayGroup(inside2, i, i + 2);
+        // create empty array []
+        outside.addGroup(0);
+        // create null
+        group.addGroup(0);
+    }
+
+    private void createParquetArrayGroup(Group group, int i, int j) {
+        Group element = group.addGroup(0);
+        element.add(0, i);
+        element = group.addGroup(0);
+        element.add(0, j);
+        group.addGroup(0);
+    }
+
+    private void createParquetMapGroup(Group map, String key, String value) {
+        Group entry = map.addGroup(0);
+        if (key != null) {
+            entry.append("key", key);
+        }
+        if (value != null) {
+            entry.append("value", value);
+        }
+    }
+
+    private void compareNestedRow(List<InternalRow> rows, List<InternalRow> 
results) {
+        Assertions.assertEquals(rows.size(), results.size());
+
+        for (InternalRow result : results) {
+            int index = result.getInt(0);
+            InternalRow origin = rows.get(index);
+            Assertions.assertEquals(origin.getInt(0), result.getInt(0));
+
+            // int[]
+            Assertions.assertEquals(origin.getArray(1).getInt(0), 
result.getArray(1).getInt(0));
+            Assertions.assertEquals(origin.getArray(1).getInt(1), 
result.getArray(1).getInt(1));
+
+            // int[][]
+            Assertions.assertEquals(
+                    origin.getArray(2).getArray(0).getInt(0),
+                    result.getArray(2).getArray(0).getInt(0));
+            Assertions.assertEquals(
+                    origin.getArray(2).getArray(0).getInt(1),
+                    result.getArray(2).getArray(0).getInt(1));
+            Assertions.assertTrue(result.getArray(2).getArray(0).isNullAt(2));
+
+            Assertions.assertEquals(
+                    origin.getArray(2).getArray(1).getInt(0),
+                    result.getArray(2).getArray(1).getInt(0));
+            Assertions.assertEquals(
+                    origin.getArray(2).getArray(1).getInt(1),
+                    result.getArray(2).getArray(1).getInt(1));
+            Assertions.assertTrue(result.getArray(2).getArray(1).isNullAt(2));
+
+            Assertions.assertEquals(0, result.getArray(2).getArray(2).size());
+            Assertions.assertTrue(result.getArray(2).isNullAt(3));
+
+            // map[]
+            Assertions.assertTrue(result.getArray(3).isNullAt(0));
+            
Assertions.assertTrue(result.getArray(3).getMap(1).keyArray().isNullAt(0));
+
+            Assertions.assertEquals(
+                    origin.getArray(3).getMap(1).valueArray().getString(0),
+                    result.getArray(3).getMap(1).valueArray().getString(0));
+
+            Map<String, String> originMap = new HashMap<>();
+            Map<String, String> resultMap = new HashMap<>();
+            fillWithMap(originMap, origin.getArray(3).getMap(2), 0);
+            fillWithMap(originMap, origin.getArray(3).getMap(2), 1);
+            fillWithMap(resultMap, result.getArray(3).getMap(2), 0);
+            fillWithMap(resultMap, result.getArray(3).getMap(2), 1);
+            Assertions.assertEquals(originMap, resultMap);
+
+            // row<int>[]
+            Assertions.assertEquals(
+                    origin.getArray(4).getRow(0, 1).getInt(0),
+                    result.getArray(4).getRow(0, 1).getInt(0));
+            Assertions.assertEquals(
+                    origin.getArray(4).getRow(1, 1).getInt(0),
+                    result.getArray(4).getRow(1, 1).getInt(0));
+
+            Assertions.assertEquals(
+                    origin.getRow(5, 2).getArray(0).getRow(0, 
2).getArray(0).getArray(0).getInt(0),
+                    result.getRow(5, 2).getArray(0).getRow(0, 
2).getArray(0).getArray(0).getInt(0));
+            Assertions.assertEquals(
+                    origin.getRow(5, 2).getArray(0).getRow(0, 
2).getArray(0).getArray(0).getInt(1),
+                    result.getRow(5, 2).getArray(0).getRow(0, 
2).getArray(0).getArray(0).getInt(1));
+            Assertions.assertTrue(
+                    result.getRow(5, 2)
+                            .getArray(0)
+                            .getRow(0, 2)
+                            .getArray(0)
+                            .getArray(0)
+                            .isNullAt(2));
+
+            Assertions.assertEquals(
+                    origin.getRow(5, 2).getArray(0).getRow(0, 
2).getArray(0).getArray(1).getInt(0),
+                    result.getRow(5, 2).getArray(0).getRow(0, 
2).getArray(0).getArray(1).getInt(0));
+            Assertions.assertEquals(
+                    origin.getRow(5, 2).getArray(0).getRow(0, 
2).getArray(0).getArray(1).getInt(1),
+                    result.getRow(5, 2).getArray(0).getRow(0, 
2).getArray(0).getArray(1).getInt(1));
+            Assertions.assertTrue(
+                    result.getRow(5, 2)
+                            .getArray(0)
+                            .getRow(0, 2)
+                            .getArray(0)
+                            .getArray(1)
+                            .isNullAt(2));
+
+            Assertions.assertEquals(
+                    0, result.getRow(5, 2).getArray(0).getRow(0, 
2).getArray(0).getArray(2).size());
+            Assertions.assertTrue(
+                    result.getRow(5, 2).getArray(0).getRow(0, 
2).getArray(0).isNullAt(3));
+
+            Assertions.assertEquals(
+                    origin.getRow(5, 2).getArray(0).getRow(0, 2).getInt(1),
+                    result.getRow(5, 2).getArray(0).getRow(0, 2).getInt(1));
+            Assertions.assertEquals(origin.getRow(5, 2).getInt(1), 
result.getRow(5, 2).getInt(1));
+        }
+    }
+
+    private void fillWithMap(Map<String, String> map, InternalMap internalMap, 
int index) {
+        map.put(
+                internalMap.keyArray().isNullAt(index)
+                        ? null
+                        : internalMap.keyArray().getString(index).toString(),
+                internalMap.valueArray().isNullAt(index)
+                        ? null
+                        : 
internalMap.valueArray().getString(index).toString());
+    }
 }

Reply via email to