This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 47bf4e41342e feat(flink): Wire Flink 2.1 nested Parquet readers into the Hudi read path (FLINK-35702) (#18700)
47bf4e41342e is described below

commit 47bf4e41342e4f1dab26a7fb1489f278fbd1226c
Author: Shihuan Liu <[email protected]>
AuthorDate: Thu May 7 20:07:05 2026 -0700

    feat(flink): Wire Flink 2.1 nested Parquet readers into the Hudi read path (FLINK-35702) (#18700)
---
 .../table/format/cow/ParquetSplitReaderUtil.java   | 686 ++++++++++++---------
 .../format/cow/vector/HeapMapColumnVector.java     |  19 +-
 .../cow/vector/reader/NestedColumnReader.java      |  22 +
 .../reader/ParquetColumnarRowSplitReader.java      |  34 +-
 4 files changed, 472 insertions(+), 289 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
index 2bb5be1d9614..5468dc86a25a 100644
--- a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/ParquetSplitReaderUtil.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *     http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,19 +19,18 @@
 package org.apache.hudi.table.format.cow;
 
 import org.apache.hudi.common.util.ValidationUtils;
-import org.apache.hudi.table.format.cow.vector.HeapArrayGroupColumnVector;
 import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
 import org.apache.hudi.table.format.cow.vector.HeapDecimalVector;
 import org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
 import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;
-import org.apache.hudi.table.format.cow.vector.reader.ArrayColumnReader;
-import org.apache.hudi.table.format.cow.vector.reader.ArrayGroupReader;
 import org.apache.hudi.table.format.cow.vector.reader.EmptyColumnReader;
 import org.apache.hudi.table.format.cow.vector.reader.FixedLenBytesColumnReader;
 import org.apache.hudi.table.format.cow.vector.reader.Int64TimestampColumnReader;
-import org.apache.hudi.table.format.cow.vector.reader.MapColumnReader;
+import org.apache.hudi.table.format.cow.vector.reader.NestedColumnReader;
 import org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader;
-import org.apache.hudi.table.format.cow.vector.reader.RowColumnReader;
+import org.apache.hudi.table.format.cow.vector.type.ParquetField;
+import org.apache.hudi.table.format.cow.vector.type.ParquetGroupField;
+import org.apache.hudi.table.format.cow.vector.type.ParquetPrimitiveField;
 
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.formats.parquet.vector.reader.BooleanColumnReader;
@@ -64,12 +63,14 @@ import org.apache.flink.table.types.logical.DecimalType;
 import org.apache.flink.table.types.logical.IntType;
 import org.apache.flink.table.types.logical.LocalZonedTimestampType;
 import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.LogicalTypeFamily;
-import org.apache.flink.table.types.logical.LogicalTypeRoot;
 import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.table.types.logical.TimestampType;
+import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.parquet.ParquetRuntimeException;
 import org.apache.parquet.column.ColumnDescriptor;
@@ -77,12 +78,18 @@ import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.filter.UnboundRecordFilter;
 import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.io.ColumnIO;
+import org.apache.parquet.io.GroupColumnIO;
+import org.apache.parquet.io.MessageColumnIO;
+import org.apache.parquet.io.PrimitiveColumnIO;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.InvalidSchemaException;
 import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.sql.Date;
@@ -90,25 +97,38 @@ import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.stream.Collectors;
 
 import static org.apache.flink.table.utils.DateTimeUtils.toInternal;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 import static org.apache.parquet.Preconditions.checkArgument;
+import static org.apache.parquet.schema.Type.Repetition.REPEATED;
+import static org.apache.parquet.schema.Type.Repetition.REQUIRED;
 
 /**
  * Util for generating {@link ParquetColumnarRowSplitReader}.
  *
- * <p>NOTE: reference from Flink release 1.11.2 {@code ParquetSplitReaderUtil}, modify to support INT64
- * based TIMESTAMP_MILLIS as ConvertedType, should remove when Flink supports that.
+ * <p>Uses the Dremel-style nested reader ported from Apache Flink 2.1 (FLINK-35702). For primitive
+ * top-level columns we keep Hudi's specialized readers — {@link Int64TimestampColumnReader},
+ * {@link FixedLenBytesColumnReader}, and the Hudi {@link HeapDecimalVector} — unchanged. For
+ * nested types (ARRAY / MAP / MULTISET / ROW) we build a {@link ParquetField} tree once per
+ * split via {@link #buildFieldsList(List, List, MessageColumnIO)} and delegate reading to
+ * {@link NestedColumnReader}.
+ *
+ * <p>Schema evolution: missing top-level fields are still handled by the caller
+ * ({@link ParquetColumnarRowSplitReader} patches them with null vectors). Missing fields
+ * <em>inside</em> a Row are handled here — {@link #constructField} returns {@code null} for a
+ * child that isn't physically present, and the corresponding child in the pre-allocated vector
+ * is filled with nulls via {@link #createVectorFromConstant} so the Dremel assembler can
+ * pass the slot through (see {@link NestedColumnReader#readToVector}).
  */
 public class ParquetSplitReaderUtil {
 
-  /**
-   * Util for generating partitioned {@link ParquetColumnarRowSplitReader}.
-   */
+  /** Util for generating partitioned {@link ParquetColumnarRowSplitReader}. */
   public static ParquetColumnarRowSplitReader genPartColumnarRowReader(
       boolean utcTimestamp,
       boolean caseSensitive,
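
A minimal sketch of the per-split flow the javadoc above describes. Names such as
`requestedSchema`, `rowFields`, `fieldNames`, `pages`, and `utcTimestamp` are assumed to be
in scope; the wiring mirrors what ParquetColumnarRowSplitReader does further down this diff:

    // One ParquetField tree per projected top-level column; primitives get null entries.
    MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(requestedSchema);
    List<ParquetField> fields =
        ParquetSplitReaderUtil.buildFieldsList(rowFields, fieldNames, columnIO);

    // Per row group: nested columns take the Dremel-style NestedColumnReader,
    // primitive columns keep Hudi's specialized readers.
    for (int i = 0; i < rowFields.size(); i++) {
      ColumnReader reader = ParquetSplitReaderUtil.createColumnReader(
          utcTimestamp,
          rowFields.get(i).getType(),
          requestedSchema.getType(i),
          requestedSchema.getColumns(),
          pages,
          fields.get(i));
    }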
@@ -182,10 +202,13 @@ public class ParquetSplitReaderUtil {
     return readVector;
   }
 
-  private static ColumnVector createVectorFromConstant(
-      LogicalType type,
-      Object value,
-      int batchSize) {
+  /**
+   * Builds a constant-filled column vector for either a partition column (non-null value) or a
+   * missing-column slot (null value). Used both at the batch-generator level for partition
+   * injection and at the row-reader level for fields absent from the Parquet file.
+   */
+  public static ColumnVector createVectorFromConstant(
+      LogicalType type, Object value, int batchSize) {
     switch (type.getTypeRoot()) {
       case CHAR:
       case VARCHAR:
@@ -278,6 +301,7 @@ public class ParquetSplitReaderUtil {
             value == null ? null : toInternal((Date) value),
             batchSize);
       case TIMESTAMP_WITHOUT_TIME_ZONE:
+      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
         HeapTimestampVector tv = new HeapTimestampVector(batchSize);
         if (value == null) {
           tv.fillWithNulls();
@@ -286,46 +310,41 @@ public class ParquetSplitReaderUtil {
         }
         return tv;
       case ARRAY:
-        ArrayType arrayType = (ArrayType) type;
-        if (arrayType.getElementType().isAnyOf(LogicalTypeFamily.CONSTRUCTED)) {
-          HeapArrayGroupColumnVector arrayGroup = new HeapArrayGroupColumnVector(batchSize);
-          if (value == null) {
-            arrayGroup.fillWithNulls();
-            return arrayGroup;
-          } else {
-            throw new UnsupportedOperationException("Unsupported create array with default value.");
-          }
-        } else {
-          HeapArrayVector arrayVector = new HeapArrayVector(batchSize);
-          if (value == null) {
-            arrayVector.fillWithNulls();
-            return arrayVector;
-          } else {
-            throw new UnsupportedOperationException("Unsupported create array with default value.");
-          }
+        if (value != null) {
+          throw new UnsupportedOperationException("Unsupported create array with default value.");
         }
+        HeapArrayVector arrayVector = new HeapArrayVector(batchSize);
+        arrayVector.fillWithNulls();
+        return arrayVector;
       case MAP:
-        HeapMapColumnVector mapVector = new HeapMapColumnVector(batchSize, null, null);
-        if (value == null) {
-          mapVector.fillWithNulls();
-          return mapVector;
-        } else {
-          throw new UnsupportedOperationException("Unsupported create map with default value.");
+      case MULTISET:
+        if (value != null) {
+          throw new UnsupportedOperationException(
+              "Unsupported create " + type.getTypeRoot() + " with default value.");
         }
+        HeapMapColumnVector mapVector = new HeapMapColumnVector(batchSize, null, null);
+        mapVector.fillWithNulls();
+        return mapVector;
       case ROW:
-        HeapRowColumnVector rowVector = new HeapRowColumnVector(batchSize);
-        if (value == null) {
-          rowVector.fillWithNulls();
-          return rowVector;
-        } else {
+        if (value != null) {
           throw new UnsupportedOperationException("Unsupported create row with default value.");
         }
+        RowType rowType = (RowType) type;
+        WritableColumnVector[] childVectors = new WritableColumnVector[rowType.getFieldCount()];
+        for (int i = 0; i < childVectors.length; i++) {
+          childVectors[i] =
+              (WritableColumnVector) createVectorFromConstant(rowType.getTypeAt(i), null, batchSize);
+        }
+        HeapRowColumnVector rowVector = new HeapRowColumnVector(batchSize, childVectors);
+        rowVector.fillWithNulls();
+        return rowVector;
       default:
         throw new UnsupportedOperationException("Unsupported type: " + type);
     }
   }
 
-  private static List<ColumnDescriptor> filterDescriptors(int depth, Type type, List<ColumnDescriptor> columns) throws ParquetRuntimeException {
+  private static List<ColumnDescriptor> filterDescriptors(
+      int depth, Type type, List<ColumnDescriptor> columns) throws ParquetRuntimeException {
     List<ColumnDescriptor> filtered = new ArrayList<>();
     for (ColumnDescriptor descriptor : columns) {
       if (depth >= descriptor.getPath().length) {
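
To illustrate the reworked null branches of createVectorFromConstant above: a missing nested
column now materializes as a null-filled vector whose children are recursively pre-shaped. A
hypothetical call (batch size chosen arbitrarily):

    LogicalType missing =
        RowType.of(new IntType(), new MapType(new VarCharType(), new DoubleType()));
    WritableColumnVector vec = (WritableColumnVector)
        ParquetSplitReaderUtil.createVectorFromConstant(missing, null, 2048);
    // vec.isNullAt(r) is true for every row, yet its child vectors exist with the right
    // shape, so the Dremel assembler in NestedColumnReader can pass the slot through.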
@@ -339,24 +358,61 @@ public class ParquetSplitReaderUtil {
     return filtered;
   }
 
+  /**
+   * Creates a {@link ColumnReader} for one top-level requested field. For primitive types the
+   * Hudi-specialized reader path is used. For nested types ({@code ARRAY}, {@code MAP},
+   * {@code MULTISET}, {@code ROW}) the Dremel-style {@link NestedColumnReader} is used, driven by
+   * the supplied pre-built {@link ParquetField} tree.
+   *
+   * @param field the {@link ParquetField} tree for this column, built by
+   *     {@link #buildFieldsList(List, List, MessageColumnIO)}. Required (non-null) for nested
+   *     types; ignored for primitives.
+   */
+  public static ColumnReader createColumnReader(
+      boolean utcTimestamp,
+      LogicalType fieldType,
+      Type physicalType,
+      List<ColumnDescriptor> descriptors,
+      PageReadStore pages,
+      @Nullable ParquetField field) throws IOException {
+    switch (fieldType.getTypeRoot()) {
+      case ARRAY:
+      case MAP:
+      case MULTISET:
+      case ROW:
+        Preconditions.checkNotNull(
+            field, "ParquetField must be provided for nested type: %s", fieldType);
+        return new NestedColumnReader(utcTimestamp, pages, field);
+      default:
+        return createPrimitiveColumnReader(utcTimestamp, fieldType, physicalType, descriptors, pages);
+    }
+  }
+
+  /**
+   * Backward-compat entry point kept for callers that don't project nested types and therefore
+   * never need a {@link ParquetField} tree. Forwards to the {@link ParquetField}-aware overload
+   * with a null field; nested types now go through that overload directly.
+   *
+   * @deprecated use {@link #createColumnReader(boolean, LogicalType, Type, List, PageReadStore,
+   *     ParquetField)} so nested types take the Dremel path.
+   */
+  @Deprecated
   public static ColumnReader createColumnReader(
       boolean utcTimestamp,
       LogicalType fieldType,
       Type physicalType,
       List<ColumnDescriptor> descriptors,
       PageReadStore pages) throws IOException {
-    return createColumnReader(utcTimestamp, fieldType, physicalType, descriptors,
-        pages, 0);
+    return createColumnReader(utcTimestamp, fieldType, physicalType, descriptors, pages, null);
   }
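
Call sites migrating off the deprecated overload only need to thread the tree through; the
two calls below are equivalent for primitive columns (`field` here stands for the matching
entry from buildFieldsList):

    // Legacy entry point: nested types never reach the Dremel path.
    createColumnReader(utcTimestamp, fieldType, physicalType, descriptors, pages);
    // ParquetField-aware overload: nested types are routed to NestedColumnReader.
    createColumnReader(utcTimestamp, fieldType, physicalType, descriptors, pages, field);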
 
-  private static ColumnReader createColumnReader(
+  private static ColumnReader createPrimitiveColumnReader(
       boolean utcTimestamp,
       LogicalType fieldType,
       Type physicalType,
       List<ColumnDescriptor> columns,
-      PageReadStore pages,
-      int depth) throws IOException {
-    List<ColumnDescriptor> descriptors = filterDescriptors(depth, physicalType, columns);
+      PageReadStore pages) throws IOException {
+    List<ColumnDescriptor> descriptors = filterDescriptors(0, physicalType, columns);
     ColumnDescriptor descriptor = descriptors.get(0);
     PageReader pageReader = pages.getPageReader(descriptor);
     switch (fieldType.getTypeRoot()) {
@@ -392,7 +448,9 @@ public class ParquetSplitReaderUtil {
           case INT96:
             return new TimestampColumnReader(utcTimestamp, descriptor, pageReader);
           default:
-            throw new AssertionError();
+            throw new AssertionError(
+                "Unexpected physical type for TIMESTAMP: "
+                    + descriptor.getPrimitiveType().getPrimitiveTypeName());
         }
       case DECIMAL:
         switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
@@ -403,106 +461,23 @@ public class ParquetSplitReaderUtil {
           case BINARY:
             return new BytesColumnReader(descriptor, pageReader);
           case FIXED_LEN_BYTE_ARRAY:
-            return new FixedLenBytesColumnReader(
-                descriptor, pageReader);
+            return new FixedLenBytesColumnReader(descriptor, pageReader);
           default:
-            throw new AssertionError();
-        }
-      case ARRAY:
-        ArrayType arrayType = (ArrayType) fieldType;
-        if (arrayType.getElementType().isAnyOf(LogicalTypeFamily.CONSTRUCTED)) {
-          boolean isThreeLevelList = isThreeLevelList(physicalType);
-          // 3-level List structure, drill down 2 level to get type for `element`
-          Type elementType = isThreeLevelList
-              ? physicalType.asGroupType().getType(0).asGroupType().getType(0)
-              : physicalType.asGroupType().getType(0);
-          int elementDepth = isThreeLevelList ? depth + 2 : depth + 1;
-          return new ArrayGroupReader(createColumnReader(
-              utcTimestamp,
-              arrayType.getElementType(),
-              elementType,
-              descriptors,
-              pages,
-              elementDepth));
-        } else {
-          return new ArrayColumnReader(
-              descriptor,
-              pageReader,
-              utcTimestamp,
-              descriptor.getPrimitiveType(),
-              fieldType);
-        }
-      case MAP:
-        MapType mapType = (MapType) fieldType;
-        ArrayColumnReader keyReader =
-            new ArrayColumnReader(
-                descriptor,
-                pageReader,
-                utcTimestamp,
-                descriptor.getPrimitiveType(),
-                new ArrayType(mapType.getKeyType()));
-        ColumnReader<WritableColumnVector> valueReader;
-        if (mapType.getValueType().isAnyOf(LogicalTypeFamily.CONSTRUCTED)) {
-          valueReader = new ArrayGroupReader(createColumnReader(
-              utcTimestamp,
-              mapType.getValueType(),
-              physicalType.asGroupType().getType(0).asGroupType().getType(1), // Get the value physical type
-              descriptors.subList(1, descriptors.size()), // remove the key descriptor
-              pages,
-              depth + 2)); // increase the depth by 2, because there's a key_value entry in the path
-        } else {
-          valueReader = new ArrayColumnReader(
-              descriptors.get(1),
-              pages.getPageReader(descriptors.get(1)),
-              utcTimestamp,
-              descriptors.get(1).getPrimitiveType(),
-              new ArrayType(mapType.getValueType()));
+            throw new AssertionError(
+                "Unexpected physical type for DECIMAL: "
+                    + descriptor.getPrimitiveType().getPrimitiveTypeName());
         }
-        return new MapColumnReader(keyReader, valueReader);
-      case ROW:
-        RowType rowType = (RowType) fieldType;
-        GroupType groupType = physicalType.asGroupType();
-        List<ColumnReader> fieldReaders = new ArrayList<>();
-        for (int i = 0; i < rowType.getFieldCount(); i++) {
-          // schema evolution: read the parquet file with a new extended field name.
-          int fieldIndex = getFieldIndexInPhysicalType(rowType.getFields().get(i).getName(), groupType);
-          if (fieldIndex < 0) {
-            fieldReaders.add(new EmptyColumnReader());
-          } else {
-            // Check for nested row in array with atomic field type.
-
-            // This is done to meet the Parquet field algorithm that pushes multiplicity and structures down to individual fields.
-            // In Parquet, an array of rows is stored as separate arrays for each field.
-
-            // Limitations: It won't work for multiple nested arrays and maps.
-            // The main problem is that the Flink classes and interface don't follow that pattern.
-            if (descriptors.get(fieldIndex).getMaxRepetitionLevel() > 0 && !rowType.getTypeAt(i).is(LogicalTypeRoot.ARRAY)) {
-              fieldReaders.add(
-                  createColumnReader(
-                      utcTimestamp,
-                      new ArrayType(rowType.getTypeAt(i).isNullable(), rowType.getTypeAt(i)),
-                      groupType.getType(fieldIndex),
-                      descriptors,
-                      pages,
-                      depth + 1));
-            } else {
-              fieldReaders.add(
-                  createColumnReader(
-                      utcTimestamp,
-                      rowType.getTypeAt(i),
-                      groupType.getType(fieldIndex),
-                      descriptors,
-                      pages,
-                      depth + 1));
-            }
-          }
-        }
-        return new RowColumnReader(fieldReaders);
       default:
        throw new UnsupportedOperationException(fieldType + " is not supported now.");
     }
   }
 
+  /**
+   * Creates the writable column vector that the reader will write into. The returned vector shape
+   * matches {@code fieldType}; for ROW types missing physical fields are slotted with null-filled
+   * vectors (sourced from {@link #createVectorFromConstant}) so that the Dremel assembler in
+   * {@link NestedColumnReader} can pass them through unchanged.
+   */
   public static WritableColumnVector createWritableColumnVector(
       int batchSize,
       LogicalType fieldType,
@@ -523,40 +498,48 @@ public class ParquetSplitReaderUtil {
     switch (fieldType.getTypeRoot()) {
       case BOOLEAN:
         checkArgument(
-            typeName == PrimitiveType.PrimitiveTypeName.BOOLEAN, getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
+            typeName == PrimitiveType.PrimitiveTypeName.BOOLEAN,
+            getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
         return new HeapBooleanVector(batchSize);
       case TINYINT:
         checkArgument(
-            typeName == PrimitiveType.PrimitiveTypeName.INT32, getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
+            typeName == PrimitiveType.PrimitiveTypeName.INT32,
+            getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
         return new HeapByteVector(batchSize);
       case DOUBLE:
         checkArgument(
-            typeName == PrimitiveType.PrimitiveTypeName.DOUBLE, getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
+            typeName == PrimitiveType.PrimitiveTypeName.DOUBLE,
+            getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
         return new HeapDoubleVector(batchSize);
       case FLOAT:
         checkArgument(
-            typeName == PrimitiveType.PrimitiveTypeName.FLOAT, getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
+            typeName == PrimitiveType.PrimitiveTypeName.FLOAT,
+            getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
         return new HeapFloatVector(batchSize);
       case INTEGER:
       case DATE:
       case TIME_WITHOUT_TIME_ZONE:
         checkArgument(
-            typeName == PrimitiveType.PrimitiveTypeName.INT32, getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
+            typeName == PrimitiveType.PrimitiveTypeName.INT32,
+            getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
         return new HeapIntVector(batchSize);
       case BIGINT:
         checkArgument(
-            typeName == PrimitiveType.PrimitiveTypeName.INT64, getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
+            typeName == PrimitiveType.PrimitiveTypeName.INT64,
+            getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
         return new HeapLongVector(batchSize);
       case SMALLINT:
         checkArgument(
-            typeName == PrimitiveType.PrimitiveTypeName.INT32, getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
+            typeName == PrimitiveType.PrimitiveTypeName.INT32,
+            getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
         return new HeapShortVector(batchSize);
       case CHAR:
       case VARCHAR:
       case BINARY:
       case VARBINARY:
         checkArgument(
-            typeName == PrimitiveType.PrimitiveTypeName.BINARY, getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
+            typeName == PrimitiveType.PrimitiveTypeName.BINARY,
+            getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
         return new HeapBytesVector(batchSize);
       case TIMESTAMP_WITHOUT_TIME_ZONE:
       case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
@@ -566,112 +549,64 @@ public class ParquetSplitReaderUtil {
       case DECIMAL:
         checkArgument(
             (typeName == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY
-                || typeName == PrimitiveType.PrimitiveTypeName.BINARY)
+                    || typeName == PrimitiveType.PrimitiveTypeName.BINARY)
                 && primitiveType.getOriginalType() == OriginalType.DECIMAL,
             getPrimitiveTypeCheckFailureMessage(typeName, fieldType));
         return new HeapDecimalVector(batchSize);
       case ARRAY:
         ArrayType arrayType = (ArrayType) fieldType;
-        if (arrayType.getElementType().isAnyOf(LogicalTypeFamily.CONSTRUCTED)) {
-          boolean isThreeLevelList = isThreeLevelList(physicalType);
-          // 3-level List structure, drill down 2 level to get type for `element`
-          Type elementType = isThreeLevelList
-              ? physicalType.asGroupType().getType(0).asGroupType().getType(0)
-              : physicalType.asGroupType().getType(0);
-          int elementDepth = isThreeLevelList ? depth + 2 : depth + 1;
-          return new HeapArrayGroupColumnVector(
-              batchSize,
-              createWritableColumnVector(
-                  batchSize,
-                  arrayType.getElementType(),
-                  elementType,
-                  descriptors,
-                  elementDepth));
-        } else {
-          return new HeapArrayVector(
-              batchSize,
-              createWritableColumnVector(
-                  batchSize,
-                  arrayType.getElementType(),
-                  physicalType,
-                  descriptors,
-                  depth));
-        }
-      case MAP:
+        return new HeapArrayVector(
+            batchSize,
+            createWritableColumnVector(
+                batchSize, arrayType.getElementType(), physicalType, descriptors, depth));
+      case MAP: {
         MapType mapType = (MapType) fieldType;
-        GroupType repeatedType = physicalType.asGroupType().getType(0).asGroupType();
-        // the map column has three level paths.
-        WritableColumnVector keyColumnVector = createWritableColumnVector(
+        GroupType repeatedType = unwrapMapRepeatedType(physicalType);
+        return new HeapMapColumnVector(
             batchSize,
-            new ArrayType(mapType.getKeyType().isNullable(), mapType.getKeyType()),
-            repeatedType.getType(0),
-            descriptors,
-            depth + 2);
-        WritableColumnVector valueColumnVector;
-        if (mapType.getValueType().isAnyOf(LogicalTypeFamily.CONSTRUCTED)) {
-          valueColumnVector = new HeapArrayGroupColumnVector(
-              batchSize,
-              createWritableColumnVector(
-                  batchSize,
-                  mapType.getValueType(),
-                  repeatedType.getType(1).asGroupType(),
-                  descriptors,
-                  depth + 2));
-        } else {
-          valueColumnVector = createWritableColumnVector(
-              batchSize,
-              new ArrayType(mapType.getValueType().isNullable(), mapType.getValueType()),
-              repeatedType.getType(1),
-              descriptors,
-              depth + 2);
-        }
-        return new HeapMapColumnVector(batchSize, keyColumnVector, valueColumnVector);
+            createWritableColumnVector(
+                batchSize, mapType.getKeyType(), repeatedType.getType(0), descriptors, depth + 2),
+            createWritableColumnVector(
+                batchSize, mapType.getValueType(), repeatedType.getType(1), descriptors, depth + 2));
+      }
+      case MULTISET: {
+        MultisetType multisetType = (MultisetType) fieldType;
+        GroupType repeatedType = unwrapMapRepeatedType(physicalType);
+        return new HeapMapColumnVector(
+            batchSize,
+            createWritableColumnVector(
+                batchSize,
+                multisetType.getElementType(),
+                repeatedType.getType(0),
+                descriptors,
+                depth + 2),
+            createWritableColumnVector(
+                batchSize,
+                new IntType(false),
+                repeatedType.getType(1),
+                descriptors,
+                depth + 2));
+      }
       case ROW:
         RowType rowType = (RowType) fieldType;
         GroupType groupType = physicalType.asGroupType();
         WritableColumnVector[] columnVectors = new WritableColumnVector[rowType.getFieldCount()];
         for (int i = 0; i < columnVectors.length; i++) {
-          // schema evolution: read the file with a new extended field name.
           int fieldIndex = getFieldIndexInPhysicalType(rowType.getFields().get(i).getName(), groupType);
           if (fieldIndex < 0) {
-            // Check for nested row in array with atomic field type.
-
-            // This is done to meet the Parquet field algorithm that pushes multiplicity and structures down to individual fields.
-            // In Parquet, an array of rows is stored as separate arrays for each field.
-
-            // Limitations: It won't work for multiple nested arrays and maps.
-            // The main problem is that the Flink classes and interface don't follow that pattern.
-            if (groupType.getRepetition().equals(Type.Repetition.REPEATED) && !rowType.getTypeAt(i).is(LogicalTypeRoot.ARRAY)) {
-              columnVectors[i] = (WritableColumnVector) createVectorFromConstant(
-                  new ArrayType(rowType.getTypeAt(i).isNullable(), rowType.getTypeAt(i)), null, batchSize);
-            } else {
-              columnVectors[i] = (WritableColumnVector) createVectorFromConstant(rowType.getTypeAt(i), null, batchSize);
-            }
+            // Schema evolution: logical field is absent from the Parquet file. Slot a null-filled
+            // vector of the correct shape; NestedColumnReader.readRow will pass it through when the
+            // matching ParquetField child is null.
+            columnVectors[i] =
+                (WritableColumnVector) createVectorFromConstant(rowType.getTypeAt(i), null, batchSize);
           } else {
-            // Check for nested row in array with atomic field type.
-
-            // This is done to meet the Parquet field algorithm that pushes multiplicity and structures down to individual fields.
-            // In Parquet, an array of rows is stored as separate arrays for each field.
-
-            // Limitations: It won't work for multiple nested arrays and maps.
-            // The main problem is that the Flink classes and interface don't follow that pattern.
-            if (descriptors.get(fieldIndex).getMaxRepetitionLevel() > 0 && !rowType.getTypeAt(i).is(LogicalTypeRoot.ARRAY)) {
-              columnVectors[i] =
-                  createWritableColumnVector(
-                      batchSize,
-                      new ArrayType(rowType.getTypeAt(i).isNullable(), rowType.getTypeAt(i)),
-                      groupType.getType(fieldIndex),
-                      descriptors,
-                      depth + 1);
-            } else {
-              columnVectors[i] =
-                  createWritableColumnVector(
-                      batchSize,
-                      rowType.getTypeAt(i),
-                      groupType.getType(fieldIndex),
-                      descriptors,
-                      depth + 1);
-            }
+            columnVectors[i] =
+                createWritableColumnVector(
+                    batchSize,
+                    rowType.getTypeAt(i),
+                    groupType.getType(fieldIndex),
+                    descriptors,
+                    depth + 1);
           }
         }
         return new HeapRowColumnVector(batchSize, columnVectors);
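
To make the schema-evolution slot concrete: reading logical type ROW<a INT, b BIGINT> against
a file whose physical group only contains `a`, iteration i = 1 takes the fieldIndex < 0 branch
above. A hedged standalone equivalent of what it slots in (batchSize assumed in scope):

    WritableColumnVector slotForB = (WritableColumnVector)
        ParquetSplitReaderUtil.createVectorFromConstant(new BigIntType(), null, batchSize);
    // Null-filled but correctly shaped: NestedColumnReader.readRow passes it through
    // because the matching ParquetField child built by constructField is null.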
@@ -681,56 +616,245 @@ public class ParquetSplitReaderUtil {
   }
 
   /**
-   * Returns the field index with given physical row type {@code groupType} and field name {@code fieldName}.
-   *
-   * @return The physical field index or -1 if the field does not exist
+   * Peels one {@code repeated group key_value} wrapper off a MAP / MULTISET physical type, matching
+   * Parquet's canonical 3-level map encoding.
    */
-  private static int getFieldIndexInPhysicalType(String fieldName, GroupType groupType) {
-    // get index from fileSchema type, else, return -1
-    return groupType.containsField(fieldName) ? groupType.getFieldIndex(fieldName) : -1;
+  private static GroupType unwrapMapRepeatedType(Type physicalType) {
+    return physicalType.asGroupType().getType(0).asGroupType();
   }
 
+  // ------------------------------------------------------------------------------------------
+  // ParquetField tree construction (vendored from Apache Flink 2.1 ParquetSplitReaderUtil)
+  //
+  // The only Hudi-specific divergence is in `constructField`: the ROW branch tolerates children
+  // missing from the Parquet file by emitting a null ParquetField child (upstream throws). This
+  // matches the Hudi schema-evolution contract and is the companion to the null-child branch in
+  // `NestedColumnReader#readRow` and the null-vector slot in `createWritableColumnVector#ROW`.
+  // ------------------------------------------------------------------------------------------
+
   /**
-   * Check whether the given list type is a three-level list type.
-   * <p>
-   * <list-repetition> group <name> (LIST) {
-   *   repeated group list {
-   *     <element-repetition> <element-type> element;
-   *   }
-   * }
-   *
-   * @param type list type
-   * @return true if the list type is a three-level list type
+   * Builds {@link ParquetField} trees — one per top-level projected logical column — that feed
+   * {@link NestedColumnReader}. The returned list mirrors the input {@code children} positionally;
+   * primitive top-level fields produce {@code null} entries (callers don't need a tree for those).
+   */
+  public static List<ParquetField> buildFieldsList(
+      List<RowType.RowField> children, List<String> fieldNames, MessageColumnIO columnIO) {
+    List<ParquetField> list = new ArrayList<>();
+    for (int i = 0; i < children.size(); i++) {
+      RowType.RowField child = children.get(i);
+      if (isNestedType(child.getType())) {
+        list.add(constructField(child, lookupColumnByName(columnIO, fieldNames.get(i))));
+      } else {
+        list.add(null);
+      }
+    }
+    return list;
+  }
+
+  private static boolean isNestedType(LogicalType type) {
+    return type instanceof RowType
+        || type instanceof ArrayType
+        || type instanceof MapType
+        || type instanceof MultisetType;
+  }
+
+  @Nullable
+  private static ParquetField constructField(RowType.RowField rowField, ColumnIO columnIO) {
+    boolean required = columnIO.getType().getRepetition() == REQUIRED;
+    int repetitionLevel = columnIO.getRepetitionLevel();
+    int definitionLevel = columnIO.getDefinitionLevel();
+    LogicalType type = rowField.getType();
+    String fieldName = rowField.getName();
+    if (type instanceof RowType) {
+      GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO;
+      RowType rowType = (RowType) type;
+      List<RowType.RowField> childFields = rowType.getFields();
+      List<ParquetField> fieldsList = new ArrayList<>(childFields.size());
+      for (RowType.RowField childField : childFields) {
+        // Hudi schema evolution: a logical child may be absent from the Parquet file. In that
+        // case we emit a null ParquetField so that NestedColumnReader.readRow passes through the
+        // pre-filled null vector instead of recursing.
+        ColumnIO childIo = lookupColumnByNameOrNull(groupColumnIO, childField.getName());
+        if (childIo == null) {
+          fieldsList.add(null);
+        } else {
+          fieldsList.add(constructField(childField, childIo));
+        }
+      }
+      return new ParquetGroupField(
+          type,
+          repetitionLevel,
+          definitionLevel,
+          required,
+          Collections.unmodifiableList(fieldsList));
+    }
+
+    if (type instanceof MapType) {
+      GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO;
+      GroupColumnIO keyValueColumnIO = getMapKeyValueColumn(groupColumnIO);
+      MapType mapType = (MapType) type;
+      ParquetField keyField =
+          constructField(
+              new RowType.RowField("", mapType.getKeyType()), keyValueColumnIO.getChild(0));
+      ParquetField valueField =
+          constructField(
+              new RowType.RowField("", mapType.getValueType()), keyValueColumnIO.getChild(1));
+      return new ParquetGroupField(
+          type,
+          repetitionLevel,
+          definitionLevel,
+          required,
+          Collections.unmodifiableList(Arrays.asList(keyField, valueField)));
+    }
+
+    if (type instanceof MultisetType) {
+      GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO;
+      GroupColumnIO keyValueColumnIO = getMapKeyValueColumn(groupColumnIO);
+      MultisetType multisetType = (MultisetType) type;
+      ParquetField keyField =
+          constructField(
+              new RowType.RowField("", multisetType.getElementType()),
+              keyValueColumnIO.getChild(0));
+      ParquetField valueField =
+          constructField(
+              new RowType.RowField("", new IntType()), keyValueColumnIO.getChild(1));
+      return new ParquetGroupField(
+          type,
+          repetitionLevel,
+          definitionLevel,
+          required,
+          Collections.unmodifiableList(Arrays.asList(keyField, valueField)));
+    }
+
+    if (type instanceof ArrayType) {
+      ArrayType arrayType = (ArrayType) type;
+      ColumnIO elementTypeColumnIO;
+      if (columnIO instanceof GroupColumnIO) {
+        GroupColumnIO groupColumnIO = (GroupColumnIO) columnIO;
+        if (!StringUtils.isNullOrWhitespaceOnly(fieldName)) {
+          while (!Objects.equals(groupColumnIO.getName(), fieldName)) {
+            groupColumnIO = (GroupColumnIO) groupColumnIO.getChild(0);
+          }
+          elementTypeColumnIO = groupColumnIO;
+        } else {
+          if (arrayType.getElementType() instanceof RowType) {
+            elementTypeColumnIO = groupColumnIO;
+          } else {
+            elementTypeColumnIO = groupColumnIO.getChild(0);
+          }
+        }
+      } else if (columnIO instanceof PrimitiveColumnIO) {
+        elementTypeColumnIO = columnIO;
+      } else {
+        throw new FlinkRuntimeException(String.format("Unknown ColumnIO, %s", columnIO));
+      }
+
+      ParquetField elementField =
+          constructField(
+              new RowType.RowField("", arrayType.getElementType()),
+              getArrayElementColumn(elementTypeColumnIO));
+      if (repetitionLevel == elementField.getRepetitionLevel()) {
+        repetitionLevel = columnIO.getParent().getRepetitionLevel();
+      }
+      return new ParquetGroupField(
+          type,
+          repetitionLevel,
+          definitionLevel,
+          required,
+          Collections.singletonList(elementField));
+    }
+
+    PrimitiveColumnIO primitiveColumnIO = (PrimitiveColumnIO) columnIO;
+    return new ParquetPrimitiveField(
+        type, required, primitiveColumnIO.getColumnDescriptor(), primitiveColumnIO.getId());
+  }
+
+  /**
+   * Parquet column names are case-insensitive in Flink's lookup. Matches upstream
+   * {@code ParquetSplitReaderUtil.lookupColumnByName}; throws when absent.
    */
-  private static boolean isThreeLevelList(Type type) {
-    if (type.isPrimitive()) {
-      return false;
+  public static ColumnIO lookupColumnByName(GroupColumnIO groupColumnIO, String columnName) {
+    ColumnIO columnIO = lookupColumnByNameOrNull(groupColumnIO, columnName);
+    if (columnIO != null) {
+      return columnIO;
     }
-    GroupType groupType = type.asGroupType();
-    OriginalType originalType = groupType.getOriginalType();
-    return originalType == OriginalType.LIST
-        && groupType.getType(0).getName().equals("list");
+    throw new FlinkRuntimeException(
+        "Can not find column io for parquet reader. Column name: " + columnName);
   }
 
   /**
-   * Construct the error message when primitive type mismatches.
-   *
-   * @param primitiveType Primitive type
-   * @param fieldType     Logical field type
-   * @return The error message
+   * Case-insensitive column lookup that returns {@code null} when no match is found — the
+   * Hudi-specific companion to {@link #lookupColumnByName}, used by {@link #constructField} to
+   * emit null {@link ParquetField} children for fields absent from the Parquet file.
    */
-  private static String getPrimitiveTypeCheckFailureMessage(PrimitiveType.PrimitiveTypeName primitiveType, LogicalType fieldType) {
-    return String.format("Unexpected type exception. Primitive type: %s. Field type: %s.", primitiveType, fieldType.getTypeRoot().name());
+  @Nullable
+  private static ColumnIO lookupColumnByNameOrNull(
+      GroupColumnIO groupColumnIO, String columnName) {
+    ColumnIO columnIO = groupColumnIO.getChild(columnName);
+    if (columnIO != null) {
+      return columnIO;
+    }
+    for (int i = 0; i < groupColumnIO.getChildrenCount(); i++) {
+      if (groupColumnIO.getChild(i).getName().equalsIgnoreCase(columnName)) {
+        return groupColumnIO.getChild(i);
+      }
+    }
+    return null;
+  }
+
+  public static GroupColumnIO getMapKeyValueColumn(GroupColumnIO groupColumnIO) {
+    while (groupColumnIO.getChildrenCount() == 1) {
+      groupColumnIO = (GroupColumnIO) groupColumnIO.getChild(0);
+    }
+    return groupColumnIO;
+  }
+
+  public static ColumnIO getArrayElementColumn(ColumnIO columnIO) {
+    while (columnIO instanceof GroupColumnIO && !columnIO.getType().isRepetition(REPEATED)) {
+      columnIO = ((GroupColumnIO) columnIO).getChild(0);
+    }
+
+    // Three-level list: skip the synthetic `element` / `list` wrapper when present.
+    if (columnIO instanceof GroupColumnIO
+        && columnIO.getType().getLogicalTypeAnnotation() == null
+        && ((GroupColumnIO) columnIO).getChildrenCount() == 1
+        && !columnIO.getName().equals("array")
+        && !columnIO.getName().equals(columnIO.getParent().getName() + "_tuple")) {
+      return ((GroupColumnIO) columnIO).getChild(0);
+    }
+    return columnIO;
   }
 
   /**
-   * Construct the error message when original type mismatches.
+   * Returns the field index with given physical row type {@code groupType} and field name
+   * {@code fieldName}.
    *
-   * @param originalType Original type
-   * @param fieldType    Logical field type
-   * @return The error message
+   * @return the physical field index or -1 if the field does not exist
+   */
+  private static int getFieldIndexInPhysicalType(String fieldName, GroupType groupType) {
+    return groupType.containsField(fieldName) ? groupType.getFieldIndex(fieldName) : -1;
+  }
+
+  private static String getPrimitiveTypeCheckFailureMessage(
+      PrimitiveType.PrimitiveTypeName primitiveType, LogicalType fieldType) {
+    return String.format(
+        "Unexpected type exception. Primitive type: %s. Field type: %s.",
+        primitiveType, fieldType.getTypeRoot().name());
+  }
+
+  private static String getOriginalTypeCheckFailureMessage(
+      OriginalType originalType, LogicalType fieldType) {
+    return String.format(
+        "Unexpected type exception. Original type: %s. Field type: %s.",
+        originalType, fieldType.getTypeRoot().name());
+  }
+
+  /**
+   * Returns a synthetic null-column reader to fill missing top-level fields. Kept as a convenience
+   * for callers that need to mirror Hudi's original behaviour where a missing column produces an
+   * explicit null-valued reader rather than being omitted from the batch.
    */
-  private static String getOriginalTypeCheckFailureMessage(OriginalType originalType, LogicalType fieldType) {
-    return String.format("Unexpected type exception. Original type: %s. Field type: %s.", originalType, fieldType.getTypeRoot().name());
+  public static ColumnReader<WritableColumnVector> emptyColumnReader() {
+    return new EmptyColumnReader();
   }
 }
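
To make the positional contract of buildFieldsList concrete, consider a hypothetical
three-column projection (only the nested column receives a tree; `rowFields`, `fieldNames`,
and `messageColumnIO` assumed in scope):

    // requested columns: id BIGINT, tags ARRAY<STRING>, ts TIMESTAMP(3)
    List<ParquetField> fields =
        ParquetSplitReaderUtil.buildFieldsList(rowFields, fieldNames, messageColumnIO);
    // fields.get(0) == null                        -> BIGINT, primitive reader path
    // fields.get(1) instanceof ParquetGroupField   -> ARRAY, NestedColumnReader path
    // fields.get(2) == null                        -> TIMESTAMP(3), primitive reader path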
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java
index a98bdebd707a..14aad22039e0 100644
--- a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.format.cow.vector;
 
 import lombok.Getter;
 import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.columnar.ColumnarMapData;
 import org.apache.flink.table.data.columnar.vector.ColumnVector;
 import org.apache.flink.table.data.columnar.vector.MapColumnVector;
 import org.apache.flink.table.data.columnar.vector.heap.AbstractHeapVector;
@@ -27,6 +28,14 @@ import org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector
 
 /**
  * This class represents a nullable heap map column vector.
+ *
+ * <p>Mirrors {@code org.apache.flink.table.data.columnar.vector.heap.HeapMapVector} from
+ * Flink 2.1 (FLINK-35702). One deliberate divergence from upstream is preserved for backward
+ * compatibility: the {@code keys} / {@code values} fields are typed
+ * {@link WritableColumnVector} rather than upstream's {@link ColumnVector}, so the existing
+ * Lombok-generated {@code getKeys()} / {@code getValues()} accessors keep their original
+ * signature. Callers wanting the Flink-2.1 contract (a {@code ColumnVector}) use
+ * {@link #getKeyColumnVector()} / {@link #getValueColumnVector()}.
  */
 public class HeapMapColumnVector extends AbstractHeapVector
     implements WritableColumnVector, MapColumnVector {
@@ -38,10 +47,8 @@ public class HeapMapColumnVector extends AbstractHeapVector
 
  // ---------------------------------------------------------------------------------------------
  // Flink 2.1 Dremel-style state. Populated by {@link
-  // org.apache.hudi.table.format.cow.vector.reader.NestedColumnReader} (FLINK-35702 port). The
-  // legacy {@link #getMap(int)} implementation below continues to use {@code ColumnarGroupMapData}
-  // — wiring it through these offsets/lengths happens in a follow-up PR that switches the read
-  // path. Left here so the new readers can compile against the additive surface.
+  // org.apache.hudi.table.format.cow.vector.reader.NestedColumnReader} (FLINK-35702 port) and
+  // consumed by {@link #getMap(int)}.
  // ---------------------------------------------------------------------------------------------
   private long[] offsets;
   private long[] lengths;
@@ -102,6 +109,8 @@ public class HeapMapColumnVector extends AbstractHeapVector
 
   @Override
   public MapData getMap(int rowId) {
-    return new ColumnarGroupMapData(keys, values, rowId);
+    long offset = offsets[rowId];
+    long length = lengths[rowId];
+    return new ColumnarMapData(keys, values, (int) offset, (int) length);
   }
 }
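
The getMap change above reads a contiguous [offset, offset + length) slice out of the shared
key/value child vectors. A layout sketch with illustrative data, shown as comments because
offsets and lengths are populated internally by NestedColumnReader, not by callers: two maps
{"a": 1} and {"b": 2, "c": 3} stored in one batch:

    // keys    : ["a", "b", "c"]      values  : [1, 2, 3]
    // offsets : [0, 1]               lengths : [1, 2]
    // getMap(0) -> ColumnarMapData over slice [0, 1) == {"a": 1}
    // getMap(1) -> ColumnarMapData over slice [1, 3) == {"b": 2, "c": 3}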
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/NestedColumnReader.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/NestedColumnReader.java
index 27eab298b320..6ae1cc9492ec 100644
--- a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/NestedColumnReader.java
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/NestedColumnReader.java
@@ -147,6 +147,28 @@ public class NestedColumnReader implements ColumnReader<WritableColumnVector> {
     if (rowPosition.getIsNull() != null) {
       setFieldNullFlag(rowPosition.getIsNull(), heapRowVector);
     }
+
+    // Hudi-specific: collapse a present row whose every child is null into a null row. The
+    // legacy RowColumnReader did this so that a SQL value like `row(null, null)` round-trips
+    // to NULL on read; preserve it here for backward compatibility. Diverges from Flink 2.1,
+    // which would surface it as Row(null, null). Mirrored by the integration test
+    // ITTestHoodieDataSource#testParquetNullChildColumnsRowTypes.
+    int rowCount = rowPosition.getPositionsCount();
+    for (int j = 0; j < rowCount; j++) {
+      if (heapRowVector.isNullAt(j)) {
+        continue;
+      }
+      boolean allChildrenNull = true;
+      for (WritableColumnVector child : finalChildrenVectors) {
+        if (!child.isNullAt(j)) {
+          allChildrenNull = false;
+          break;
+        }
+      }
+      if (allChildrenNull) {
+        heapRowVector.setNullAt(j);
+      }
+    }
     return Tuple2.of(levelDelegation, heapRowVector);
   }
 
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
index 3572b117a631..1826419db5d4 100644
--- a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetColumnarRowSplitReader.java
@@ -18,7 +18,9 @@
 
 package org.apache.hudi.table.format.cow.vector.reader;
 
+import org.apache.hudi.table.format.cow.ParquetSplitReaderUtil;
 import org.apache.hudi.table.format.cow.vector.ParquetDecimalVector;
+import org.apache.hudi.table.format.cow.vector.type.ParquetField;
 
 import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
 import org.apache.flink.table.data.RowData;
@@ -28,6 +30,7 @@ import org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch;
 import org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -39,6 +42,8 @@ import org.apache.parquet.filter2.predicate.FilterPredicate;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.BlockMetaData;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.io.ColumnIOFactory;
+import org.apache.parquet.io.MessageColumnIO;
 import org.apache.parquet.schema.GroupType;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.Type;
@@ -46,6 +51,7 @@ import org.apache.parquet.schema.Types;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -53,7 +59,6 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.stream.IntStream;
 
-import static org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createColumnReader;
 import static org.apache.hudi.table.format.cow.ParquetSplitReaderUtil.createWritableColumnVector;
 import static org.apache.parquet.filter2.compat.FilterCompat.get;
 import static org.apache.parquet.filter2.compat.RowGroupFilter.filterRowGroups;
@@ -77,6 +82,14 @@ public class ParquetColumnarRowSplitReader implements Closeable {
 
   private final MessageType requestedSchema;
 
+  /**
+   * {@link ParquetField} tree per top-level requested column, used by
+   * {@link ParquetSplitReaderUtil#createColumnReader(boolean, LogicalType, Type, List,
+   * PageReadStore, ParquetField)} to drive the Dremel-style {@link NestedColumnReader} for
+   * nested types. Entries are {@code null} for primitive top-level fields. Built once per split.
+   */
+  private final List<ParquetField> requestedFields;
+
   /**
    * The total number of rows this RecordReader will eventually read. The sum of the rows of all
    * the row groups.
@@ -158,6 +171,20 @@ public class ParquetColumnarRowSplitReader implements Closeable {
 
     checkSchema();
 
+    // Build the ParquetField tree once per split (the Dremel-style nested reader reuses it across
+    // row groups). Only columns with nested logical type get a non-null entry — primitive columns
+    // still use Hudi's specialized ColumnReaders.
+    MessageColumnIO messageColumnIO = new ColumnIOFactory().getColumnIO(requestedSchema);
+    List<RowType.RowField> requestedRowFields = new ArrayList<>(requestedTypes.length);
+    List<String> requestedFieldNames = new ArrayList<>(requestedTypes.length);
+    for (int i = 0; i < requestedTypes.length; i++) {
+      String name = requestedSchema.getFieldName(i);
+      requestedRowFields.add(new RowType.RowField(name, requestedTypes[i]));
+      requestedFieldNames.add(name);
+    }
+    this.requestedFields = ParquetSplitReaderUtil.buildFieldsList(
+        requestedRowFields, requestedFieldNames, messageColumnIO);
+
     this.writableVectors = createWritableVectors();
     ColumnVector[] columnVectors = patchedVector(selectedFieldNames.length, createReadableVectors(), requestedIndices);
     this.columnarBatch = generator.generate(columnVectors);
@@ -340,12 +367,13 @@ public class ParquetColumnarRowSplitReader implements Closeable {
     List<ColumnDescriptor> columns = requestedSchema.getColumns();
     columnReaders = new ColumnReader[types.size()];
     for (int i = 0; i < types.size(); ++i) {
-      columnReaders[i] = createColumnReader(
+      columnReaders[i] = ParquetSplitReaderUtil.createColumnReader(
           utcTimestamp,
           requestedTypes[i],
           types.get(i),
           columns,
-          pages);
+          pages,
+          requestedFields.get(i));
     }
     totalCountLoadedSoFar += pages.getRowCount();
   }
