This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch release-0.12.2-shadow
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit f9f9541e2e405bae9db2743c13f6a1b1f187c066
Author: sivabalan <n.siv...@gmail.com>
AuthorDate: Tue Dec 13 16:45:56 2022 -0800

    Revert "[HUDI-5306] Unify RecordIterator and HoodieParquetReader with 
ClosableIterator (#7340)"
    
    This reverts commit 1b8e5ec12e300ed76c74fdddb585ede55c66b28a.
---
 .../apache/hudi/configuration/OptionsResolver.java |  18 --
 .../java/org/apache/hudi/table/format/CastMap.java | 223 ----------------
 .../org/apache/hudi/table/format/FormatUtils.java  |  28 --
 .../hudi/table/format/InternalSchemaManager.java   | 170 ------------
 .../table/format/ParquetSplitRecordIterator.java   |  61 -----
 .../apache/hudi/table/format/RecordIterators.java  |  91 -------
 .../table/format/SchemaEvolvedRecordIterator.java  |  52 ----
 .../table/format/cow/CopyOnWriteInputFormat.java   |  27 +-
 .../table/format/mor/MergeOnReadInputFormat.java   | 290 ++++++++++-----------
 .../apache/hudi/util/RowDataCastProjection.java    |  49 ----
 .../org/apache/hudi/util/RowDataProjection.java    |   9 +-
 11 files changed, 151 insertions(+), 867 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 029ac3c4027..0dd31ee7538 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -202,24 +202,6 @@ public class OptionsResolver {
     return !conf.contains(FlinkOptions.READ_START_COMMIT) && !conf.contains(FlinkOptions.READ_END_COMMIT);
   }
 
-<<<<<<< HEAD
-=======
-  /**
-   * Returns the supplemental logging mode.
-   */
-  public static HoodieCDCSupplementalLoggingMode getCDCSupplementalLoggingMode(Configuration conf) {
-    String mode = conf.getString(FlinkOptions.SUPPLEMENTAL_LOGGING_MODE);
-    return HoodieCDCSupplementalLoggingMode.parse(mode);
-  }
-
-  /**
-   * Returns whether comprehensive schema evolution enabled.
-   */
-  public static boolean isSchemaEvolutionEnabled(Configuration conf) {
-    return conf.getBoolean(HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.key(), HoodieCommonConfig.SCHEMA_EVOLUTION_ENABLE.defaultValue());
-  }
-
->>>>>>> 07cc3e89a7 ([HUDI-5306] Unify RecordIterator and HoodieParquetReader with ClosableIterator (#7340))
   // -------------------------------------------------------------------------
   //  Utilities
   // -------------------------------------------------------------------------
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/CastMap.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/CastMap.java
deleted file mode 100644
index 36cf8708875..00000000000
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/CastMap.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.format;
-
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.util.RowDataCastProjection;
-import org.apache.hudi.util.RowDataProjection;
-
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.table.data.DecimalData;
-import org.apache.flink.table.data.binary.BinaryStringData;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.DecimalType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.LogicalTypeRoot;
-
-import javax.annotation.Nullable;
-import java.io.Serializable;
-import java.math.BigDecimal;
-import java.time.LocalDate;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.table.types.logical.LogicalTypeRoot.BIGINT;
-import static org.apache.flink.table.types.logical.LogicalTypeRoot.DATE;
-import static org.apache.flink.table.types.logical.LogicalTypeRoot.DECIMAL;
-import static org.apache.flink.table.types.logical.LogicalTypeRoot.DOUBLE;
-import static org.apache.flink.table.types.logical.LogicalTypeRoot.FLOAT;
-import static org.apache.flink.table.types.logical.LogicalTypeRoot.INTEGER;
-import static org.apache.flink.table.types.logical.LogicalTypeRoot.VARCHAR;
-
-/**
- * CastMap is responsible for conversion of flink types when full schema evolution enabled.
- *
- * <p>Supported cast conversions:
- * <ul>
- *   <li>Integer => Long, Float, Double, Decimal, String</li>
- *   <li>Long => Float, Double, Decimal, String</li>
- *   <li>Float => Double, Decimal, String</li>
- *   <li>Double => Decimal, String</li>
- *   <li>Decimal => Decimal, String</li>
- *   <li>String => Decimal, Date</li>
- *   <li>Date => String</li>
- * </ul>
- */
-public final class CastMap implements Serializable {
-
-  private static final long serialVersionUID = 1L;
-
-  // Maps position to corresponding cast
-  private final Map<Integer, Cast> castMap = new HashMap<>();
-
-  private DataType[] fileFieldTypes;
-
-  public Option<RowDataProjection> toRowDataProjection(int[] selectedFields) {
-    if (castMap.isEmpty()) {
-      return Option.empty();
-    }
-    LogicalType[] requiredType = new LogicalType[selectedFields.length];
-    for (int i = 0; i < selectedFields.length; i++) {
-      requiredType[i] = fileFieldTypes[selectedFields[i]].getLogicalType();
-    }
-    return Option.of(new RowDataCastProjection(requiredType, this));
-  }
-
-  public Object castIfNeeded(int pos, Object val) {
-    Cast cast = castMap.get(pos);
-    if (cast == null) {
-      return val;
-    }
-    return cast.convert(val);
-  }
-
-  public DataType[] getFileFieldTypes() {
-    return fileFieldTypes;
-  }
-
-  public void setFileFieldTypes(DataType[] fileFieldTypes) {
-    this.fileFieldTypes = fileFieldTypes;
-  }
-
-  @VisibleForTesting
-  void add(int pos, LogicalType fromType, LogicalType toType) {
-    Function<Object, Object> conversion = getConversion(fromType, toType);
-    if (conversion == null) {
-      throw new IllegalArgumentException(String.format("Cannot create cast %s => %s at pos %s", fromType, toType, pos));
-    }
-    add(pos, new Cast(fromType, toType, conversion));
-  }
-
-  private @Nullable Function<Object, Object> getConversion(LogicalType fromType, LogicalType toType) {
-    LogicalTypeRoot from = fromType.getTypeRoot();
-    LogicalTypeRoot to = toType.getTypeRoot();
-    switch (to) {
-      case BIGINT: {
-        if (from == INTEGER) {
-          return val -> ((Number) val).longValue();
-        }
-        break;
-      }
-      case FLOAT: {
-        if (from == INTEGER || from == BIGINT) {
-          return val -> ((Number) val).floatValue();
-        }
-        break;
-      }
-      case DOUBLE: {
-        if (from == INTEGER || from == BIGINT) {
-          return val -> ((Number) val).doubleValue();
-        }
-        if (from == FLOAT) {
-          return val -> Double.parseDouble(val.toString());
-        }
-        break;
-      }
-      case DECIMAL: {
-        if (from == INTEGER || from == BIGINT || from == DOUBLE) {
-          return val -> toDecimalData((Number) val, toType);
-        }
-        if (from == FLOAT) {
-          return val -> toDecimalData(Double.parseDouble(val.toString()), toType);
-        }
-        if (from == VARCHAR) {
-          return val -> toDecimalData(Double.parseDouble(val.toString()), toType);
-        }
-        if (from == DECIMAL) {
-          return val -> toDecimalData(((DecimalData) val).toBigDecimal(), toType);
-        }
-        break;
-      }
-      case VARCHAR: {
-        if (from == INTEGER
-            || from == BIGINT
-            || from == FLOAT
-            || from == DOUBLE
-            || from == DECIMAL) {
-          return val -> new BinaryStringData(String.valueOf(val));
-        }
-        if (from == DATE) {
-          return val -> new BinaryStringData(LocalDate.ofEpochDay(((Integer) val).longValue()).toString());
-        }
-        break;
-      }
-      case DATE: {
-        if (from == VARCHAR) {
-          return val -> (int) LocalDate.parse(val.toString()).toEpochDay();
-        }
-        break;
-      }
-      default:
-    }
-    return null;
-  }
-
-  private void add(int pos, Cast cast) {
-    castMap.put(pos, cast);
-  }
-
-  private DecimalData toDecimalData(Number val, LogicalType decimalType) {
-    BigDecimal valAsDecimal = BigDecimal.valueOf(val.doubleValue());
-    return toDecimalData(valAsDecimal, decimalType);
-  }
-
-  private DecimalData toDecimalData(BigDecimal valAsDecimal, LogicalType decimalType) {
-    return DecimalData.fromBigDecimal(
-        valAsDecimal,
-        ((DecimalType) decimalType).getPrecision(),
-        ((DecimalType) decimalType).getScale());
-  }
-
-  /**
-   * Fields {@link Cast#from} and {@link Cast#to} are redundant due to {@link Cast#convert(Object)} determines conversion.
-   * However, it is convenient to debug {@link CastMap} when {@link Cast#toString()} prints types.
-   */
-  private static final class Cast implements Serializable {
-
-    private static final long serialVersionUID = 1L;
-
-    private final LogicalType from;
-    private final LogicalType to;
-    private final Function<Object, Object> conversion;
-
-    Cast(LogicalType from, LogicalType to, Function<Object, Object> conversion) {
-      this.from = from;
-      this.to = to;
-      this.conversion = conversion;
-    }
-
-    Object convert(Object val) {
-      return conversion.apply(val);
-    }
-
-    @Override
-    public String toString() {
-      return from + " => " + to;
-    }
-  }
-
-  @Override
-  public String toString() {
-    return castMap.entrySet().stream()
-        .map(e -> e.getKey() + ": " + e.getValue())
-        .collect(Collectors.joining(", ", "{", "}"));
-  }
-}
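
(For readers of this revert: the CastMap deleted above keys a cast function by field position and applies it only where the file schema and the evolved query schema diverge, per the supported conversions listed in its javadoc. A minimal, self-contained sketch of that idea follows; the class name and the specific int-to-long / int-to-string casts are hypothetical illustrations, not Hudi APIs.)

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Function;

    // Hypothetical illustration: a position-keyed cast map, in the spirit of the removed CastMap.
    final class SimpleCastMap {
      private final Map<Integer, Function<Object, Object>> casts = new HashMap<>();

      void add(int pos, Function<Object, Object> cast) {
        casts.put(pos, cast);
      }

      // Returns the value unchanged when no cast is registered for the position.
      Object castIfNeeded(int pos, Object val) {
        Function<Object, Object> cast = casts.get(pos);
        return cast == null ? val : cast.apply(val);
      }

      public static void main(String[] args) {
        SimpleCastMap map = new SimpleCastMap();
        map.add(0, v -> ((Number) v).longValue());    // e.g. INT written, BIGINT queried
        map.add(2, v -> String.valueOf(v));           // e.g. INT written, VARCHAR queried
        System.out.println(map.castIfNeeded(0, 42));  // cast applied at pos 0
        System.out.println(map.castIfNeeded(1, 3.5)); // untouched, no cast at pos 1
      }
    }
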
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index 49cb3cec5bf..6357b898d49 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -32,7 +32,6 @@ import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.hadoop.config.HoodieRealtimeConfig;
-import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
 import org.apache.hudi.util.FlinkWriteClients;
 import org.apache.hudi.util.StreamerUtil;
@@ -171,33 +170,6 @@ public class FormatUtils {
         .build();
   }
 
-  public static HoodieMergedLogRecordScanner logScanner(
-      MergeOnReadInputSplit split,
-      Schema logSchema,
-      InternalSchema internalSchema,
-      org.apache.flink.configuration.Configuration flinkConf,
-      Configuration hadoopConf) {
-    HoodieWriteConfig writeConfig = FlinkWriteClients.getHoodieClientConfig(flinkConf);
-    FileSystem fs = FSUtils.getFs(split.getTablePath(), hadoopConf);
-    return HoodieMergedLogRecordScanner.newBuilder()
-        .withFileSystem(fs)
-        .withBasePath(split.getTablePath())
-        .withLogFilePaths(split.getLogPaths().get())
-        .withReaderSchema(logSchema)
-        .withInternalSchema(internalSchema)
-        .withLatestInstantTime(split.getLatestCommit())
-        .withReadBlocksLazily(writeConfig.getCompactionLazyBlockReadEnabled())
-        .withReverseReader(false)
-        .withBufferSize(writeConfig.getMaxDFSStreamBufferSize())
-        .withMaxMemorySizeInBytes(split.getMaxCompactionMemoryInBytes())
-        .withDiskMapType(writeConfig.getCommonConfig().getSpillableDiskMapType())
-        .withBitCaskDiskMapCompressionEnabled(writeConfig.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
-        .withSpillableMapBasePath(writeConfig.getSpillableMapBasePath())
-        .withInstantRange(split.getInstantRange())
-        .withOperationField(flinkConf.getBoolean(FlinkOptions.CHANGELOG_ENABLED))
-        .build();
-  }
-
   /**
    * Utility to read and buffer the records in the unMerged log record scanner.
    */
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java
deleted file mode 100644
index abd405469d8..00000000000
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.format;
-
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.TableSchemaResolver;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.util.InternalSchemaCache;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.configuration.HadoopConfigurations;
-import org.apache.hudi.configuration.OptionsResolver;
-import org.apache.hudi.internal.schema.InternalSchema;
-import org.apache.hudi.internal.schema.Type;
-import org.apache.hudi.internal.schema.Types;
-import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
-import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
-import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
-import org.apache.hudi.util.AvroSchemaConverter;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.util.Preconditions;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-/**
- * This class is responsible for calculating names and types of fields that are actual at a certain point in time.
- * If field is renamed in queried schema, its old name will be returned, which is relevant at the provided time.
- * If type of field is changed, its old type will be returned, and projection will be created that will convert the old type to the queried one.
- */
-public class InternalSchemaManager implements Serializable {
-
-  private static final long serialVersionUID = 1L;
-
-  public static final InternalSchemaManager DISABLED = new InternalSchemaManager(null, InternalSchema.getEmptyInternalSchema(), null, null);
-
-  private final Configuration conf;
-  private final InternalSchema querySchema;
-  private final String validCommits;
-  private final String tablePath;
-  private transient org.apache.hadoop.conf.Configuration hadoopConf;
-
-  public static InternalSchemaManager get(Configuration conf, HoodieTableMetaClient metaClient) {
-    if (!OptionsResolver.isSchemaEvolutionEnabled(conf)) {
-      return DISABLED;
-    }
-    Option<InternalSchema> internalSchema = new TableSchemaResolver(metaClient).getTableInternalSchemaFromCommitMetadata();
-    if (!internalSchema.isPresent() || internalSchema.get().isEmptySchema()) {
-      return DISABLED;
-    }
-    String validCommits = metaClient
-        .getCommitsAndCompactionTimeline()
-        .filterCompletedInstants()
-        .getInstantsAsStream()
-        .map(HoodieInstant::getFileName)
-        .collect(Collectors.joining(","));
-    return new InternalSchemaManager(conf, internalSchema.get(), validCommits, metaClient.getBasePathV2().toString());
-  }
-
-  public InternalSchemaManager(Configuration conf, InternalSchema querySchema, String validCommits, String tablePath) {
-    this.conf = conf;
-    this.querySchema = querySchema;
-    this.validCommits = validCommits;
-    this.tablePath = tablePath;
-  }
-
-  public InternalSchema getQuerySchema() {
-    return querySchema;
-  }
-
-  InternalSchema getFileSchema(String fileName) {
-    if (querySchema.isEmptySchema()) {
-      return querySchema;
-    }
-    long commitInstantTime = Long.parseLong(FSUtils.getCommitTime(fileName));
-    InternalSchema fileSchemaUnmerged = InternalSchemaCache.getInternalSchemaByVersionId(
-        commitInstantTime, tablePath, getHadoopConf(), validCommits);
-    if (querySchema.equals(fileSchemaUnmerged)) {
-      return InternalSchema.getEmptyInternalSchema();
-    }
-    return new InternalSchemaMerger(fileSchemaUnmerged, querySchema, true, true).mergeSchema();
-  }
-
-  CastMap getCastMap(InternalSchema fileSchema, String[] queryFieldNames, DataType[] queryFieldTypes, int[] selectedFields) {
-    Preconditions.checkArgument(!querySchema.isEmptySchema(), "querySchema cannot be empty");
-    Preconditions.checkArgument(!fileSchema.isEmptySchema(), "fileSchema cannot be empty");
-
-    CastMap castMap = new CastMap();
-    Map<Integer, Integer> posProxy = getPosProxy(fileSchema, queryFieldNames);
-    if (posProxy.isEmpty()) {
-      castMap.setFileFieldTypes(queryFieldTypes);
-      return castMap;
-    }
-    List<Integer> selectedFieldList = IntStream.of(selectedFields).boxed().collect(Collectors.toList());
-    List<DataType> fileSchemaAsDataTypes = AvroSchemaConverter.convertToDataType(
-        AvroInternalSchemaConverter.convert(fileSchema, "tableName")).getChildren();
-    DataType[] fileFieldTypes = new DataType[queryFieldTypes.length];
-    for (int i = 0; i < queryFieldTypes.length; i++) {
-      Integer posOfChangedType = posProxy.get(i);
-      if (posOfChangedType == null) {
-        fileFieldTypes[i] = queryFieldTypes[i];
-      } else {
-        DataType fileType = fileSchemaAsDataTypes.get(posOfChangedType);
-        fileFieldTypes[i] = fileType;
-        int selectedPos = selectedFieldList.indexOf(i);
-        if (selectedPos != -1) {
-          castMap.add(selectedPos, fileType.getLogicalType(), queryFieldTypes[i].getLogicalType());
-        }
-      }
-    }
-    castMap.setFileFieldTypes(fileFieldTypes);
-    return castMap;
-  }
-
-  String[] getFileFieldNames(InternalSchema fileSchema, String[] queryFieldNames) {
-    Preconditions.checkArgument(!querySchema.isEmptySchema(), "querySchema cannot be empty");
-    Preconditions.checkArgument(!fileSchema.isEmptySchema(), "fileSchema cannot be empty");
-
-    Map<String, String> renamedCols = InternalSchemaUtils.collectRenameCols(fileSchema, querySchema);
-    if (renamedCols.isEmpty()) {
-      return queryFieldNames;
-    }
-    return Arrays.stream(queryFieldNames).map(name -> renamedCols.getOrDefault(name, name)).toArray(String[]::new);
-  }
-
-  private Map<Integer, Integer> getPosProxy(InternalSchema fileSchema, String[] queryFieldNames) {
-    Map<Integer, Pair<Type, Type>> changedCols = InternalSchemaUtils.collectTypeChangedCols(querySchema, fileSchema);
-    HashMap<Integer, Integer> posProxy = new HashMap<>(changedCols.size());
-    List<String> fieldNameList = Arrays.asList(queryFieldNames);
-    List<Types.Field> columns = querySchema.columns();
-    changedCols.forEach((posInSchema, typePair) -> {
-      String name = columns.get(posInSchema).name();
-      int posInType = fieldNameList.indexOf(name);
-      posProxy.put(posInType, posInSchema);
-    });
-    return Collections.unmodifiableMap(posProxy);
-  }
-
-  private org.apache.hadoop.conf.Configuration getHadoopConf() {
-    if (hadoopConf == null) {
-      hadoopConf = HadoopConfigurations.getHadoopConf(conf);
-    }
-    return hadoopConf;
-  }
-}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/ParquetSplitRecordIterator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/ParquetSplitRecordIterator.java
deleted file mode 100644
index 7b26d71f115..00000000000
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/ParquetSplitRecordIterator.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.format;
-
-import org.apache.hudi.common.util.ClosableIterator;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader;
-
-import org.apache.flink.table.data.RowData;
-
-import java.io.IOException;
-
-/**
- * Hoodie wrapper for flink parquet reader.
- */
-public final class ParquetSplitRecordIterator implements ClosableIterator<RowData> {
-  private final ParquetColumnarRowSplitReader reader;
-
-  public ParquetSplitRecordIterator(ParquetColumnarRowSplitReader reader) {
-    this.reader = reader;
-  }
-
-  @Override
-  public boolean hasNext() {
-    try {
-      return !reader.reachedEnd();
-    } catch (IOException e) {
-      throw new HoodieIOException("Decides whether the parquet columnar row 
split reader reached end exception", e);
-    }
-  }
-
-  @Override
-  public RowData next() {
-    return reader.nextRecord();
-  }
-
-  @Override
-  public void close() {
-    try {
-      reader.close();
-    } catch (IOException e) {
-      throw new HoodieIOException("Close the parquet columnar row split reader 
exception", e);
-    }
-  }
-}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
deleted file mode 100644
index 8657f16ddc9..00000000000
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/RecordIterators.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.format;
-
-import org.apache.hudi.common.util.ClosableIterator;
-import org.apache.hudi.common.util.Option;
-import org.apache.hudi.internal.schema.InternalSchema;
-import org.apache.hudi.table.format.cow.ParquetSplitReaderUtil;
-import org.apache.hudi.util.RowDataProjection;
-
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.types.DataType;
-import org.apache.hadoop.conf.Configuration;
-
-import java.io.IOException;
-import java.util.Map;
-
-/**
- * Factory clazz for record iterators.
- */
-public abstract class RecordIterators {
-
-  public static ClosableIterator<RowData> getParquetRecordIterator(
-      InternalSchemaManager internalSchemaManager,
-      boolean utcTimestamp,
-      boolean caseSensitive,
-      Configuration conf,
-      String[] fieldNames,
-      DataType[] fieldTypes,
-      Map<String, Object> partitionSpec,
-      int[] selectedFields,
-      int batchSize,
-      Path path,
-      long splitStart,
-      long splitLength) throws IOException {
-    InternalSchema fileSchema = internalSchemaManager.getFileSchema(path.getName());
-    if (fileSchema.isEmptySchema()) {
-      return new ParquetSplitRecordIterator(
-          ParquetSplitReaderUtil.genPartColumnarRowReader(
-              utcTimestamp,
-              caseSensitive,
-              conf,
-              fieldNames,
-              fieldTypes,
-              partitionSpec,
-              selectedFields,
-              batchSize,
-              path,
-              splitStart,
-              splitLength));
-    } else {
-      CastMap castMap = internalSchemaManager.getCastMap(fileSchema, fieldNames, fieldTypes, selectedFields);
-      Option<RowDataProjection> castProjection = castMap.toRowDataProjection(selectedFields);
-      ClosableIterator<RowData> itr = new ParquetSplitRecordIterator(
-          ParquetSplitReaderUtil.genPartColumnarRowReader(
-              utcTimestamp,
-              caseSensitive,
-              conf,
-              internalSchemaManager.getFileFieldNames(fileSchema, fieldNames), // the reconciled field names
-              castMap.getFileFieldTypes(),                                     // the reconciled field types
-              partitionSpec,
-              selectedFields,
-              batchSize,
-              path,
-              splitStart,
-              splitLength));
-      if (castProjection.isPresent()) {
-        return new SchemaEvolvedRecordIterator(itr, castProjection.get());
-      } else {
-        return itr;
-      }
-    }
-  }
-}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/SchemaEvolvedRecordIterator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/SchemaEvolvedRecordIterator.java
deleted file mode 100644
index 739512c7b55..00000000000
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/SchemaEvolvedRecordIterator.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.format;
-
-import org.apache.hudi.common.util.ClosableIterator;
-import org.apache.hudi.util.RowDataProjection;
-
-import org.apache.flink.table.data.RowData;
-
-/**
- * Decorates origin record iterator with cast projection.
- */
-public final class SchemaEvolvedRecordIterator implements ClosableIterator<RowData> {
-  private final ClosableIterator<RowData> nested;
-  private final RowDataProjection castProjection;
-
-  public SchemaEvolvedRecordIterator(ClosableIterator<RowData> nested, RowDataProjection castProjection) {
-    this.nested = nested;
-    this.castProjection = castProjection;
-  }
-
-  @Override
-  public boolean hasNext() {
-    return nested.hasNext();
-  }
-
-  @Override
-  public RowData next() {
-    return castProjection.project(nested.next());
-  }
-
-  @Override
-  public void close() {
-    nested.close();
-  }
-}
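
(The SchemaEvolvedRecordIterator deleted above is a plain decorator: it delegates iteration to a nested ClosableIterator and applies the cast projection to each record on the way out. A generic sketch of the same pattern, using a hypothetical class name and simplified types rather than Hudi API:)

    import java.util.Iterator;
    import java.util.List;
    import java.util.function.Function;

    // Hypothetical decorator that applies a projection to each element of a nested iterator.
    final class ProjectingIterator<T, R> implements Iterator<R>, AutoCloseable {
      private final Iterator<T> nested;
      private final Function<T, R> projection;

      ProjectingIterator(Iterator<T> nested, Function<T, R> projection) {
        this.nested = nested;
        this.projection = projection;
      }

      @Override public boolean hasNext() { return nested.hasNext(); }
      @Override public R next() { return projection.apply(nested.next()); }

      @Override public void close() throws Exception {
        if (nested instanceof AutoCloseable) {
          ((AutoCloseable) nested).close(); // release the underlying reader if it owns resources
        }
      }

      public static void main(String[] args) throws Exception {
        try (ProjectingIterator<Integer, String> it =
                 new ProjectingIterator<>(List.of(1, 2, 3).iterator(), i -> "row-" + i)) {
          while (it.hasNext()) {
            System.out.println(it.next());
          }
        }
      }
    }
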
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
index 820424549f0..c5ea3d4ab98 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cow/CopyOnWriteInputFormat.java
@@ -20,9 +20,7 @@ package org.apache.hudi.table.format.cow;
 
 import java.util.Comparator;
 import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.util.ClosableIterator;
-import org.apache.hudi.table.format.InternalSchemaManager;
-import org.apache.hudi.table.format.RecordIterators;
+import org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader;
 import org.apache.hudi.util.DataTypeUtils;
 
 import org.apache.flink.api.common.io.FileInputFormat;
@@ -76,7 +74,7 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> {
   private final SerializableConfiguration conf;
   private final long limit;
 
-  private transient ClosableIterator<RowData> itr;
+  private transient ParquetColumnarRowSplitReader reader;
   private transient long currentReadCount;
 
   /**
@@ -84,8 +82,6 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> {
    */
   private FilePathFilter localFilesFilter = new GlobFilePathFilter();
 
-  private final InternalSchemaManager internalSchemaManager;
-
   public CopyOnWriteInputFormat(
       Path[] paths,
       String[] fullFieldNames,
@@ -94,8 +90,7 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> {
       String partDefaultName,
       long limit,
       Configuration conf,
-      boolean utcTimestamp,
-      InternalSchemaManager internalSchemaManager) {
+      boolean utcTimestamp) {
     super.setFilePaths(paths);
     this.limit = limit;
     this.partDefaultName = partDefaultName;
@@ -104,7 +99,6 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> {
     this.selectedFields = selectedFields;
     this.conf = new SerializableConfiguration(conf);
     this.utcTimestamp = utcTimestamp;
-    this.internalSchemaManager = internalSchemaManager;
   }
 
   @Override
@@ -129,8 +123,7 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> {
       }
     });
 
-    this.itr = RecordIterators.getParquetRecordIterator(
-        internalSchemaManager,
+    this.reader = ParquetSplitReaderUtil.genPartColumnarRowReader(
         utcTimestamp,
         true,
         conf.conf(),
@@ -277,26 +270,26 @@ public class CopyOnWriteInputFormat extends FileInputFormat<RowData> {
   }
 
   @Override
-  public boolean reachedEnd() {
+  public boolean reachedEnd() throws IOException {
     if (currentReadCount >= limit) {
       return true;
     } else {
-      return !itr.hasNext();
+      return reader.reachedEnd();
     }
   }
 
   @Override
   public RowData nextRecord(RowData reuse) {
     currentReadCount++;
-    return itr.next();
+    return reader.nextRecord();
   }
 
   @Override
   public void close() throws IOException {
-    if (itr != null) {
-      this.itr.close();
+    if (reader != null) {
+      this.reader.close();
     }
-    this.itr = null;
+    this.reader = null;
   }
 
   /**
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index e1e7025ff07..c9b6561bdef 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -29,13 +29,11 @@ import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieIOException;
-import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.keygen.KeyGenUtils;
 import org.apache.hudi.table.format.FilePathUtils;
 import org.apache.hudi.table.format.FormatUtils;
-import org.apache.hudi.table.format.InternalSchemaManager;
-import org.apache.hudi.table.format.RecordIterators;
+import org.apache.hudi.table.format.cow.ParquetSplitReaderUtil;
+import org.apache.hudi.table.format.cow.vector.reader.ParquetColumnarRowSplitReader;
 import org.apache.hudi.util.AvroToRowDataConverters;
 import org.apache.hudi.util.DataTypeUtils;
 import org.apache.hudi.util.RowDataProjection;
@@ -68,7 +66,6 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
-import java.util.function.Function;
 import java.util.stream.IntStream;
 
 import static org.apache.hudi.hadoop.utils.HoodieInputFormatUtils.HOODIE_COMMIT_TIME_COL_POS;
@@ -95,7 +92,7 @@ public class MergeOnReadInputFormat
   /**
    * Uniform iterator view for the underneath records.
    */
-  private transient ClosableIterator<RowData> iterator;
+  private transient RecordIterator iterator;
 
   // for project push down
   /**
@@ -140,16 +137,13 @@ public class MergeOnReadInputFormat
    */
   private boolean closed = true;
 
-  private final InternalSchemaManager internalSchemaManager;
-
   private MergeOnReadInputFormat(
       Configuration conf,
       MergeOnReadTableState tableState,
       List<DataType> fieldTypes,
       String defaultPartName,
       long limit,
-      boolean emitDelete,
-      InternalSchemaManager internalSchemaManager) {
+      boolean emitDelete) {
     this.conf = conf;
     this.tableState = tableState;
     this.fieldNames = tableState.getRowType().getFieldNames();
@@ -160,7 +154,6 @@ public class MergeOnReadInputFormat
     this.requiredPos = tableState.getRequiredPositions();
     this.limit = limit;
     this.emitDelete = emitDelete;
-    this.internalSchemaManager = internalSchemaManager;
   }
 
   /**
@@ -175,35 +168,30 @@ public class MergeOnReadInputFormat
     this.currentReadCount = 0L;
     this.closed = false;
     this.hadoopConf = HadoopConfigurations.getHadoopConf(this.conf);
-    this.iterator = initIterator(split);
-    mayShiftInputSplit(split);
-  }
-
-  protected ClosableIterator<RowData> initIterator(MergeOnReadInputSplit 
split) throws IOException {
     if (!(split.getLogPaths().isPresent() && split.getLogPaths().get().size() 
> 0)) {
       if (split.getInstantRange() != null) {
         // base file only with commit time filtering
-        return new BaseFileOnlyFilteringIterator(
+        this.iterator = new BaseFileOnlyFilteringIterator(
             split.getInstantRange(),
             this.tableState.getRequiredRowType(),
-            getBaseFileIterator(split.getBasePath().get(), 
getRequiredPosWithCommitTime(this.requiredPos)));
+            getReader(split.getBasePath().get(), 
getRequiredPosWithCommitTime(this.requiredPos)));
       } else {
         // base file only
-        return getBaseFileIterator(split.getBasePath().get());
+        this.iterator = new 
BaseFileOnlyIterator(getRequiredSchemaReader(split.getBasePath().get()));
       }
     } else if (!split.getBasePath().isPresent()) {
       // log files only
       if (OptionsResolver.emitChangelog(conf)) {
-        return new LogFileOnlyIterator(getUnMergedLogFileIterator(split));
+        this.iterator = new 
LogFileOnlyIterator(getUnMergedLogFileIterator(split));
       } else {
-        return new LogFileOnlyIterator(getLogFileIterator(split));
+        this.iterator = new LogFileOnlyIterator(getLogFileIterator(split));
       }
     } else if (split.getMergeType().equals(FlinkOptions.REALTIME_SKIP_MERGE)) {
-      return new SkipMergeIterator(
-          getBaseFileIterator(split.getBasePath().get()),
+      this.iterator = new SkipMergeIterator(
+          getRequiredSchemaReader(split.getBasePath().get()),
           getLogFileIterator(split));
     } else if 
(split.getMergeType().equals(FlinkOptions.REALTIME_PAYLOAD_COMBINE)) {
-      return new MergeIterator(
+      this.iterator = new MergeIterator(
           conf,
           hadoopConf,
           split,
@@ -211,11 +199,10 @@ public class MergeOnReadInputFormat
           this.tableState.getRequiredRowType(),
           new Schema.Parser().parse(this.tableState.getAvroSchema()),
           new Schema.Parser().parse(this.tableState.getRequiredAvroSchema()),
-          internalSchemaManager.getQuerySchema(),
           this.requiredPos,
           this.emitDelete,
           this.tableState.getOperationPos(),
-          getBaseFileIteratorWithMetadata(split.getBasePath().get()));
+          getFullSchemaReader(split.getBasePath().get()));
     } else {
       throw new HoodieException("Unable to select an Iterator to read the 
Hoodie MOR File Split for "
           + "file path: " + split.getBasePath()
@@ -224,6 +211,7 @@ public class MergeOnReadInputFormat
           + "spark partition Index: " + split.getSplitNumber()
           + "merge type: " + split.getMergeType());
     }
+    mayShiftInputSplit(split);
   }
 
   @Override
@@ -254,14 +242,14 @@ public class MergeOnReadInputFormat
       return true;
     } else {
       // log file reaches end ?
-      return !this.iterator.hasNext();
+      return this.iterator.reachedEnd();
     }
   }
 
   @Override
   public RowData nextRecord(RowData o) {
     currentReadCount++;
-    return this.iterator.next();
+    return this.iterator.nextRecord();
   }
 
   @Override
@@ -296,19 +284,15 @@ public class MergeOnReadInputFormat
     }
   }
 
-  protected ClosableIterator<RowData> getBaseFileIteratorWithMetadata(String 
path) {
-    try {
-      return getBaseFileIterator(path, IntStream.range(0, 
this.tableState.getRowType().getFieldCount()).toArray());
-    } catch (IOException e) {
-      throw new HoodieException("Get reader error for path: " + path);
-    }
+  private ParquetColumnarRowSplitReader getFullSchemaReader(String path) 
throws IOException {
+    return getReader(path, IntStream.range(0, 
this.tableState.getRowType().getFieldCount()).toArray());
   }
 
-  protected ClosableIterator<RowData> getBaseFileIterator(String path) throws 
IOException {
-    return getBaseFileIterator(path, this.requiredPos);
+  private ParquetColumnarRowSplitReader getRequiredSchemaReader(String path) 
throws IOException {
+    return getReader(path, this.requiredPos);
   }
 
-  private ClosableIterator<RowData> getBaseFileIterator(String path, int[] 
requiredPos) throws IOException {
+  private ParquetColumnarRowSplitReader getReader(String path, int[] 
requiredPos) throws IOException {
     // generate partition specs.
     LinkedHashMap<String, String> partSpec = 
FilePathUtils.extractPartitionKeyValues(
         new org.apache.hadoop.fs.Path(path).getParent(),
@@ -330,8 +314,7 @@ public class MergeOnReadInputFormat
       }
     });
 
-    return RecordIterators.getParquetRecordIterator(
-        internalSchemaManager,
+    return ParquetSplitReaderUtil.genPartColumnarRowReader(
         this.conf.getBoolean(FlinkOptions.UTC_TIMEZONE),
         true,
         HadoopConfigurations.getParquetConf(this.conf, hadoopConf),
@@ -477,12 +460,46 @@ public class MergeOnReadInputFormat
   // -------------------------------------------------------------------------
   //  Inner Class
   // -------------------------------------------------------------------------
+  private interface RecordIterator {
+    boolean reachedEnd() throws IOException;
+
+    RowData nextRecord();
+
+    void close() throws IOException;
+  }
+
+  static class BaseFileOnlyIterator implements RecordIterator {
+    // base file reader
+    private final ParquetColumnarRowSplitReader reader;
+
+    BaseFileOnlyIterator(ParquetColumnarRowSplitReader reader) {
+      this.reader = reader;
+    }
+
+    @Override
+    public boolean reachedEnd() throws IOException {
+      return this.reader.reachedEnd();
+    }
+
+    @Override
+    public RowData nextRecord() {
+      return this.reader.nextRecord();
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (this.reader != null) {
+        this.reader.close();
+      }
+    }
+  }
+
   /**
-   * Base record iterator with instant time filtering.
+   * Similar with {@link BaseFileOnlyIterator} but with instant time filtering.
    */
-  static class BaseFileOnlyFilteringIterator implements 
ClosableIterator<RowData> {
-    // base file record iterator
-    private final ClosableIterator<RowData> nested;
+  static class BaseFileOnlyFilteringIterator implements RecordIterator {
+    // base file reader
+    private final ParquetColumnarRowSplitReader reader;
     private final InstantRange instantRange;
     private final RowDataProjection projection;
 
@@ -491,44 +508,44 @@ public class MergeOnReadInputFormat
     BaseFileOnlyFilteringIterator(
         Option<InstantRange> instantRange,
         RowType requiredRowType,
-        ClosableIterator<RowData> nested) {
-      this.nested = nested;
+        ParquetColumnarRowSplitReader reader) {
+      this.reader = reader;
       this.instantRange = instantRange.orElse(null);
       int[] positions = IntStream.range(1, 1 + 
requiredRowType.getFieldCount()).toArray();
       projection = RowDataProjection.instance(requiredRowType, positions);
     }
 
     @Override
-    public boolean hasNext() {
-      while (this.nested.hasNext()) {
-        currentRecord = this.nested.next();
+    public boolean reachedEnd() throws IOException {
+      while (!this.reader.reachedEnd()) {
+        currentRecord = this.reader.nextRecord();
         if (instantRange != null) {
           boolean isInRange = 
instantRange.isInRange(currentRecord.getString(HOODIE_COMMIT_TIME_COL_POS).toString());
           if (isInRange) {
-            return true;
+            return false;
           }
         } else {
-          return true;
+          return false;
         }
       }
-      return false;
+      return true;
     }
 
     @Override
-    public RowData next() {
+    public RowData nextRecord() {
       // can promote: no need to project with null instant range
       return projection.project(currentRecord);
     }
 
     @Override
-    public void close() {
-      if (this.nested != null) {
-        this.nested.close();
+    public void close() throws IOException {
+      if (this.reader != null) {
+        this.reader.close();
       }
     }
   }
 
-  protected static class LogFileOnlyIterator implements 
ClosableIterator<RowData> {
+  static class LogFileOnlyIterator implements RecordIterator {
     // iterator for log files
     private final ClosableIterator<RowData> iterator;
 
@@ -537,12 +554,12 @@ public class MergeOnReadInputFormat
     }
 
     @Override
-    public boolean hasNext() {
-      return this.iterator.hasNext();
+    public boolean reachedEnd() {
+      return !this.iterator.hasNext();
     }
 
     @Override
-    public RowData next() {
+    public RowData nextRecord() {
       return this.iterator.next();
     }
 
@@ -554,9 +571,9 @@ public class MergeOnReadInputFormat
     }
   }
 
-  static class SkipMergeIterator implements ClosableIterator<RowData> {
-    // base file record iterator
-    private final ClosableIterator<RowData> nested;
+  static class SkipMergeIterator implements RecordIterator {
+    // base file reader
+    private final ParquetColumnarRowSplitReader reader;
     // iterator for log files
     private final ClosableIterator<RowData> iterator;
 
@@ -567,34 +584,34 @@ public class MergeOnReadInputFormat
 
     private RowData currentRecord;
 
-    SkipMergeIterator(ClosableIterator<RowData> nested, 
ClosableIterator<RowData> iterator) {
-      this.nested = nested;
+    SkipMergeIterator(ParquetColumnarRowSplitReader reader, 
ClosableIterator<RowData> iterator) {
+      this.reader = reader;
       this.iterator = iterator;
     }
 
     @Override
-    public boolean hasNext() {
-      if (!readLogs && this.nested.hasNext()) {
-        currentRecord = this.nested.next();
-        return true;
+    public boolean reachedEnd() throws IOException {
+      if (!readLogs && !this.reader.reachedEnd()) {
+        currentRecord = this.reader.nextRecord();
+        return false;
       }
       readLogs = true;
       if (this.iterator.hasNext()) {
         currentRecord = this.iterator.next();
-        return true;
+        return false;
       }
-      return false;
+      return true;
     }
 
     @Override
-    public RowData next() {
+    public RowData nextRecord() {
       return currentRecord;
     }
 
     @Override
-    public void close() {
-      if (this.nested != null) {
-        this.nested.close();
+    public void close() throws IOException {
+      if (this.reader != null) {
+        this.reader.close();
       }
       if (this.iterator != null) {
         this.iterator.close();
@@ -602,22 +619,24 @@ public class MergeOnReadInputFormat
     }
   }
 
-  protected static class MergeIterator implements ClosableIterator<RowData> {
-    // base file record iterator
-    private final ClosableIterator<RowData> nested;
+  static class MergeIterator implements RecordIterator {
+    // base file reader
+    private final ParquetColumnarRowSplitReader reader;
     // log keys used for merging
     private final Iterator<String> logKeysIterator;
     // scanner
     private final HoodieMergedLogRecordScanner scanner;
 
     private final Schema tableSchema;
+    private final Schema requiredSchema;
+    private final int[] requiredPos;
     private final boolean emitDelete;
     private final int operationPos;
     private final RowDataToAvroConverters.RowDataToAvroConverter 
rowDataToAvroConverter;
     private final AvroToRowDataConverters.AvroToRowDataConverter 
avroToRowDataConverter;
+    private final GenericRecordBuilder recordBuilder;
 
-    private final Option<RowDataProjection> projection;
-    private final Option<Function<IndexedRecord, GenericRecord>> 
avroProjection;
+    private final RowDataProjection projection;
 
     private final InstantRange instantRange;
 
@@ -640,49 +659,30 @@ public class MergeOnReadInputFormat
         RowType requiredRowType,
         Schema tableSchema,
         Schema requiredSchema,
-        InternalSchema querySchema,
         int[] requiredPos,
         boolean emitDelete,
         int operationPos,
-        ClosableIterator<RowData> nested) { // the iterator should be with 
full schema
-      this(flinkConf, hadoopConf, split, tableRowType, requiredRowType, 
tableSchema,
-          querySchema,
-          Option.of(RowDataProjection.instance(requiredRowType, requiredPos)),
-          Option.of(record -> buildAvroRecordBySchema(record, requiredSchema, 
requiredPos, new GenericRecordBuilder(requiredSchema))),
-          emitDelete, operationPos, nested);
-    }
-
-    public MergeIterator(
-        Configuration flinkConf,
-        org.apache.hadoop.conf.Configuration hadoopConf,
-        MergeOnReadInputSplit split,
-        RowType tableRowType,
-        RowType requiredRowType,
-        Schema tableSchema,
-        InternalSchema querySchema,
-        Option<RowDataProjection> projection,
-        Option<Function<IndexedRecord, GenericRecord>> avroProjection,
-        boolean emitDelete,
-        int operationPos,
-        ClosableIterator<RowData> nested) { // the iterator should be with 
full schema
+        ParquetColumnarRowSplitReader reader) { // the reader should be with 
full schema
       this.tableSchema = tableSchema;
-      this.nested = nested;
-      this.scanner = FormatUtils.logScanner(split, tableSchema, querySchema, 
flinkConf, hadoopConf);
+      this.reader = reader;
+      this.scanner = FormatUtils.logScanner(split, tableSchema, flinkConf, 
hadoopConf);
       this.payloadProps = StreamerUtil.getPayloadConfig(flinkConf).getProps();
       this.logKeysIterator = scanner.getRecords().keySet().iterator();
+      this.requiredSchema = requiredSchema;
+      this.requiredPos = requiredPos;
       this.emitDelete = emitDelete;
       this.operationPos = operationPos;
-      this.avroProjection = avroProjection;
+      this.recordBuilder = new GenericRecordBuilder(requiredSchema);
       this.rowDataToAvroConverter = 
RowDataToAvroConverters.createConverter(tableRowType);
       this.avroToRowDataConverter = 
AvroToRowDataConverters.createRowConverter(requiredRowType);
-      this.projection = projection;
+      this.projection = RowDataProjection.instance(requiredRowType, 
requiredPos);
       this.instantRange = split.getInstantRange().orElse(null);
     }
 
     @Override
-    public boolean hasNext() {
-      while (!readLogs && this.nested.hasNext()) {
-        currentRecord = this.nested.next();
+    public boolean reachedEnd() throws IOException {
+      while (!readLogs && !this.reader.reachedEnd()) {
+        currentRecord = this.reader.nextRecord();
         if (instantRange != null) {
           boolean isInRange = 
instantRange.isInRange(currentRecord.getString(HOODIE_COMMIT_TIME_COL_POS).toString());
           if (!isInRange) {
@@ -703,19 +703,19 @@ public class MergeOnReadInputFormat
               // deleted
               continue;
             }
-            IndexedRecord avroRecord = avroProjection.isPresent()
-                ? avroProjection.get().apply(mergedAvroRecord.get())
-                : mergedAvroRecord.get();
+            GenericRecord avroRecord = buildAvroRecordBySchema(
+                mergedAvroRecord.get(),
+                requiredSchema,
+                requiredPos,
+                recordBuilder);
             this.currentRecord = (RowData) 
avroToRowDataConverter.convert(avroRecord);
             this.currentRecord.setRowKind(rowKind);
-            return true;
+            return false;
           }
         }
         // project the full record in base with required positions
-        if (projection.isPresent()) {
-          currentRecord = projection.get().project(currentRecord);
-        }
-        return true;
+        currentRecord = projection.project(currentRecord);
+        return false;
       }
       // read the logs
       readLogs = true;
@@ -725,53 +725,49 @@ public class MergeOnReadInputFormat
           Option<IndexedRecord> insertAvroRecord = getInsertValue(curKey);
           if (insertAvroRecord.isPresent()) {
             // the record is a DELETE if insertAvroRecord not present, skipping
-            IndexedRecord avroRecord = avroProjection.isPresent()
-                ? avroProjection.get().apply(insertAvroRecord.get())
-                : insertAvroRecord.get();
+            GenericRecord avroRecord = buildAvroRecordBySchema(
+                insertAvroRecord.get(),
+                requiredSchema,
+                requiredPos,
+                recordBuilder);
             this.currentRecord = (RowData) 
avroToRowDataConverter.convert(avroRecord);
             FormatUtils.setRowKind(this.currentRecord, insertAvroRecord.get(), 
this.operationPos);
-            return true;
+            return false;
           }
         }
       }
-      return false;
+      return true;
     }
 
-    private Option<IndexedRecord> getInsertValue(String curKey) {
+    private Option<IndexedRecord> getInsertValue(String curKey) throws 
IOException {
       final HoodieAvroRecord<?> record = (HoodieAvroRecord) 
scanner.getRecords().get(curKey);
       if (!emitDelete && HoodieOperation.isDelete(record.getOperation())) {
         return Option.empty();
       }
-      try {
-        return record.getData().getInsertValue(tableSchema);
-      } catch (IOException e) {
-        throw new HoodieIOException("Get insert value from payload exception", 
e);
-      }
+      return record.getData().getInsertValue(tableSchema);
     }
 
     @Override
-    public RowData next() {
+    public RowData nextRecord() {
       return currentRecord;
     }
 
     @Override
-    public void close() {
-      if (this.nested != null) {
-        this.nested.close();
+    public void close() throws IOException {
+      if (this.reader != null) {
+        this.reader.close();
       }
       if (this.scanner != null) {
         this.scanner.close();
       }
     }
 
-    private Option<IndexedRecord> mergeRowWithLog(RowData curRow, String 
curKey) {
+    private Option<IndexedRecord> mergeRowWithLog(
+        RowData curRow,
+        String curKey) throws IOException {
       final HoodieAvroRecord<?> record = (HoodieAvroRecord) 
scanner.getRecords().get(curKey);
       GenericRecord historyAvroRecord = (GenericRecord) 
rowDataToAvroConverter.convert(tableSchema, curRow);
-      try {
-        return record.getData().combineAndGetUpdateValue(historyAvroRecord, 
tableSchema, payloadProps);
-      } catch (IOException e) {
-        throw new HoodieIOException("Merge base and delta payloads exception", 
e);
-      }
+      return record.getData().combineAndGetUpdateValue(historyAvroRecord, 
tableSchema, payloadProps);
     }
   }
 
@@ -779,13 +775,12 @@ public class MergeOnReadInputFormat
    * Builder for {@link MergeOnReadInputFormat}.
    */
   public static class Builder {
-    protected Configuration conf;
-    protected MergeOnReadTableState tableState;
-    protected List<DataType> fieldTypes;
-    protected String defaultPartName;
-    protected long limit = -1;
-    protected boolean emitDelete = false;
-    protected InternalSchemaManager internalSchemaManager = 
InternalSchemaManager.DISABLED;
+    private Configuration conf;
+    private MergeOnReadTableState tableState;
+    private List<DataType> fieldTypes;
+    private String defaultPartName;
+    private long limit = -1;
+    private boolean emitDelete = false;
 
     public Builder config(Configuration conf) {
       this.conf = conf;
@@ -817,14 +812,9 @@ public class MergeOnReadInputFormat
       return this;
     }
 
-    public Builder internalSchemaManager(InternalSchemaManager 
internalSchemaManager) {
-      this.internalSchemaManager = internalSchemaManager;
-      return this;
-    }
-
     public MergeOnReadInputFormat build() {
       return new MergeOnReadInputFormat(conf, tableState, fieldTypes,
-          defaultPartName, limit, emitDelete, internalSchemaManager);
+          defaultPartName, limit, emitDelete);
     }
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataCastProjection.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataCastProjection.java
deleted file mode 100644
index 55e85aa1f60..00000000000
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataCastProjection.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.util;
-
-import org.apache.hudi.table.format.CastMap;
-
-import org.apache.flink.table.types.logical.LogicalType;
-
-import javax.annotation.Nullable;
-import java.util.stream.IntStream;
-
-/**
- * This class is responsible to project row as well as {@link RowDataProjection}.
- * In addition, fields are converted according to the CastMap.
- */
-public final class RowDataCastProjection extends RowDataProjection {
-  private static final long serialVersionUID = 1L;
-
-  private final CastMap castMap;
-
-  public RowDataCastProjection(LogicalType[] types, CastMap castMap) {
-    super(types, IntStream.range(0, types.length).toArray());
-    this.castMap = castMap;
-  }
-
-  @Override
-  protected @Nullable Object getVal(int pos, @Nullable Object val) {
-    if (val == null) {
-      return null;
-    }
-    return castMap.castIfNeeded(pos, val);
-  }
-}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java
index 2e3e8a2ed32..8076d982b99 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/RowDataProjection.java
@@ -25,8 +25,6 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
-import javax.annotation.Nullable;
-
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.List;
@@ -39,7 +37,7 @@ public class RowDataProjection implements Serializable {
 
   private final RowData.FieldGetter[] fieldGetters;
 
-  protected RowDataProjection(LogicalType[] types, int[] positions) {
+  private RowDataProjection(LogicalType[] types, int[] positions) {
     ValidationUtils.checkArgument(types.length == positions.length,
         "types and positions should have the equal number");
     this.fieldGetters = new RowData.FieldGetter[types.length];
@@ -88,9 +86,4 @@ public class RowDataProjection implements Serializable {
     }
     return values;
   }
-
-  protected @Nullable
-  Object getVal(int pos, @Nullable Object val) {
-    return val;
-  }
 }
