This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ee485c234ff9 [HUDI-9705] Fix bugs in spark and avro reader contexts for type promotion and field renaming (#13714)
ee485c234ff9 is described below

commit ee485c234ff948311e9cb7defb7adb9a8a512def
Author: Tim Brown <[email protected]>
AuthorDate: Wed Aug 13 16:57:22 2025 -0400

    [HUDI-9705] Fix bugs in spark and avro reader contexts for type promotion and field renaming (#13714)
---
 .../parquet/HoodieParquetFileFormatHelper.scala    | 33 ++++++---
 .../apache/hudi/avro/HoodieAvroReaderContext.java  | 22 +++++-
 .../java/org/apache/hudi/avro/HoodieAvroUtils.java | 32 ++++-----
 .../table/read/FileGroupReaderSchemaHandler.java   | 25 ++++++-
 .../common/table/read/HoodieFileGroupReader.java   |  4 +-
 .../read/ParquetRowIndexBasedSchemaHandler.java    |  8 +--
 .../hudi/io/storage/HoodieAvroFileReader.java      |  8 ++-
 .../io/storage/HoodieNativeAvroHFileReader.java    |  5 +-
 .../hudi/metadata/HoodieBackedTableMetadata.java   |  3 +-
 .../hudi/metadata/HoodieTableMetadataUtil.java     |  2 +-
 .../org/apache/hudi/avro/TestHoodieAvroUtils.java  | 49 ++++++++++++++
 .../common/table/read/SchemaHandlerTestBase.java   | 26 +++++---
 .../read/TestFileGroupReaderSchemaHandler.java     | 78 ++++++++++++++++------
 .../TestParquetRowIndexBasedSchemaHandler.java     | 13 ++--
 .../buffer/TestKeyBasedFileGroupRecordBuffer.java  | 17 ++---
 .../TestSortedKeyBasedFileGroupRecordBuffer.java   |  4 +-
 ...TestStreamingKeyBasedFileGroupRecordBuffer.java | 17 ++---
 .../apache/hudi/io/hadoop/HoodieAvroOrcReader.java |  7 +-
 .../hudi/io/hadoop/HoodieAvroParquetReader.java    | 17 +++--
 .../io/hadoop/HoodieAvroParquetReaderIterator.java |  9 ++-
 .../org/apache/hudi/cdc/CDCFileGroupIterator.scala |  3 +-
 .../TestPositionBasedFileGroupRecordBuffer.java    |  4 +-
 22 files changed, 273 insertions(+), 113 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala
index fa3731c2c57a..e3523b75ce67 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/HoodieParquetFileFormatHelper.scala
@@ -24,7 +24,7 @@ import org.apache.parquet.hadoop.metadata.FileMetaData
 import org.apache.spark.sql.HoodieSchemaUtils
 import 
org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
 import org.apache.spark.sql.catalyst.expressions.{ArrayTransform, Attribute, 
Cast, CreateNamedStruct, CreateStruct, Expression, GetStructField, 
LambdaFunction, Literal, MapEntries, MapFromEntries, NamedLambdaVariable, 
UnsafeProjection}
-import org.apache.spark.sql.types.{ArrayType, DataType, DoubleType, FloatType, 
MapType, StringType, StructField, StructType}
+import org.apache.spark.sql.types.{ArrayType, DataType, DateType, DecimalType, 
DoubleType, FloatType, IntegerType, LongType, MapType, StringType, StructField, 
StructType}
 
 object HoodieParquetFileFormatHelper {
 
@@ -104,18 +104,24 @@ object HoodieParquetFileFormatHelper {
                                requiredSchema: StructType,
                                partitionSchema: StructType,
                                schemaUtils: HoodieSchemaUtils): 
UnsafeProjection = {
-    val floatToDoubleCache = scala.collection.mutable.HashMap.empty[(DataType, 
DataType), Boolean]
+    val addedCastCache = scala.collection.mutable.HashMap.empty[(DataType, 
DataType), Boolean]
 
-    def hasFloatToDoubleConversion(src: DataType, dst: DataType): Boolean = {
-      floatToDoubleCache.getOrElseUpdate((src, dst), {
+    def hasUnsupportedConversion(src: DataType, dst: DataType): Boolean = {
+      addedCastCache.getOrElseUpdate((src, dst), {
         (src, dst) match {
           case (FloatType, DoubleType) => true
+          case (IntegerType, DecimalType()) => true
+          case (LongType, DecimalType()) => true
+          case (FloatType, DecimalType()) => true
+          case (DoubleType, DecimalType()) => true
+          case (StringType, DecimalType()) => true
+          case (StringType, DateType) => true
           case (StructType(srcFields), StructType(dstFields)) =>
-            srcFields.zip(dstFields).exists { case (sf, df) => 
hasFloatToDoubleConversion(sf.dataType, df.dataType) }
+            srcFields.zip(dstFields).exists { case (sf, df) => 
hasUnsupportedConversion(sf.dataType, df.dataType) }
           case (ArrayType(sElem, _), ArrayType(dElem, _)) =>
-            hasFloatToDoubleConversion(sElem, dElem)
+            hasUnsupportedConversion(sElem, dElem)
           case (MapType(sKey, sVal, _), MapType(dKey, dVal, _)) =>
-            hasFloatToDoubleConversion(sKey, dKey) || 
hasFloatToDoubleConversion(sVal, dVal)
+            hasUnsupportedConversion(sKey, dKey) || 
hasUnsupportedConversion(sVal, dVal)
           case _ => false
         }
       })
@@ -127,7 +133,14 @@ object HoodieParquetFileFormatHelper {
         case (FloatType, DoubleType) =>
           val toStr = Cast(expr, StringType, if (needTimeZone) timeZoneId else 
None)
           Cast(toStr, dstType, if (needTimeZone) timeZoneId else None)
-        case (s: StructType, d: StructType) if hasFloatToDoubleConversion(s, 
d) =>
+        case (IntegerType | LongType | FloatType | DoubleType, dec: 
DecimalType) =>
+          val toStr = Cast(expr, StringType, if (needTimeZone) timeZoneId else 
None)
+          Cast(toStr, dec, if (needTimeZone) timeZoneId else None)
+        case (StringType, dec: DecimalType) =>
+          Cast(expr, dec, if (needTimeZone) timeZoneId else None)
+        case (StringType, DateType) =>
+          Cast(expr, DateType, if (needTimeZone) timeZoneId else None)
+        case (s: StructType, d: StructType) if hasUnsupportedConversion(s, d) 
=>
           val structFields = s.fields.zip(d.fields).zipWithIndex.map {
             case ((srcField, dstField), i) =>
               val child = GetStructField(expr, i, Some(dstField.name))
@@ -136,13 +149,13 @@ object HoodieParquetFileFormatHelper {
           CreateNamedStruct(d.fields.zip(structFields).flatMap {
             case (f, c) => Seq(Literal(f.name), c)
           })
-        case (ArrayType(sElementType, containsNull), ArrayType(dElementType, 
_)) if hasFloatToDoubleConversion(sElementType, dElementType) =>
+        case (ArrayType(sElementType, containsNull), ArrayType(dElementType, 
_)) if hasUnsupportedConversion(sElementType, dElementType) =>
           val lambdaVar = NamedLambdaVariable("element", sElementType, 
containsNull)
           val body = recursivelyCastExpressions(lambdaVar, sElementType, 
dElementType)
           val func = LambdaFunction(body, Seq(lambdaVar))
           ArrayTransform(expr, func)
         case (MapType(sKeyType, sValType, vnull), MapType(dKeyType, dValType, 
_))
-          if hasFloatToDoubleConversion(sKeyType, dKeyType) || 
hasFloatToDoubleConversion(sValType, dValType) =>
+          if hasUnsupportedConversion(sKeyType, dKeyType) || 
hasUnsupportedConversion(sValType, dValType) =>
           val kv = NamedLambdaVariable("kv", new StructType()
             .add("key", sKeyType, nullable = false)
             .add("value", sValType, nullable = vnull), nullable = false)
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
index d966e9ffa788..fb17d8302236 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroReaderContext.java
@@ -23,7 +23,9 @@ import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieAvroRecordMerger;
+import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.OverwriteWithLatestMerger;
@@ -63,6 +65,7 @@ import static 
org.apache.hudi.common.util.ValidationUtils.checkState;
  */
 public class HoodieAvroReaderContext extends 
HoodieReaderContext<IndexedRecord> {
   private final Map<StoragePath, HoodieAvroFileReader> reusableFileReaders;
+  private final boolean isMultiFormat;
 
   /**
    * Constructs an instance of the reader context that will read data into 
Avro records.
@@ -124,6 +127,7 @@ public class HoodieAvroReaderContext extends 
HoodieReaderContext<IndexedRecord>
       boolean requiresPayloadRecords) {
     super(storageConfiguration, tableConfig, instantRangeOpt, filterOpt, new 
AvroRecordContext(tableConfig, payloadClassName, requiresPayloadRecords));
     this.reusableFileReaders = reusableFileReaders;
+    this.isMultiFormat = tableConfig.isMultipleBaseFileFormatsEnabled();
   }
 
   @Override
@@ -135,15 +139,27 @@ public class HoodieAvroReaderContext extends 
HoodieReaderContext<IndexedRecord>
       Schema requiredSchema,
       HoodieStorage storage) throws IOException {
     HoodieAvroFileReader reader;
+    boolean isLogFile = FSUtils.isLogFile(filePath);
+    Schema fileOutputSchema;
+    Map<String, String> renamedColumns;
+    if (isLogFile) {
+      fileOutputSchema = requiredSchema;
+      renamedColumns = Collections.emptyMap();
+    } else {
+      Pair<Schema, Map<String, String>> requiredSchemaForFileAndRenamedColumns 
= getSchemaHandler().getRequiredSchemaForFileAndRenamedColumns(filePath);
+      fileOutputSchema = requiredSchemaForFileAndRenamedColumns.getLeft();
+      renamedColumns = requiredSchemaForFileAndRenamedColumns.getRight();
+    }
     if (reusableFileReaders.containsKey(filePath)) {
       reader = reusableFileReaders.get(filePath);
     } else {
+      HoodieFileFormat fileFormat = isMultiFormat && !isLogFile ? 
HoodieFileFormat.fromFileExtension(filePath.getFileExtension()) : 
baseFileFormat;
       reader = (HoodieAvroFileReader) HoodieIOFactory.getIOFactory(storage)
           
.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO).getFileReader(new 
HoodieConfig(),
-              filePath, baseFileFormat, Option.empty());
+              filePath, fileFormat, Option.empty());
     }
     if (keyFilterOpt.isEmpty()) {
-      return reader.getIndexedRecordIterator(dataSchema, requiredSchema);
+      return reader.getIndexedRecordIterator(dataSchema, fileOutputSchema, 
renamedColumns);
     }
     if (reader.supportKeyPredicate()) {
       List<String> keys = reader.extractKeys(keyFilterOpt);
@@ -157,7 +173,7 @@ public class HoodieAvroReaderContext extends 
HoodieReaderContext<IndexedRecord>
         return reader.getIndexedRecordsByKeyPrefixIterator(keyPrefixes, 
requiredSchema);
       }
     }
-    return reader.getIndexedRecordIterator(dataSchema, requiredSchema);
+    return reader.getIndexedRecordIterator(dataSchema, fileOutputSchema, 
renamedColumns);
   }
 
   @Override
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java 
b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
index 0de9d4dd62f2..c74c6a698831 100644
--- a/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/avro/HoodieAvroUtils.java
@@ -1406,8 +1406,15 @@ public class HoodieAvroUtils {
     if (writerSchema.equals(readerSchema)) {
       return false;
     }
+    if (readerSchema.getLogicalType() != null) {
+      // check if logical types are equal
+      return 
!readerSchema.getLogicalType().equals(writerSchema.getLogicalType());
+    }
     switch (readerSchema.getType()) {
       case RECORD:
+        if (readerSchema.getFields().size() > writerSchema.getFields().size()) 
{
+          return true;
+        }
         for (Schema.Field field : readerSchema.getFields()) {
           Schema.Field writerField = writerSchema.getField(field.name());
           if (writerField == null || 
recordNeedsRewriteForExtendedAvroTypePromotion(writerField.schema(), 
field.schema())) {
@@ -1430,13 +1437,13 @@ public class HoodieAvroUtils {
       case ENUM:
         return needsRewriteToString(writerSchema, true);
       case STRING:
-      case BYTES:
         return needsRewriteToString(writerSchema, false);
       case DOUBLE:
-        // To maintain precision, you need to convert Float -> String -> Double
-        return writerSchema.getType().equals(Schema.Type.FLOAT);
+      case FLOAT:
+      case LONG:
+        return !(writerSchema.getType().equals(Schema.Type.INT) || 
writerSchema.getType().equals(Schema.Type.LONG));
       default:
-        return false;
+        return !writerSchema.getType().equals(readerSchema.getType());
     }
   }
 
@@ -1446,18 +1453,13 @@ public class HoodieAvroUtils {
    * string so some intervention is needed
    */
   private static boolean needsRewriteToString(Schema schema, boolean isEnum) {
-    switch (schema.getType()) {
-      case INT:
-      case LONG:
-      case FLOAT:
-      case DOUBLE:
-      case BYTES:
-        return true;
-      case ENUM:
-        return !isEnum;
-      default:
-        return false;
+    if (schema.getLogicalType() != null) {
+      return true;
+    }
+    if (schema.getType() == Schema.Type.ENUM) {
+      return !isEnum;
     }
+    return true;
   }
 
   /**
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
index 727077f4a52c..2404eb91fd0e 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupReaderSchemaHandler.java
@@ -24,16 +24,21 @@ import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import 
org.apache.hudi.common.table.read.buffer.PositionBasedFileGroupRecordBuffer;
+import org.apache.hudi.common.util.InternalSchemaCache;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.common.util.collection.Triple;
 import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
 import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
+import org.apache.hudi.storage.StoragePath;
 
 import org.apache.avro.Schema;
 
@@ -76,22 +81,24 @@ public class FileGroupReaderSchemaHandler<T> {
 
   protected final TypedProperties properties;
   private final DeleteContext deleteContext;
+  private final HoodieTableMetaClient metaClient;
 
   public FileGroupReaderSchemaHandler(HoodieReaderContext<T> readerContext,
                                       Schema tableSchema,
                                       Schema requestedSchema,
                                       Option<InternalSchema> internalSchemaOpt,
-                                      HoodieTableConfig hoodieTableConfig,
-                                      TypedProperties properties) {
+                                      TypedProperties properties,
+                                      HoodieTableMetaClient metaClient) {
     this.properties = properties;
     this.readerContext = readerContext;
     this.tableSchema = tableSchema;
     this.requestedSchema = AvroSchemaCache.intern(requestedSchema);
-    this.hoodieTableConfig = hoodieTableConfig;
+    this.hoodieTableConfig = metaClient.getTableConfig();
     this.deleteContext = new DeleteContext(properties, tableSchema);
     this.requiredSchema = 
AvroSchemaCache.intern(prepareRequiredSchema(this.deleteContext));
     this.internalSchema = pruneInternalSchema(requiredSchema, 
internalSchemaOpt);
     this.internalSchemaOpt = getInternalSchemaOpt(internalSchemaOpt);
+    this.metaClient = metaClient;
   }
 
   public Schema getTableSchema() {
@@ -125,6 +132,18 @@ public class FileGroupReaderSchemaHandler<T> {
     return deleteContext;
   }
 
+  public Pair<Schema, Map<String, String>> 
getRequiredSchemaForFileAndRenamedColumns(StoragePath path) {
+    if (internalSchema.isEmptySchema()) {
+      return Pair.of(requiredSchema, Collections.emptyMap());
+    }
+    long commitInstantTime = 
Long.parseLong(FSUtils.getCommitTime(path.getName()));
+    InternalSchema fileSchema = 
InternalSchemaCache.searchSchemaAndCache(commitInstantTime, metaClient);
+    Pair<InternalSchema, Map<String, String>> mergedInternalSchema = new 
InternalSchemaMerger(fileSchema, internalSchema,
+        true, false, false).mergeSchemaGetRenamed();
+    Schema mergedAvroSchema = 
AvroSchemaCache.intern(AvroInternalSchemaConverter.convert(mergedInternalSchema.getLeft(),
 requiredSchema.getFullName()));
+    return Pair.of(mergedAvroSchema, mergedInternalSchema.getRight());
+  }
+
   private InternalSchema pruneInternalSchema(Schema requiredSchema, 
Option<InternalSchema> internalSchemaOption) {
     if (!internalSchemaOption.isPresent()) {
       return InternalSchema.getEmptyInternalSchema();
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
index 54b8d2f63152..1a7294854f18 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReader.java
@@ -116,8 +116,8 @@ public final class HoodieFileGroupReader<T> implements 
Closeable {
     
readerContext.setShouldMergeUseRecordPosition(readerParameters.useRecordPosition()
 && !isSkipMerge && readerContext.getHasLogFiles() && 
inputSplit.isParquetBaseFile());
     
readerContext.setHasBootstrapBaseFile(inputSplit.getBaseFileOption().flatMap(HoodieBaseFile::getBootstrapBaseFile).isPresent());
     
readerContext.setSchemaHandler(readerContext.getRecordContext().supportsParquetRowIndex()
-        ? new ParquetRowIndexBasedSchemaHandler<>(readerContext, dataSchema, 
requestedSchema, internalSchemaOpt, tableConfig, props)
-        : new FileGroupReaderSchemaHandler<>(readerContext, dataSchema, 
requestedSchema, internalSchemaOpt, tableConfig, props));
+        ? new ParquetRowIndexBasedSchemaHandler<>(readerContext, dataSchema, 
requestedSchema, internalSchemaOpt, props, metaClient)
+        : new FileGroupReaderSchemaHandler<>(readerContext, dataSchema, 
requestedSchema, internalSchemaOpt, props, metaClient));
     this.outputConverter = 
readerContext.getSchemaHandler().getOutputConverter();
     this.orderingFieldNames = 
HoodieRecordUtils.getOrderingFieldNames(readerContext.getMergeMode(), props, 
hoodieTableMetaClient);
     this.readStats = new HoodieReadStats();
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/ParquetRowIndexBasedSchemaHandler.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/ParquetRowIndexBasedSchemaHandler.java
index c365e09cab29..d0be7534884e 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/ParquetRowIndexBasedSchemaHandler.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/ParquetRowIndexBasedSchemaHandler.java
@@ -21,7 +21,7 @@ package org.apache.hudi.common.table.read;
 
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieReaderContext;
-import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.common.util.collection.Pair;
@@ -46,9 +46,9 @@ public class ParquetRowIndexBasedSchemaHandler<T> extends 
FileGroupReaderSchemaH
                                            Schema dataSchema,
                                            Schema requestedSchema,
                                            Option<InternalSchema> 
internalSchemaOpt,
-                                           HoodieTableConfig hoodieTableConfig,
-                                           TypedProperties properties) {
-    super(readerContext, dataSchema, requestedSchema, internalSchemaOpt, 
hoodieTableConfig, properties);
+                                           TypedProperties properties,
+                                           HoodieTableMetaClient metaClient) {
+    super(readerContext, dataSchema, requestedSchema, internalSchemaOpt, 
properties, metaClient);
     if (!readerContext.getRecordContext().supportsParquetRowIndex()) {
       throw new IllegalStateException("Using " + this.getClass().getName() + " 
but context does not support parquet row index");
     }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java
index bb79087e97e0..e84090a54182 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieAvroFileReader.java
@@ -29,7 +29,9 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
 
@@ -48,7 +50,11 @@ public abstract class HoodieAvroFileReader implements 
HoodieFileReader<IndexedRe
     return getIndexedRecordIterator(readerSchema, readerSchema);
   }
 
-  public abstract ClosableIterator<IndexedRecord> 
getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws 
IOException;
+  public ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema 
readerSchema, Schema requestedSchema) throws IOException {
+    return getIndexedRecordIterator(readerSchema, requestedSchema, 
Collections.emptyMap());
+  }
+
+  public abstract ClosableIterator<IndexedRecord> 
getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema, 
Map<String, String> renamedColumns) throws IOException;
 
   public abstract ClosableIterator<IndexedRecord> 
getIndexedRecordsByKeysIterator(List<String> keys,
                                                                                
   Schema readerSchema)
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java
 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java
index a5e339b83da9..48aae71fd0c9 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java
@@ -96,9 +96,10 @@ public class HoodieNativeAvroHFileReader extends 
HoodieAvroHFileReaderImplBase {
 
   @Override
   public ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema 
readerSchema,
-                                                                  Schema 
requestedSchema)
+                                                                  Schema 
requestedSchema,
+                                                                  Map<String, 
String> renamedColumns)
       throws IOException {
-    if (!Objects.equals(readerSchema, requestedSchema)) {
+    if (!Objects.equals(readerSchema, requestedSchema) || 
!renamedColumns.isEmpty()) {
       throw new UnsupportedOperationException(
           "Schema projections are not supported in HFile reader");
     }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index 2f0d76043de3..e8b009e3a109 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -561,7 +561,8 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
     readerContext.initRecordMerger(metadataConfig.getProps());
     readerContext.setHasBootstrapBaseFile(false);
     readerContext.setHasLogFiles(fileSlice.hasLogFiles());
-    readerContext.setSchemaHandler(new 
FileGroupReaderSchemaHandler<>(readerContext, SCHEMA, SCHEMA, Option.empty(), 
metadataMetaClient.getTableConfig(), metadataConfig.getProps()));
+    readerContext.setSchemaHandler(new 
FileGroupReaderSchemaHandler<>(readerContext, SCHEMA, SCHEMA, Option.empty(),
+        metadataConfig.getProps(), metadataMetaClient));
     readerContext.setShouldMergeUseRecordPosition(false);
     readerContext.setLatestCommitTime(latestMetadataInstantTime);
     return FileGroupRecordBufferLoader.createReusable(readerContext);
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 4bfeee6d8704..230eb954507b 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -1044,7 +1044,7 @@ public class HoodieTableMetadataUtil {
       readerContext.setHasLogFiles(true);
       HoodieTableConfig tableConfig = datasetMetaClient.getTableConfig();
       readerContext.initRecordMerger(properties);
-      readerContext.setSchemaHandler(new 
FileGroupReaderSchemaHandler<>(readerContext, writerSchemaOpt.get(), 
writerSchemaOpt.get(), Option.empty(), tableConfig, properties));
+      readerContext.setSchemaHandler(new 
FileGroupReaderSchemaHandler<>(readerContext, writerSchemaOpt.get(), 
writerSchemaOpt.get(), Option.empty(), properties, datasetMetaClient));
       HoodieReadStats readStats = new HoodieReadStats();
       KeyBasedFileGroupRecordBuffer<T> recordBuffer = new 
KeyBasedFileGroupRecordBuffer<>(readerContext, datasetMetaClient,
           readerContext.getMergeMode(), PartialUpdateMode.NONE, properties, 
tableConfig.getPreCombineFields(),
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java 
b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
index f7823cef0cdd..cdb69431927d 100644
--- a/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/avro/TestHoodieAvroUtils.java
@@ -1131,4 +1131,53 @@ public class TestHoodieAvroUtils {
     assertEquals(expectedSchema, projectedSchema);
     assertTrue(AvroSchemaUtils.isSchemaCompatible(projectedSchema, 
expectedSchema, false));
   }
+
+  private static Stream<Arguments> 
recordNeedsRewriteForExtendedAvroTypePromotion() {
+    Schema decimal1 = LogicalTypes.decimal(12, 
2).addToSchema(Schema.create(Schema.Type.BYTES));
+    Schema decimal2 = LogicalTypes.decimal(10, 
2).addToSchema(Schema.create(Schema.Type.BYTES));
+    Schema dateSchema = 
LogicalTypes.date().addToSchema(Schema.create(Schema.Type.INT));
+    Schema doubleSchema = Schema.create(Schema.Type.DOUBLE);
+    Schema intSchema = Schema.create(Schema.Type.INT);
+    Schema longSchema = Schema.create(Schema.Type.LONG);
+    Schema floatSchema = Schema.create(Schema.Type.FLOAT);
+    Schema stringSchema = Schema.create(Schema.Type.STRING);
+
+    Schema recordSchema1 = Schema.createRecord("record1", null, "com.example", 
false,
+        Arrays.asList(new Schema.Field("decimalField", decimal1, null, null),
+            new Schema.Field("doubleField", doubleSchema, null, null)));
+
+    Schema recordSchema2 = Schema.createRecord("record2", null, 
"com.example2", false,
+        Arrays.asList(new Schema.Field("decimalField", decimal1, null, null),
+            new Schema.Field("doubleField", doubleSchema, null, null)));
+
+    return Stream.of(
+        Arguments.of(intSchema, longSchema, false),
+        Arguments.of(intSchema, floatSchema, false),
+        Arguments.of(longSchema, intSchema, true),
+        Arguments.of(longSchema, floatSchema, false),
+        Arguments.of(decimal1, decimal2, true),
+        Arguments.of(doubleSchema, decimal1, true),
+        Arguments.of(decimal1, doubleSchema, true),
+        Arguments.of(intSchema, stringSchema, true),
+        Arguments.of(longSchema, doubleSchema, false),
+        Arguments.of(intSchema, doubleSchema, false),
+        Arguments.of(longSchema, stringSchema, true),
+        Arguments.of(floatSchema, stringSchema, true),
+        Arguments.of(doubleSchema, stringSchema, true),
+        Arguments.of(decimal1, stringSchema, true),
+        Arguments.of(stringSchema, decimal2, true),
+        Arguments.of(stringSchema, intSchema, true),
+        Arguments.of(floatSchema, doubleSchema, true),
+        Arguments.of(doubleSchema, floatSchema, true),
+        Arguments.of(recordSchema1, recordSchema2, false),
+        Arguments.of(dateSchema, stringSchema, true)
+    );
+  }
+
+  @ParameterizedTest
+  @MethodSource
+  void recordNeedsRewriteForExtendedAvroTypePromotion(Schema writerSchema, 
Schema readerSchema, boolean expected) {
+    boolean result = 
HoodieAvroUtils.recordNeedsRewriteForExtendedAvroTypePromotion(writerSchema, 
readerSchema);
+    assertEquals(expected, result);
+  }
 }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/SchemaHandlerTestBase.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/SchemaHandlerTestBase.java
index 85813f59f1bc..bed30d3a2538 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/SchemaHandlerTestBase.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/SchemaHandlerTestBase.java
@@ -26,6 +26,7 @@ import org.apache.hudi.common.engine.RecordContext;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.DefaultJavaTypeConverter;
@@ -38,6 +39,7 @@ import org.apache.hudi.storage.StoragePath;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.params.provider.Arguments;
 
 import java.io.IOException;
@@ -67,6 +69,13 @@ public abstract class SchemaHandlerTestBase {
       .map(Schema.Field::name).filter(f -> 
!f.equals("_hoodie_is_deleted")).toArray(String[]::new));
   protected static final Schema DATA_COLS_ONLY_SCHEMA = 
generateProjectionSchema("begin_lat", "tip_history", "rider");
   protected static final Schema META_COLS_ONLY_SCHEMA = 
generateProjectionSchema("_hoodie_commit_seqno", "_hoodie_record_key");
+  protected final HoodieTableMetaClient metaClient = 
mock(HoodieTableMetaClient.class);
+  protected final HoodieTableConfig hoodieTableConfig = 
mock(HoodieTableConfig.class);
+
+  @BeforeEach
+  void setup() {
+    when(metaClient.getTableConfig()).thenReturn(hoodieTableConfig);
+  }
 
   static Stream<Arguments> testMorParams(boolean supportsParquetRowIndex) {
     Stream.Builder<Arguments> b = Stream.builder();
@@ -89,14 +98,13 @@ public abstract class SchemaHandlerTestBase {
                       boolean supportsParquetRowIndex,
                       boolean hasBuiltInDelete) throws IOException {
     Schema dataSchema = hasBuiltInDelete ? DATA_SCHEMA : DATA_SCHEMA_NO_DELETE;
-    HoodieTableConfig hoodieTableConfig = mock(HoodieTableConfig.class);
     setupMORTable(mergeMode, hasPrecombine, hoodieTableConfig);
     HoodieRecordMerger merger = mockRecordMerger(isProjectionCompatible,
         isProjectionCompatible ? new String[] {"begin_lat", "begin_lon", 
"_hoodie_record_key", "timestamp"} : new String[] {"begin_lat", "begin_lon", 
"timestamp"});
     HoodieReaderContext<String> readerContext = 
createReaderContext(hoodieTableConfig, supportsParquetRowIndex, true, false, 
mergeUseRecordPosition, merger);
     readerContext.setRecordMerger(Option.of(merger));
     Schema requestedSchema = dataSchema;
-    FileGroupReaderSchemaHandler schemaHandler = 
createSchemaHandler(readerContext, dataSchema, requestedSchema, 
hoodieTableConfig, supportsParquetRowIndex);
+    FileGroupReaderSchemaHandler schemaHandler = 
createSchemaHandler(readerContext, dataSchema, requestedSchema, 
supportsParquetRowIndex);
     Schema expectedRequiredFullSchema = supportsParquetRowIndex && 
mergeUseRecordPosition
         ? 
ParquetRowIndexBasedSchemaHandler.addPositionalMergeCol(requestedSchema)
         : requestedSchema;
@@ -105,7 +113,7 @@ public abstract class SchemaHandlerTestBase {
 
     //read subset of columns
     requestedSchema = DATA_COLS_ONLY_SCHEMA;
-    schemaHandler = createSchemaHandler(readerContext, dataSchema, 
requestedSchema, hoodieTableConfig, supportsParquetRowIndex);
+    schemaHandler = createSchemaHandler(readerContext, dataSchema, 
requestedSchema, supportsParquetRowIndex);
     Schema expectedRequiredSchema;
     if (mergeMode == EVENT_TIME_ORDERING && hasPrecombine) {
       expectedRequiredSchema = generateProjectionSchema(hasBuiltInDelete, 
"begin_lat", "tip_history", "rider", "_hoodie_record_key", "timestamp");
@@ -130,13 +138,12 @@ public abstract class SchemaHandlerTestBase {
                                boolean supportsParquetRowIndex,
                                boolean hasBuiltInDelete) throws IOException {
     Schema dataSchema = hasBuiltInDelete ? DATA_SCHEMA : DATA_SCHEMA_NO_DELETE;
-    HoodieTableConfig hoodieTableConfig = mock(HoodieTableConfig.class);
     setupMORTable(mergeMode, hasPrecombine, hoodieTableConfig);
     HoodieRecordMerger merger = mockRecordMerger(isProjectionCompatible, new 
String[] {"begin_lat", "begin_lon", "timestamp"});
     HoodieReaderContext<String> readerContext = 
createReaderContext(hoodieTableConfig, supportsParquetRowIndex, true, true, 
mergeUseRecordPosition, merger);
     readerContext.setRecordMerger(Option.of(merger));
     Schema requestedSchema = dataSchema;
-    FileGroupReaderSchemaHandler schemaHandler = 
createSchemaHandler(readerContext, dataSchema, requestedSchema, 
hoodieTableConfig, supportsParquetRowIndex);
+    FileGroupReaderSchemaHandler schemaHandler = 
createSchemaHandler(readerContext, dataSchema, requestedSchema, 
supportsParquetRowIndex);
     Schema expectedRequiredFullSchema = supportsParquetRowIndex && 
mergeUseRecordPosition
         ? 
ParquetRowIndexBasedSchemaHandler.addPositionalMergeCol(requestedSchema)
         : requestedSchema;
@@ -153,7 +160,7 @@ public abstract class SchemaHandlerTestBase {
 
     //read subset of columns
     requestedSchema = generateProjectionSchema("begin_lat", "tip_history", 
"_hoodie_record_key", "rider");
-    schemaHandler = createSchemaHandler(readerContext, dataSchema, 
requestedSchema, hoodieTableConfig, supportsParquetRowIndex);
+    schemaHandler = createSchemaHandler(readerContext, dataSchema, 
requestedSchema, supportsParquetRowIndex);
     Schema expectedRequiredSchema;
     if (mergeMode == EVENT_TIME_ORDERING && hasPrecombine) {
       expectedRequiredSchema = generateProjectionSchema(hasBuiltInDelete, 
"_hoodie_record_key", "begin_lat", "tip_history", "rider", "timestamp");
@@ -180,7 +187,7 @@ public abstract class SchemaHandlerTestBase {
 
     // request only data cols
     requestedSchema = DATA_COLS_ONLY_SCHEMA;
-    schemaHandler = createSchemaHandler(readerContext, dataSchema, 
requestedSchema, hoodieTableConfig, supportsParquetRowIndex);
+    schemaHandler = createSchemaHandler(readerContext, dataSchema, 
requestedSchema, supportsParquetRowIndex);
     if (mergeMode == EVENT_TIME_ORDERING && hasPrecombine) {
       expectedRequiredSchema = generateProjectionSchema(hasBuiltInDelete, 
"_hoodie_record_key", "begin_lat", "tip_history", "rider", "timestamp");
       assertTrue(readerContext.getNeedsBootstrapMerge());
@@ -215,7 +222,7 @@ public abstract class SchemaHandlerTestBase {
 
     // request only meta cols
     requestedSchema = META_COLS_ONLY_SCHEMA;
-    schemaHandler = createSchemaHandler(readerContext, dataSchema, 
requestedSchema, hoodieTableConfig, supportsParquetRowIndex);
+    schemaHandler = createSchemaHandler(readerContext, dataSchema, 
requestedSchema, supportsParquetRowIndex);
     if (mergeMode == EVENT_TIME_ORDERING && hasPrecombine) {
       expectedRequiredSchema = generateProjectionSchema(hasBuiltInDelete, 
"_hoodie_commit_seqno", "_hoodie_record_key", "timestamp");
       assertTrue(readerContext.getNeedsBootstrapMerge());
@@ -283,8 +290,7 @@ public abstract class SchemaHandlerTestBase {
   }
 
   abstract FileGroupReaderSchemaHandler 
createSchemaHandler(HoodieReaderContext<String> readerContext, Schema 
dataSchema,
-                                                          Schema 
requestedSchema, HoodieTableConfig hoodieTableConfig,
-                                                          boolean 
supportsParquetRowIndex);
+                                                            Schema 
requestedSchema, boolean supportsParquetRowIndex);
 
   static Schema generateProjectionSchema(String... fields) {
     return HoodieAvroUtils.generateProjectionSchema(DATA_SCHEMA, 
Arrays.asList(fields));
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupReaderSchemaHandler.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupReaderSchemaHandler.java
index 39ff90cc05c5..2da196905f92 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupReaderSchemaHandler.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestFileGroupReaderSchemaHandler.java
@@ -22,17 +22,23 @@ package org.apache.hudi.common.table.read;
 import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
-import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.testutils.SchemaTestUtil;
+import org.apache.hudi.common.util.InternalSchemaCache;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.Types;
+import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
+import org.apache.hudi.storage.StoragePath;
 
 import org.apache.avro.Schema;
 import org.junit.jupiter.api.Test;
@@ -40,12 +46,17 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
 
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static 
org.apache.hudi.common.model.DefaultHoodieRecordPayload.DELETE_KEY;
@@ -60,29 +71,27 @@ public class TestFileGroupReaderSchemaHandler extends 
SchemaHandlerTestBase {
 
   @Test
   public void testCow() {
-    HoodieTableConfig hoodieTableConfig = mock(HoodieTableConfig.class);
     when(hoodieTableConfig.populateMetaFields()).thenReturn(true);
     HoodieReaderContext<String> readerContext = 
createReaderContext(hoodieTableConfig, false, false, false, false, null);
     Schema requestedSchema = DATA_SCHEMA;
-    FileGroupReaderSchemaHandler schemaHandler = 
createSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, 
hoodieTableConfig, false);
+    FileGroupReaderSchemaHandler schemaHandler = 
createSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, false);
     assertEquals(requestedSchema, schemaHandler.getRequiredSchema());
 
     //read subset of columns
     requestedSchema = generateProjectionSchema("begin_lat", "tip_history", 
"rider");
-    schemaHandler = createSchemaHandler(readerContext, DATA_SCHEMA, 
requestedSchema, hoodieTableConfig, false);
+    schemaHandler = createSchemaHandler(readerContext, DATA_SCHEMA, 
requestedSchema, false);
     assertEquals(requestedSchema, schemaHandler.getRequiredSchema());
     assertFalse(readerContext.getNeedsBootstrapMerge());
   }
 
   @Test
   public void testCowBootstrap() {
-    HoodieTableConfig hoodieTableConfig = mock(HoodieTableConfig.class);
     when(hoodieTableConfig.populateMetaFields()).thenReturn(true);
     HoodieReaderContext<String> readerContext = 
createReaderContext(hoodieTableConfig, false, false, true, false, null);
     Schema requestedSchema = generateProjectionSchema("begin_lat", 
"tip_history", "_hoodie_record_key", "rider");
 
     //meta cols must go first in the required schema
-    FileGroupReaderSchemaHandler schemaHandler = 
createSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, 
hoodieTableConfig, false);
+    FileGroupReaderSchemaHandler schemaHandler = 
createSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, false);
     assertTrue(readerContext.getNeedsBootstrapMerge());
     Schema expectedRequiredSchema = 
generateProjectionSchema("_hoodie_record_key", "begin_lat", "tip_history", 
"rider");
     assertEquals(expectedRequiredSchema, schemaHandler.getRequiredSchema());
@@ -91,6 +100,34 @@ public class TestFileGroupReaderSchemaHandler extends 
SchemaHandlerTestBase {
     assertEquals(Arrays.asList(getField("begin_lat"), getField("tip_history"), 
getField("rider")), bootstrapFields.getRight());
   }
 
+  @Test
+  void testGetRequiredSchemaForFileAndRenameColumns() {
+    when(hoodieTableConfig.populateMetaFields()).thenReturn(true);
+    HoodieReaderContext<String> readerContext = 
createReaderContext(hoodieTableConfig, false, false, true, false, null);
+    Schema requestedSchema = generateProjectionSchema("_hoodie_record_key", 
"timestamp", "rider");
+
+    InternalSchema internalSchema = 
AvroInternalSchemaConverter.convert(DATA_SCHEMA);
+    InternalSchema originalSchema = new 
InternalSchema(Types.RecordType.get(internalSchema.columns().stream().map(field 
-> {
+      if (field.name().equals("timestamp")) {
+        // rename timestamp to ts in file schema and change type to int, 
output schema names and types must match the requested schema
+        return Types.Field.get(field.fieldId(), "ts", Types.IntType.get());
+      }
+      return field;
+    }).collect(Collectors.toList())));
+    FileGroupReaderSchemaHandler<String> schemaHandler = new 
FileGroupReaderSchemaHandler<>(readerContext, DATA_SCHEMA, requestedSchema,
+        Option.of(internalSchema), new TypedProperties(), metaClient);
+
+    try (MockedStatic<InternalSchemaCache> mockedStatic = 
Mockito.mockStatic(InternalSchemaCache.class)) {
+      String instantTime = "20231010101010";
+      mockedStatic.when(() -> 
InternalSchemaCache.searchSchemaAndCache(Long.parseLong(instantTime), 
metaClient))
+          .thenReturn(originalSchema);
+      StoragePath filePath = new StoragePath("/2023-01-01/" + 
FSUtils.makeBaseFileName(instantTime, "1-0-1", UUID.randomUUID().toString(), 
HoodieFileFormat.PARQUET.getFileExtension()));
+      Pair<Schema, Map<String, String>> requiredSchemaAndRenamedFields = 
schemaHandler.getRequiredSchemaForFileAndRenamedColumns(filePath);
+      assertEquals(Collections.singletonMap("timestamp", "ts"), 
requiredSchemaAndRenamedFields.getRight());
+      assertEquals(requestedSchema, requiredSchemaAndRenamedFields.getLeft());
+    }
+  }
+
   private static Stream<Arguments> testMorParams() {
     return testMorParams(false);
   }
@@ -118,10 +155,10 @@ public class TestFileGroupReaderSchemaHandler extends 
SchemaHandlerTestBase {
   }
 
   @Override
-  FileGroupReaderSchemaHandler createSchemaHandler(HoodieReaderContext<String> 
readerContext, Schema dataSchema, Schema requestedSchema, HoodieTableConfig 
hoodieTableConfig,
+  FileGroupReaderSchemaHandler createSchemaHandler(HoodieReaderContext<String> 
readerContext, Schema dataSchema, Schema requestedSchema,
                                                    boolean 
supportsParquetRowIndex) {
     return new FileGroupReaderSchemaHandler(readerContext, dataSchema, 
requestedSchema,
-        Option.empty(), hoodieTableConfig, new TypedProperties());
+        Option.empty(), new TypedProperties(), metaClient);
   }
 
   @ParameterizedTest
@@ -189,23 +226,22 @@ public class TestFileGroupReaderSchemaHandler extends 
SchemaHandlerTestBase {
     Schema dataSchema = SchemaTestUtil.getSchemaFromFields(dataSchemaFields);
     Schema requestedSchema = 
SchemaTestUtil.getSchemaFromFields(Arrays.asList(HoodieRecord.RECORD_KEY_METADATA_FIELD,
 HoodieRecord.PARTITION_PATH_METADATA_FIELD));
 
-    HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
-    when(tableConfig.getRecordMergeMode()).thenReturn(mergeMode);
-    when(tableConfig.populateMetaFields()).thenReturn(true);
-    
when(tableConfig.getPreCombineFieldsStr()).thenReturn(Option.of(setPrecombine ? 
preCombineField : StringUtils.EMPTY_STRING));
-    when(tableConfig.getPreCombineFields()).thenReturn(setPrecombine ? 
Collections.singletonList(preCombineField) : Collections.emptyList());
-    when(tableConfig.getTableVersion()).thenReturn(tableVersion);
-    if (tableConfig.getTableVersion() == HoodieTableVersion.SIX) {
+    when(hoodieTableConfig.getRecordMergeMode()).thenReturn(mergeMode);
+    when(hoodieTableConfig.populateMetaFields()).thenReturn(true);
+    
when(hoodieTableConfig.getPreCombineFieldsStr()).thenReturn(Option.of(setPrecombine
 ? preCombineField : StringUtils.EMPTY_STRING));
+    when(hoodieTableConfig.getPreCombineFields()).thenReturn(setPrecombine ? 
Collections.singletonList(preCombineField) : Collections.emptyList());
+    when(hoodieTableConfig.getTableVersion()).thenReturn(tableVersion);
+    if (hoodieTableConfig.getTableVersion() == HoodieTableVersion.SIX) {
       if (mergeMode == RecordMergeMode.EVENT_TIME_ORDERING) {
-        
when(tableConfig.getPayloadClass()).thenReturn(DefaultHoodieRecordPayload.class.getName());
+        
when(hoodieTableConfig.getPayloadClass()).thenReturn(DefaultHoodieRecordPayload.class.getName());
       } else if (mergeMode == RecordMergeMode.COMMIT_TIME_ORDERING) {
-        
when(tableConfig.getPayloadClass()).thenReturn(OverwriteWithLatestAvroPayload.class.getName());
+        
when(hoodieTableConfig.getPayloadClass()).thenReturn(OverwriteWithLatestAvroPayload.class.getName());
       } else {
-        
when(tableConfig.getPayloadClass()).thenReturn(OverwriteNonDefaultsWithLatestAvroPayload.class.getName());
+        
when(hoodieTableConfig.getPayloadClass()).thenReturn(OverwriteNonDefaultsWithLatestAvroPayload.class.getName());
       }
     }
     if (mergeMode != null) {
-      when(tableConfig.getRecordMergeStrategyId()).thenReturn(mergeStrategyId);
+      
when(hoodieTableConfig.getRecordMergeStrategyId()).thenReturn(mergeStrategyId);
     }
 
     TypedProperties props = new TypedProperties();
@@ -227,7 +263,7 @@ public class TestFileGroupReaderSchemaHandler extends 
SchemaHandlerTestBase {
       expectedFields.add(HoodieRecord.HOODIE_IS_DELETED_FIELD);
     }
     Schema expectedSchema = ((mergeMode == RecordMergeMode.CUSTOM) && 
!isProjectionCompatible) ? dataSchema : 
SchemaTestUtil.getSchemaFromFields(expectedFields);
-    when(recordMerger.getMandatoryFieldsForMerging(dataSchema, tableConfig, 
props)).thenReturn(expectedFields.toArray(new String[0]));
+    when(recordMerger.getMandatoryFieldsForMerging(dataSchema, 
hoodieTableConfig, props)).thenReturn(expectedFields.toArray(new String[0]));
 
     DeleteContext deleteContext = new DeleteContext(props, dataSchema);
     assertEquals(addHoodieIsDeleted, deleteContext.hasBuiltInDeleteField());
@@ -235,7 +271,7 @@ public class TestFileGroupReaderSchemaHandler extends 
SchemaHandlerTestBase {
             ? Option.of(Pair.of(customDeleteKey, customDeleteValue)) : 
Option.empty(),
         deleteContext.getCustomDeleteMarkerKeyValue());
     FileGroupReaderSchemaHandler fileGroupReaderSchemaHandler = new 
FileGroupReaderSchemaHandler(readerContext,
-        dataSchema, requestedSchema, Option.empty(), tableConfig, props);
+        dataSchema, requestedSchema, Option.empty(), props, metaClient);
     Schema actualSchema = 
fileGroupReaderSchemaHandler.generateRequiredSchema(deleteContext);
     assertEquals(expectedSchema, actualSchema);
   }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestParquetRowIndexBasedSchemaHandler.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestParquetRowIndexBasedSchemaHandler.java
index 22b28c7acf40..865026f1a160 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestParquetRowIndexBasedSchemaHandler.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/TestParquetRowIndexBasedSchemaHandler.java
@@ -22,7 +22,6 @@ package org.apache.hudi.common.table.read;
 import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.HoodieReaderContext;
-import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 
@@ -41,18 +40,16 @@ import static 
org.apache.hudi.common.table.read.ParquetRowIndexBasedSchemaHandle
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 public class TestParquetRowIndexBasedSchemaHandler extends 
SchemaHandlerTestBase {
 
   @Test
   public void testCowBootstrapWithPositionMerge() {
-    HoodieTableConfig hoodieTableConfig = mock(HoodieTableConfig.class);
     when(hoodieTableConfig.populateMetaFields()).thenReturn(true);
     HoodieReaderContext<String> readerContext = 
createReaderContext(hoodieTableConfig, true, false, true, false, null);
     Schema requestedSchema = generateProjectionSchema("begin_lat", 
"tip_history", "_hoodie_record_key", "rider");
-    FileGroupReaderSchemaHandler schemaHandler = 
createSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, 
hoodieTableConfig, true);
+    FileGroupReaderSchemaHandler schemaHandler = 
createSchemaHandler(readerContext, DATA_SCHEMA, requestedSchema, true);
     assertTrue(readerContext.getNeedsBootstrapMerge());
     //meta cols must go first in the required schema
     Schema expectedRequiredSchema = 
generateProjectionSchema("_hoodie_record_key", "begin_lat", "tip_history", 
"rider");
@@ -61,14 +58,14 @@ public class TestParquetRowIndexBasedSchemaHandler extends 
SchemaHandlerTestBase
     assertEquals(Arrays.asList(getField("_hoodie_record_key"), 
getPositionalMergeField()), bootstrapFields.getLeft());
     assertEquals(Arrays.asList(getField("begin_lat"), getField("tip_history"), 
getField("rider"), getPositionalMergeField()), bootstrapFields.getRight());
 
-    schemaHandler = createSchemaHandler(readerContext, DATA_SCHEMA, 
DATA_COLS_ONLY_SCHEMA, hoodieTableConfig, true);
+    schemaHandler = createSchemaHandler(readerContext, DATA_SCHEMA, 
DATA_COLS_ONLY_SCHEMA, true);
     assertFalse(readerContext.getNeedsBootstrapMerge());
     assertEquals(DATA_COLS_ONLY_SCHEMA, schemaHandler.getRequiredSchema());
     bootstrapFields = schemaHandler.getBootstrapRequiredFields();
     assertTrue(bootstrapFields.getLeft().isEmpty());
     assertEquals(Arrays.asList(getField("begin_lat"), getField("tip_history"), 
getField("rider")), bootstrapFields.getRight());
 
-    schemaHandler = createSchemaHandler(readerContext, DATA_SCHEMA, 
META_COLS_ONLY_SCHEMA, hoodieTableConfig, true);
+    schemaHandler = createSchemaHandler(readerContext, DATA_SCHEMA, 
META_COLS_ONLY_SCHEMA, true);
     assertFalse(readerContext.getNeedsBootstrapMerge());
     assertEquals(META_COLS_ONLY_SCHEMA, schemaHandler.getRequiredSchema());
     bootstrapFields = schemaHandler.getBootstrapRequiredFields();
@@ -103,9 +100,9 @@ public class TestParquetRowIndexBasedSchemaHandler extends 
SchemaHandlerTestBase
   }
 
   @Override
-  FileGroupReaderSchemaHandler createSchemaHandler(HoodieReaderContext<String> 
readerContext, Schema dataSchema, Schema requestedSchema, HoodieTableConfig 
hoodieTableConfig,
+  FileGroupReaderSchemaHandler createSchemaHandler(HoodieReaderContext<String> 
readerContext, Schema dataSchema, Schema requestedSchema,
                                                    boolean 
supportsParquetRowIndex) {
     return new ParquetRowIndexBasedSchemaHandler(readerContext, dataSchema, 
requestedSchema,
-        Option.empty(), hoodieTableConfig, new TypedProperties());
+        Option.empty(), new TypedProperties(), metaClient);
   }
 }
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestKeyBasedFileGroupRecordBuffer.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestKeyBasedFileGroupRecordBuffer.java
index 50eaeb454df2..b4011a7205a5 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestKeyBasedFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestKeyBasedFileGroupRecordBuffer.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.model.HoodieAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.PartialUpdateMode;
 import org.apache.hudi.common.table.log.block.HoodieDataBlock;
@@ -153,8 +154,8 @@ class TestKeyBasedFileGroupRecordBuffer extends 
BaseTestFileGroupRecordBuffer {
     readerContext.setHasLogFiles(false);
     readerContext.setHasBootstrapBaseFile(false);
     readerContext.initRecordMerger(properties);
-    FileGroupReaderSchemaHandler schemaHandler = new 
FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(), 
tableConfig,
-        properties);
+    FileGroupReaderSchemaHandler schemaHandler = new 
FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(),
+        properties, mock(HoodieTableMetaClient.class));
     readerContext.setSchemaHandler(schemaHandler);
     List<HoodieRecord> inputRecords = 
convertToHoodieRecordsList(Arrays.asList(testRecord1UpdateWithSameTime, 
testRecord2Update, testRecord3Update, testRecord4EarlierUpdate, testRecord7));
     
inputRecords.addAll(convertToHoodieRecordsListForDeletes(Arrays.asList(testRecord5DeleteByCustomMarker,
 testRecord6DeleteByCustomMarker), false));
@@ -220,8 +221,8 @@ class TestKeyBasedFileGroupRecordBuffer extends 
BaseTestFileGroupRecordBuffer {
     readerContext.setHasLogFiles(false);
     readerContext.setHasBootstrapBaseFile(false);
     readerContext.initRecordMerger(properties);
-    FileGroupReaderSchemaHandler schemaHandler = new 
FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(), 
tableConfig,
-        properties);
+    FileGroupReaderSchemaHandler schemaHandler = new 
FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(),
+        properties, mock(HoodieTableMetaClient.class));
     readerContext.setSchemaHandler(schemaHandler);
     List<HoodieRecord> inputRecords = 
convertToHoodieRecordsList(Arrays.asList(testRecord1UpdateWithSameTime, 
testRecord2Update, testRecord3Update,
         testRecord4EarlierUpdate, testRecord7));
@@ -299,8 +300,8 @@ class TestKeyBasedFileGroupRecordBuffer extends 
BaseTestFileGroupRecordBuffer {
     readerContext.setHasLogFiles(false);
     readerContext.setHasBootstrapBaseFile(false);
     readerContext.initRecordMerger(properties);
-    FileGroupReaderSchemaHandler schemaHandler = new 
FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(), 
tableConfig,
-        properties);
+    FileGroupReaderSchemaHandler schemaHandler = new 
FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(),
+        properties, mock(HoodieTableMetaClient.class));
     readerContext.setSchemaHandler(schemaHandler);
     List<HoodieRecord> inputRecords = 
convertToHoodieRecordsList(Arrays.asList(testRecord1UpdateWithSameTime, 
testRecord2Update, testRecord3Update, testRecord4EarlierUpdate));
     
inputRecords.addAll(convertToHoodieRecordsListForDeletes(Arrays.asList(testRecord5DeleteByCustomMarker,
 testRecord6DeleteByCustomMarker), true));
@@ -374,8 +375,8 @@ class TestKeyBasedFileGroupRecordBuffer extends 
BaseTestFileGroupRecordBuffer {
     readerContext.setHasLogFiles(false);
     readerContext.setHasBootstrapBaseFile(false);
     readerContext.initRecordMerger(properties);
-    FileGroupReaderSchemaHandler schemaHandler = new 
FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(), 
tableConfig,
-        properties);
+    FileGroupReaderSchemaHandler schemaHandler = new 
FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(),
+        properties, mock(HoodieTableMetaClient.class));
     readerContext.setSchemaHandler(schemaHandler);
     List<HoodieRecord> inputRecords = 
convertToHoodieRecordsList(Arrays.asList(testRecord1UpdateWithSameTime, 
testRecord2Update, testRecord3Update, testRecord4EarlierUpdate));
     
inputRecords.addAll(convertToHoodieRecordsListForDeletes(Arrays.asList(testRecord5DeleteByCustomMarker,
 testRecord6DeleteByCustomMarker), true));
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java
index 458979f1dfe0..13e8feba92c0 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestSortedKeyBasedFileGroupRecordBuffer.java
@@ -123,8 +123,8 @@ class TestSortedKeyBasedFileGroupRecordBuffer extends 
BaseTestFileGroupRecordBuf
     HoodieReaderContext<IndexedRecord> readerContext = new 
HoodieAvroReaderContext(storageConfiguration, tableConfig, Option.empty(), 
Option.empty());
     readerContext.setHasLogFiles(false);
     readerContext.setHasBootstrapBaseFile(false);
-    FileGroupReaderSchemaHandler schemaHandler = new 
FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(), 
tableConfig,
-        properties);
+    FileGroupReaderSchemaHandler schemaHandler = new 
FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(),
+        properties, mock(HoodieTableMetaClient.class));
     readerContext.setSchemaHandler(schemaHandler);
     readerContext.initRecordMerger(properties);
     List<HoodieRecord> inputRecords =
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestStreamingKeyBasedFileGroupRecordBuffer.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestStreamingKeyBasedFileGroupRecordBuffer.java
index baf1140cfee2..8c7e5d65a530 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestStreamingKeyBasedFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/table/read/buffer/TestStreamingKeyBasedFileGroupRecordBuffer.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.PartialUpdateMode;
 import org.apache.hudi.common.table.read.FileGroupReaderSchemaHandler;
@@ -83,8 +84,8 @@ class TestStreamingKeyBasedFileGroupRecordBuffer extends 
BaseTestFileGroupRecord
     readerContext.setHasLogFiles(false);
     readerContext.setHasBootstrapBaseFile(false);
     readerContext.initRecordMerger(properties);
-    FileGroupReaderSchemaHandler schemaHandler = new 
FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(), 
tableConfig,
-        properties);
+    FileGroupReaderSchemaHandler schemaHandler = new 
FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(),
+        properties, mock(HoodieTableMetaClient.class));
     readerContext.setSchemaHandler(schemaHandler);
     List<HoodieRecord> inputRecords = 
convertToHoodieRecordsList(Arrays.asList(testRecord1UpdateWithSameTime, 
testRecord2Update, testRecord3Update, testRecord4EarlierUpdate, testRecord7));
     
inputRecords.addAll(convertToHoodieRecordsListForDeletes(Arrays.asList(testRecord5DeleteByCustomMarker,
 testRecord6DeleteByCustomMarker), false));
@@ -120,8 +121,8 @@ class TestStreamingKeyBasedFileGroupRecordBuffer extends 
BaseTestFileGroupRecord
     readerContext.setHasLogFiles(false);
     readerContext.setHasBootstrapBaseFile(false);
     readerContext.initRecordMerger(properties);
-    FileGroupReaderSchemaHandler schemaHandler = new 
FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(), 
tableConfig,
-        properties);
+    FileGroupReaderSchemaHandler schemaHandler = new 
FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(),
+        properties, mock(HoodieTableMetaClient.class));
     readerContext.setSchemaHandler(schemaHandler);
     List<HoodieRecord> inputRecords = 
convertToHoodieRecordsList(Arrays.asList(testRecord1UpdateWithSameTime, 
testRecord2Update, testRecord3Update, testRecord4EarlierUpdate, testRecord7));
     
inputRecords.addAll(convertToHoodieRecordsListForDeletes(Arrays.asList(testRecord5DeleteByCustomMarker,
 testRecord6DeleteByCustomMarker), true));
@@ -159,8 +160,8 @@ class TestStreamingKeyBasedFileGroupRecordBuffer extends 
BaseTestFileGroupRecord
     readerContext.setHasLogFiles(false);
     readerContext.setHasBootstrapBaseFile(false);
     readerContext.initRecordMerger(properties);
-    FileGroupReaderSchemaHandler schemaHandler = new 
FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(), 
tableConfig,
-        properties);
+    FileGroupReaderSchemaHandler schemaHandler = new 
FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(),
+        properties, mock(HoodieTableMetaClient.class));
     readerContext.setSchemaHandler(schemaHandler);
     List<HoodieRecord> inputRecords = 
convertToHoodieRecordsList(Arrays.asList(testRecord1UpdateWithSameTime, 
testRecord2Update, testRecord3Update, testRecord4EarlierUpdate));
     
inputRecords.addAll(convertToHoodieRecordsListForDeletes(Arrays.asList(testRecord5DeleteByCustomMarker,
 testRecord6DeleteByCustomMarker), true));
@@ -197,8 +198,8 @@ class TestStreamingKeyBasedFileGroupRecordBuffer extends BaseTestFileGroupRecord
     readerContext.setHasLogFiles(false);
     readerContext.setHasBootstrapBaseFile(false);
     readerContext.initRecordMerger(properties);
-    FileGroupReaderSchemaHandler schemaHandler = new FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(), tableConfig,
-        properties);
+    FileGroupReaderSchemaHandler schemaHandler = new FileGroupReaderSchemaHandler(readerContext, SCHEMA, SCHEMA, Option.empty(),
+        properties, mock(HoodieTableMetaClient.class));
     readerContext.setSchemaHandler(schemaHandler);
     List<HoodieRecord> inputRecords = convertToHoodieRecordsList(Arrays.asList(testRecord1UpdateWithSameTime, testRecord2Update, testRecord3Update, testRecord4EarlierUpdate));
     inputRecords.addAll(convertToHoodieRecordsListForDeletes(Arrays.asList(testRecord5DeleteByCustomMarker, testRecord6DeleteByCustomMarker), true));
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java
index c7ddd16cb071..e805e22e0bff 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroOrcReader.java
@@ -47,6 +47,7 @@ import org.apache.orc.TypeDescription;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -82,7 +83,7 @@ public class HoodieAvroOrcReader extends HoodieAvroFileReader {
   }
 
   @Override
-  public ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) {
+  public ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema, Map<String, String> renamedColumns) {
     if (!Objects.equals(readerSchema, requestedSchema)) {
       throw new UnsupportedOperationException("Schema projections are not supported in HFile reader");
     }
@@ -98,10 +99,10 @@ public class HoodieAvroOrcReader extends HoodieAvroFileReader {
       TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(prunedFileSchema);
       RecordReader recordReader = reader.rows(new Options(hadoopConf).schema(orcSchema));
       ClosableIterator<IndexedRecord> recordIterator = new OrcReaderIterator<>(recordReader, prunedFileSchema, orcSchema);
-      if (readerSchema.equals(fileSchema)) {
+      if (renamedColumns.isEmpty() && readerSchema.equals(fileSchema)) {
         return recordIterator;
       } else {
-        return new CloseableMappingIterator<>(recordIterator, data -> HoodieAvroUtils.rewriteRecordWithNewSchema(data, requestedSchema));
+        return new CloseableMappingIterator<>(recordIterator, data -> HoodieAvroUtils.rewriteRecordWithNewSchema(data, requestedSchema, renamedColumns));
       }
     } catch (IOException io) {
       throw new HoodieIOException("Unable to create an ORC reader.", io);
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
index 88ff9273196c..588100337397 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReader.java
@@ -49,7 +49,9 @@ import org.apache.parquet.hadoop.ParquetReader;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 import static org.apache.hudi.common.util.TypeUtils.unsafeCast;
@@ -100,12 +102,17 @@ public class HoodieAvroParquetReader extends HoodieAvroFileReader {
 
   @Override
   protected ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema schema) throws IOException {
-    return getIndexedRecordIteratorInternal(schema);
+    return getIndexedRecordIteratorInternal(schema, Collections.emptyMap());
   }
 
   @Override
   public ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema) throws IOException {
-    return getIndexedRecordIteratorInternal(requestedSchema);
+    return getIndexedRecordIteratorInternal(requestedSchema, Collections.emptyMap());
+  }
+
+  @Override
+  public ClosableIterator<IndexedRecord> getIndexedRecordIterator(Schema readerSchema, Schema requestedSchema, Map<String, String> renamedColumns) throws IOException {
+    return getIndexedRecordIteratorInternal(requestedSchema, renamedColumns);
   }
 
   @Override
   @Override
@@ -166,13 +173,13 @@ public class HoodieAvroParquetReader extends HoodieAvroFileReader {
     return conf;
   }
 
-  private ClosableIterator<IndexedRecord> getIndexedRecordIteratorInternal(Schema schema) throws IOException {
+  private ClosableIterator<IndexedRecord> getIndexedRecordIteratorInternal(Schema schema, Map<String, String> renamedColumns) throws IOException {
     // NOTE: We have to set both Avro read-schema and projection schema to make
     //       sure that in case the file-schema is not equal to read-schema we'd still
     //       be able to read that file (in case projection is a proper one)
     Configuration hadoopConf = storage.getConf().unwrapCopyAs(Configuration.class);
     Option<Schema> promotedSchema = Option.empty();
-    if (HoodieAvroUtils.recordNeedsRewriteForExtendedAvroTypePromotion(getSchema(), schema)) {
+    if (!renamedColumns.isEmpty() || HoodieAvroUtils.recordNeedsRewriteForExtendedAvroTypePromotion(getSchema(), schema)) {
       AvroReadSupport.setAvroReadSchema(hadoopConf, getSchema());
       AvroReadSupport.setRequestedProjection(hadoopConf, getSchema());
       promotedSchema = Option.of(schema);
@@ -186,7 +193,7 @@ public class HoodieAvroParquetReader extends HoodieAvroFileReader {
             .set(ParquetInputFormat.STRICT_TYPE_CHECKING, hadoopConf.get(ParquetInputFormat.STRICT_TYPE_CHECKING))
             .build();
     ParquetReaderIterator<IndexedRecord> parquetReaderIterator = promotedSchema.isPresent()
-        ? new HoodieAvroParquetReaderIterator(reader, promotedSchema.get())
+        ? new HoodieAvroParquetReaderIterator(reader, promotedSchema.get(), renamedColumns)
         : new ParquetReaderIterator<>(reader);
     readerIterators.add(parquetReaderIterator);
     return parquetReaderIterator;
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReaderIterator.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReaderIterator.java
index 2723f4d1900f..c0eaf16d08b5 100644
--- a/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReaderIterator.java
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/io/hadoop/HoodieAvroParquetReaderIterator.java
@@ -26,15 +26,20 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.parquet.hadoop.ParquetReader;
 
+import java.util.Map;
+
 public class HoodieAvroParquetReaderIterator extends ParquetReaderIterator<IndexedRecord> {
   private final Schema promotedSchema;
-  public HoodieAvroParquetReaderIterator(ParquetReader<IndexedRecord> parquetReader, Schema promotedSchema) {
+  private final Map<String, String> renamedColumns;
+
+  public HoodieAvroParquetReaderIterator(ParquetReader<IndexedRecord> parquetReader, Schema promotedSchema, Map<String, String> renamedColumns) {
     super(parquetReader);
     this.promotedSchema = promotedSchema;
+    this.renamedColumns = renamedColumns;
   }
 
   @Override
   public IndexedRecord next() {
-    return HoodieAvroUtils.rewriteRecordWithNewSchema(super.next(), this.promotedSchema);
+    return HoodieAvroUtils.rewriteRecordWithNewSchema(super.next(), promotedSchema, renamedColumns);
   }
 }
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
index 18cfbf05fe9d..a9ce1a816302 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
@@ -508,8 +508,7 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
     readerContext.setHasBootstrapBaseFile(false)
     readerContext.setHasLogFiles(true)
     readerContext.setSchemaHandler(
-      new FileGroupReaderSchemaHandler[InternalRow](readerContext, avroSchema, avroSchema,
-        Option.empty(), metaClient.getTableConfig, readerProperties))
+      new FileGroupReaderSchemaHandler[InternalRow](readerContext, avroSchema, avroSchema, Option.empty(), readerProperties, metaClient))
     val stats = new HoodieReadStats
     keyBasedFileGroupRecordBuffer.ifPresent(k => k.close())
     keyBasedFileGroupRecordBuffer = Option.of(new KeyBasedFileGroupRecordBuffer[InternalRow](readerContext, metaClient, readerContext.getMergeMode,
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
index 7b2d825b55db..44c13eed32b9 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/TestPositionBasedFileGroupRecordBuffer.java
@@ -139,8 +139,8 @@ public class TestPositionBasedFileGroupRecordBuffer extends SparkClientFunctiona
       ctx.setRecordMerger(Option.empty());
     }
     ctx.setSchemaHandler(HoodieSparkUtils.gteqSpark3_5()
-        ? new ParquetRowIndexBasedSchemaHandler<>(ctx, avroSchema, avroSchema, Option.empty(), metaClient.getTableConfig(), new TypedProperties())
-        : new FileGroupReaderSchemaHandler<>(ctx, avroSchema, avroSchema, Option.empty(), metaClient.getTableConfig(), new TypedProperties()));
+        ? new ParquetRowIndexBasedSchemaHandler<>(ctx, avroSchema, avroSchema, Option.empty(), new TypedProperties(), metaClient)
+        : new FileGroupReaderSchemaHandler<>(ctx, avroSchema, avroSchema, Option.empty(), new TypedProperties(), metaClient));
     TypedProperties props = new TypedProperties();
     props.put("hoodie.write.record.merge.mode", mergeMode.name());
     props.setProperty(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(),String.valueOf(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.defaultValue()));

Reply via email to