This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b7da18e322d9 refactor(flink): Unify file reader creation in FlinkRowDataReaderContext (#19050)
b7da18e322d9 is described below

commit b7da18e322d9de3810eb3c32222aa7bb6971efa3
Author: Shuo Cheng <[email protected]>
AuthorDate: Wed Jun 24 09:48:47 2026 +0800

    refactor(flink): Unify file reader creation in FlinkRowDataReaderContext (#19050)
---
 .../table/format/FlinkRowDataReaderContext.java    | 33 +++++---------
 .../hudi/table/format/HoodieRowDataFileReader.java | 50 ++++++++++++++++++++++
 .../table/format/HoodieRowDataLanceReader.java     | 19 +++++++-
 .../table/format/HoodieRowDataParquetReader.java   | 17 +++++++-
 4 files changed, 94 insertions(+), 25 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
index 617dda2079ba..5cc3f85c82e0 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
@@ -52,7 +52,6 @@ import org.apache.hudi.util.RowDataQueryContexts;
 
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.utils.JoinedRowData;
-import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
 
 import java.io.IOException;
@@ -103,28 +102,18 @@ public class FlinkRowDataReaderContext extends 
HoodieReaderContext<RowData> {
     // disable schema evolution in fileReader if it's log file, since schema 
evolution for log file is handled in `FileGroupRecordBuffer`
     InternalSchemaManager schemaManager = isLogFile ? 
InternalSchemaManager.DISABLED : internalSchemaManager.get();
 
-    if 
(filePath.getName().endsWith(HoodieFileFormat.LANCE.getFileExtension())) {
-      if (schemaManager != InternalSchemaManager.DISABLED
-          && 
!schemaManager.getMergeSchema(filePath.getName()).isEmptySchema()) {
-        throw new HoodieValidationException("Flink Lance base-file support 
does not support schema evolution.");
-      }
-      HoodieRowDataLanceReader rowDataLanceReader =
-          (HoodieRowDataLanceReader) HoodieIOFactory.getIOFactory(storage)
-              .getReaderFactory(HoodieRecord.HoodieRecordType.FLINK)
-              .getFileReader(tableConfig, filePath, HoodieFileFormat.LANCE, 
Option.empty());
-      try {
-        return 
rowDataLanceReader.getRowDataIterator(RowDataQueryContexts.fromSchema(requiredSchema).getRowType(),
 requiredSchema);
-      } catch (RuntimeException e) {
-        rowDataLanceReader.close();
-        throw new HoodieException("Failed to get iterator from lance reader", 
e);
-      }
+    // Log files only reach this method for parquet data blocks; base files 
are resolved by their extension.
+    // Format-specific handling lives in the readers themselves, so this 
method stays format-agnostic.
+    HoodieFileFormat format = isLogFile ? HoodieFileFormat.PARQUET : 
HoodieFileFormat.fromFileExtension(filePath.getFileExtension());
+    HoodieRowDataFileReader reader = (HoodieRowDataFileReader) 
HoodieIOFactory.getIOFactory(storage)
+        .getReaderFactory(HoodieRecord.HoodieRecordType.FLINK)
+        .getFileReader(tableConfig, filePath, format, Option.empty());
+    try {
+      return reader.getRowDataIterator(dataSchema, requiredSchema, 
schemaManager, getSafePredicates(requiredSchema));
+    } catch (Throwable e) {
+      reader.close();
+      throw new HoodieException("Failed to get record iterator for: " + 
filePath, e);
     }
-    DataType rowType = 
RowDataQueryContexts.fromSchema(dataSchema).getRowType();
-    HoodieRowDataParquetReader rowDataParquetReader =
-        (HoodieRowDataParquetReader) HoodieIOFactory.getIOFactory(storage)
-            .getReaderFactory(HoodieRecord.HoodieRecordType.FLINK)
-            .getFileReader(tableConfig, filePath, HoodieFileFormat.PARQUET, 
Option.empty());
-    return rowDataParquetReader.getRowDataIterator(schemaManager, rowType, 
requiredSchema, getSafePredicates(requiredSchema));
   }
 
   @Override
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataFileReader.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataFileReader.java
new file mode 100644
index 000000000000..9339d3d041cc
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataFileReader.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format;
+
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.source.ExpressionPredicates;
+
+import org.apache.flink.table.data.RowData;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * A {@link HoodieFileReader} that reads {@link RowData}s and exposes a 
uniform, format-agnostic
+ * entry point for getting a record iterator.
+ */
+public interface HoodieRowDataFileReader extends HoodieFileReader<RowData> {
+
+  /**
+   * Returns an iterator over {@link RowData}s for the given schemas.
+   *
+   * @param dataSchema            schema of the records stored in the file.
+   * @param requiredSchema        schema containing the fields to project.
+   * @param internalSchemaManager schema evolution manager; {@link 
InternalSchemaManager#DISABLED} to skip.
+   * @param predicates            filters to push down to the reader.
+   */
+  ClosableIterator<RowData> getRowDataIterator(
+      HoodieSchema dataSchema,
+      HoodieSchema requiredSchema,
+      InternalSchemaManager internalSchemaManager,
+      List<ExpressionPredicates.Predicate> predicates) throws IOException;
+}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataLanceReader.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataLanceReader.java
index eba397093ec5..5a4597d27710 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataLanceReader.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataLanceReader.java
@@ -32,9 +32,10 @@ import 
org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.io.memory.HoodieArrowAllocator;
-import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.row.HoodieFlinkLanceArrowUtils;
+import org.apache.hudi.source.ExpressionPredicates;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.util.HoodieSchemaConverter;
 import org.apache.hudi.util.RowDataQueryContexts;
@@ -65,7 +66,7 @@ import static 
org.apache.hudi.avro.HoodieBloomFilterWriteSupport.HOODIE_MIN_RECO
 /**
  * Lance reader for Flink RowData base files.
  */
-public class HoodieRowDataLanceReader implements HoodieFileReader<RowData> {
+public class HoodieRowDataLanceReader implements HoodieRowDataFileReader {
 
   private static final int DEFAULT_BATCH_SIZE = 512;
 
@@ -152,6 +153,20 @@ public class HoodieRowDataLanceReader implements 
HoodieFileReader<RowData> {
     return new CloseableMappingIterator<>(rowDataItr, rowData -> 
rowData.getString(0).toString());
   }
 
+  @Override
+  public ClosableIterator<RowData> getRowDataIterator(
+      HoodieSchema dataSchema,
+      HoodieSchema requiredSchema,
+      InternalSchemaManager internalSchemaManager,
+      List<ExpressionPredicates.Predicate> predicates) {
+    // Lance base-file reading does not support schema evolution.
+    if (internalSchemaManager != InternalSchemaManager.DISABLED
+        && 
!internalSchemaManager.getMergeSchema(path.getName()).isEmptySchema()) {
+      throw new HoodieValidationException("Flink Lance base-file support does 
not support schema evolution.");
+    }
+    return 
getRowDataIterator(RowDataQueryContexts.fromSchema(requiredSchema).getRowType(),
 requiredSchema);
+  }
+
   public ClosableIterator<RowData> getRowDataIterator(DataType dataType, 
HoodieSchema requestedSchema) {
     RowType rowType = (RowType) dataType.getLogicalType();
     List<String> columnNames = new ArrayList<>(rowType.getFieldCount());
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java
index e0f5faf7ed26..09ebc52a036c 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/HoodieRowDataParquetReader.java
@@ -32,10 +32,12 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.io.storage.HoodieIOFactory;
 import org.apache.hudi.io.storage.row.parquet.ParquetSchemaConverter;
+import org.apache.hudi.source.ExpressionPredicates;
 import org.apache.hudi.source.ExpressionPredicates.Predicate;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.util.HoodieSchemaConverter;
+import org.apache.hudi.util.RowDataQueryContexts;
 import org.apache.hudi.util.VectorConversionUtils;
 
 import org.apache.flink.table.api.DataTypes;
@@ -55,7 +57,7 @@ import java.util.Set;
 /**
  * Implementation of {@link HoodieFileReader} to read {@link RowData}s from 
base file.
  */
-public class HoodieRowDataParquetReader implements HoodieFileReader<RowData>  {
+public class HoodieRowDataParquetReader implements HoodieRowDataFileReader  {
   private final HoodieStorage storage;
   private final ParquetUtils parquetUtils;
   private final StoragePath path;
@@ -98,6 +100,19 @@ public class HoodieRowDataParquetReader implements 
HoodieFileReader<RowData>  {
     return new CloseableMappingIterator<>(rowDataItr, rowData -> 
Objects.toString(rowData.getString(0)));
   }
 
+  @Override
+  public ClosableIterator<RowData> getRowDataIterator(
+      HoodieSchema dataSchema,
+      HoodieSchema requiredSchema,
+      InternalSchemaManager internalSchemaManager,
+      List<ExpressionPredicates.Predicate> predicates) throws IOException {
+    return getRowDataIterator(
+        internalSchemaManager,
+        RowDataQueryContexts.fromSchema(dataSchema).getRowType(),
+        requiredSchema,
+        predicates);
+  }
+
   public ClosableIterator<RowData> getRowDataIterator(
       InternalSchemaManager internalSchemaManager,
       DataType dataType,

Reply via email to