This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 944f8e4b230 [HUDI-8552] Adding support to read input for clustering 
based on new FG reader  (#12426)
944f8e4b230 is described below

commit 944f8e4b230685e507ae3fa276d762c65c5f48d2
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Thu Dec 5 04:33:22 2024 -0800

    [HUDI-8552] Adding support to read input for clustering based on new FG 
reader  (#12426)
    
    * Adding new FG reader support for reading inputs for clustering
    
    Cosmetic changes for MultipleSparkJobExecutionStrategy
    
    Fixing spark parquet reader SQLConf parsing issue
    
    fixing spillable map to contain schema Id instead of schema
    
    Add schema encode/decode for record metadata schema
    
    renaming INTERNAL_META_SCHEMA to INTERNAL_META_SCHEMA_ID
    
    Fixing compilation issues after rebase
    
    Fixing schema handling with delete records
    
    Fixing failing test
    
    Fixing sql config parsing issue
    
    * remove type check
    
    ---------
    
    Co-authored-by: Sagar Sumit <[email protected]>
---
 .../MultipleSparkJobExecutionStrategy.java         | 137 +++++++++++++++++++--
 .../apache/hudi/table/SparkBroadcastManager.java   |  16 ++-
 .../hudi/BaseSparkInternalRowReaderContext.java    |   2 +-
 .../hudi/common/engine/HoodieReaderContext.java    |   6 +-
 .../read/HoodieBaseFileGroupRecordBuffer.java      |  33 ++---
 .../read/HoodieFileGroupReaderSchemaHandler.java   |  29 ++++-
 .../apache/hudi/common/util/AvroSchemaCache.java   |  72 +++++++++++
 .../hudi/hadoop/HiveHoodieReaderContext.java       |   2 +-
 .../parquet/SparkParquetReaderBase.scala           |  16 ++-
 .../apache/spark/sql/hudi/ddl/TestSpark3DDL.scala  |  11 +-
 .../hudi/dml/TestPartialUpdateForMergeInto.scala   |  72 ++++++++---
 11 files changed, 346 insertions(+), 50 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 57fc02960c2..0c28a9736fa 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.client.clustering.run.strategy;
 
+import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.SparkAdapterSupport$;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieClusteringGroup;
@@ -27,20 +28,30 @@ import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.client.utils.ConcatenatingIterator;
 import org.apache.hudi.common.config.HoodieMemoryConfig;
+import org.apache.hudi.common.config.HoodieReaderConfig;
+import org.apache.hudi.common.config.SerializableSchema;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.engine.HoodieReaderContext;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.ClusteringOperation;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.log.HoodieFileSliceReader;
 import org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
+import org.apache.hudi.common.table.read.HoodieFileGroupReader;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.common.util.CustomizedThreadFactory;
 import org.apache.hudi.common.util.FutureUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.common.util.collection.CloseableMappingIterator;
 import org.apache.hudi.common.util.collection.Pair;
@@ -48,12 +59,15 @@ import org.apache.hudi.config.HoodieClusteringConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieClusteringException;
+import org.apache.hudi.exception.HoodieException;
 import 
org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerFactory;
 import 
org.apache.hudi.execution.bulkinsert.BulkInsertInternalPartitionerWithRowsFactory;
 import org.apache.hudi.execution.bulkinsert.RDDCustomColumnsSortPartitioner;
 import org.apache.hudi.execution.bulkinsert.RDDSpatialCurveSortPartitioner;
 import org.apache.hudi.execution.bulkinsert.RowCustomColumnsSortPartitioner;
 import org.apache.hudi.execution.bulkinsert.RowSpatialCurveSortPartitioner;
+import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.internal.schema.utils.SerDeHelper;
 import org.apache.hudi.io.IOUtils;
 import org.apache.hudi.io.storage.HoodieFileReader;
 import org.apache.hudi.keygen.BaseKeyGenerator;
@@ -61,9 +75,11 @@ import 
org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
 import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
 import org.apache.hudi.table.BulkInsertPartitioner;
 import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.SparkBroadcastManager;
 import org.apache.hudi.table.action.HoodieWriteMetadata;
 import 
org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
 
@@ -71,11 +87,16 @@ import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.rdd.RDD;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.HoodieDataTypeUtils;
+import org.apache.spark.sql.HoodieUnsafeUtils;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.BaseRelation;
+import org.apache.spark.sql.types.StructType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -94,6 +115,7 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static 
org.apache.hudi.client.utils.SparkPartitionUtils.getPartitionFieldVals;
+import static 
org.apache.hudi.common.config.HoodieReaderConfig.MERGE_USE_RECORD_POSITIONS;
 import static 
org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
 import static 
org.apache.hudi.io.storage.HoodieSparkIOFactory.getHoodieSparkIOFactory;
 
@@ -263,14 +285,21 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
                                                                                
      String instantTime,
                                                                                
      ExecutorService clusteringExecutorService) {
     return CompletableFuture.supplyAsync(() -> {
-      JavaSparkContext jsc = 
HoodieSparkEngineContext.getSparkContext(getEngineContext());
-      Dataset<Row> inputRecords = readRecordsForGroupAsRow(jsc, 
clusteringGroup, instantTime);
-      Schema readerSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(getWriteConfig().getSchema()));
+      JavaSparkContext jsc = 
HoodieSparkEngineContext.getSparkContext(getEngineContext());      // in case of 
MIT, config.getSchema may not contain the full table schema
+      Schema tableSchemaWithMetaFields = null;
+      try {
+        tableSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(new 
TableSchemaResolver(getHoodieTable().getMetaClient()).getTableAvroSchema(false),
+            getWriteConfig().allowOperationMetadataField());
+      } catch (Exception e) {
+        throw new HoodieException("Failed to get table schema during 
clustering", e);
+      }
+      Dataset<Row> inputRecords = readRecordsForGroupAsRow(jsc, 
clusteringGroup, instantTime, tableSchemaWithMetaFields);
+
       List<HoodieFileGroupId> inputFileIds = 
clusteringGroup.getSlices().stream()
           .map(info -> new HoodieFileGroupId(info.getPartitionPath(), 
info.getFileId()))
           .collect(Collectors.toList());
-      return performClusteringWithRecordsAsRow(inputRecords, 
clusteringGroup.getNumOutputFileGroups(), instantTime, strategyParams, 
readerSchema, inputFileIds, shouldPreserveHoodieMetadata,
-          clusteringGroup.getExtraMetadata());
+      return performClusteringWithRecordsAsRow(inputRecords, 
clusteringGroup.getNumOutputFileGroups(), instantTime, strategyParams,
+          tableSchemaWithMetaFields, inputFileIds, 
shouldPreserveHoodieMetadata, clusteringGroup.getExtraMetadata());
     }, clusteringExecutorService);
   }
 
@@ -420,10 +449,27 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
    */
   private Dataset<Row> readRecordsForGroupAsRow(JavaSparkContext jsc,
                                                 HoodieClusteringGroup 
clusteringGroup,
-                                                String instantTime) {
+                                                String instantTime,
+                                                Schema 
tableSchemaWithMetaFields) {
     List<ClusteringOperation> clusteringOps = 
clusteringGroup.getSlices().stream()
         .map(ClusteringOperation::create).collect(Collectors.toList());
-    boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> 
op.getDeltaFilePaths().size() > 0);
+    boolean canUseFileGroupReaderBasedClustering = 
getWriteConfig().getBooleanOrDefault(HoodieReaderConfig.FILE_GROUP_READER_ENABLED)
+        && 
getWriteConfig().getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS)
+        && clusteringOps.stream().allMatch(slice -> 
StringUtils.isNullOrEmpty(slice.getBootstrapFilePath()))
+        && StringUtils.isNullOrEmpty(getWriteConfig().getInternalSchema());
+
+    if (canUseFileGroupReaderBasedClustering) {
+      return readRecordsForGroupAsRowWithFileGroupReader(jsc, instantTime, 
tableSchemaWithMetaFields, clusteringOps);
+    } else {
+      return readRecordsForGroupAsRow(jsc, clusteringOps);
+    }
+  }
+
+  /**
+   * Get dataset of all records for the group. This includes all records from 
file slice (Apply updates from log files, if any).
+   */
+  private Dataset<Row> readRecordsForGroupAsRow(JavaSparkContext jsc, 
List<ClusteringOperation> clusteringOps) {
+    boolean hasLogFiles = clusteringOps.stream().anyMatch(op -> 
!op.getDeltaFilePaths().isEmpty());
     SQLContext sqlContext = new SQLContext(jsc.sc());
 
     StoragePath[] baseFilePaths = clusteringOps
@@ -480,6 +526,83 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
     return sqlContext.baseRelationToDataFrame(relation);
   }
 
+  private Dataset<Row> 
readRecordsForGroupAsRowWithFileGroupReader(JavaSparkContext jsc,
+                                                                   String 
instantTime,
+                                                                   Schema 
tableSchemaWithMetaFields,
+                                                                   
List<ClusteringOperation> clusteringOps) {
+    String basePath = getWriteConfig().getBasePath();
+    // construct supporting cast that executors might need
+    final boolean usePosition = 
getWriteConfig().getBooleanOrDefault(MERGE_USE_RECORD_POSITIONS);
+    String internalSchemaStr = getWriteConfig().getInternalSchema();
+    boolean isInternalSchemaPresent = 
!StringUtils.isNullOrEmpty(internalSchemaStr);
+    SerializableSchema serializableTableSchemaWithMetaFields = new 
SerializableSchema(tableSchemaWithMetaFields);
+
+    // broadcast reader context.
+    SparkBroadcastManager broadcastManager = new 
SparkBroadcastManager(getEngineContext());
+    broadcastManager.prepareAndBroadcast();
+    StructType sparkSchemaWithMetaFields = 
AvroConversionUtils.convertAvroSchemaToStructType(tableSchemaWithMetaFields);
+
+    RDD<InternalRow> internalRowRDD = jsc.parallelize(clusteringOps, 
clusteringOps.size()).flatMap(new FlatMapFunction<ClusteringOperation, 
InternalRow>() {
+      @Override
+      public Iterator<InternalRow> call(ClusteringOperation 
clusteringOperation) throws Exception {
+        FileSlice fileSlice = clusteringOperation2FileSlice(basePath, 
clusteringOperation);
+        // instantiate other supporting cast
+        Schema readerSchema = serializableTableSchemaWithMetaFields.get();
+        Option<InternalSchema> internalSchemaOption = Option.empty();
+        if (isInternalSchemaPresent) {
+          internalSchemaOption = SerDeHelper.fromJson(internalSchemaStr);
+        }
+        Option<HoodieReaderContext> readerContextOpt = 
broadcastManager.retrieveFileGroupReaderContext(new StoragePath(basePath));
+        Configuration conf = broadcastManager.retrieveStorageConfig().get();
+
+        // instantiate FG reader
+        HoodieFileGroupReader<T> fileGroupReader = new HoodieFileGroupReader<>(
+            readerContextOpt.get(),
+            getHoodieTable().getMetaClient().getStorage().newInstance(new 
StoragePath(basePath), new HadoopStorageConfiguration(conf)),
+            basePath,
+            instantTime,
+            fileSlice,
+            readerSchema,
+            readerSchema,
+            internalSchemaOption,
+            getHoodieTable().getMetaClient(),
+            getHoodieTable().getMetaClient().getTableConfig().getProps(),
+            0,
+            Long.MAX_VALUE,
+            usePosition);
+        fileGroupReader.initRecordIterators();
+        // read records from the FG reader
+        HoodieFileGroupReader.HoodieFileGroupReaderIterator<InternalRow> 
recordIterator
+            = 
(HoodieFileGroupReader.HoodieFileGroupReaderIterator<InternalRow>) 
fileGroupReader.getClosableIterator();
+        return recordIterator;
+      }
+    }).rdd();
+
+    return 
HoodieUnsafeUtils.createDataFrameFromRDD(((HoodieSparkEngineContext) 
getEngineContext()).getSqlContext().sparkSession(),
+        internalRowRDD, sparkSchemaWithMetaFields);
+  }
+
+  /**
+   * Construct FileSlice from a given clustering operation {@code 
clusteringOperation}.
+   */
+  private FileSlice clusteringOperation2FileSlice(String basePath, 
ClusteringOperation clusteringOperation) {
+    String partitionPath = clusteringOperation.getPartitionPath();
+    boolean baseFileExists = 
!StringUtils.isNullOrEmpty(clusteringOperation.getDataFilePath());
+    HoodieBaseFile baseFile = baseFileExists ? new HoodieBaseFile(new 
StoragePath(basePath, clusteringOperation.getDataFilePath()).toString()) : null;
+    List<HoodieLogFile> logFiles = 
clusteringOperation.getDeltaFilePaths().stream().map(p ->
+            new HoodieLogFile(new StoragePath(FSUtils.constructAbsolutePath(
+                basePath, partitionPath), p)))
+        .sorted(new HoodieLogFile.LogFileComparator())
+        .collect(Collectors.toList());
+
+    ValidationUtils.checkState(baseFileExists || !logFiles.isEmpty(), "Both 
base file and log files are missing from this clustering operation " + 
clusteringOperation);
+    String baseInstantTime = baseFileExists ? baseFile.getCommitTime() : 
logFiles.get(0).getDeltaCommitTime();
+    FileSlice fileSlice = new FileSlice(partitionPath, baseInstantTime, 
clusteringOperation.getFileId());
+    fileSlice.setBaseFile(baseFile);
+    logFiles.forEach(fileSlice::addLogFile);
+    return fileSlice;
+  }
+
   /**
    * Stream to array conversion with generic type is not straightforward.
    * Implement a utility method to abstract high level logic. This needs to be 
improved in future
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/SparkBroadcastManager.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/SparkBroadcastManager.java
index 19b2b0aa241..337a1c6cff6 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/SparkBroadcastManager.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/SparkBroadcastManager.java
@@ -19,6 +19,7 @@
 
 package org.apache.hudi.table;
 
+import org.apache.hudi.HoodieSparkUtils;
 import org.apache.hudi.SparkAdapterSupport$;
 import org.apache.hudi.SparkFileFormatInternalRowReaderContext;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -81,7 +82,20 @@ public class SparkBroadcastManager extends 
EngineBroadcastManager {
 
     // Do broadcast.
     sqlConfBroadcast = jsc.broadcast(sqlConf);
-    configurationBroadcast = jsc.broadcast(new 
SerializableConfiguration(jsc.hadoopConfiguration()));
+    // new Configuration() is critical so that we don't run into 
ConcurrentModificationException
+    Configuration hadoopConf = new Configuration(jsc.hadoopConfiguration());
+    hadoopConf.setBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED().key(), 
false);
+    hadoopConf.setBoolean(SQLConf.CASE_SENSITIVE().key(), false);
+    // Sets flags for `ParquetToSparkSchemaConverter`
+    hadoopConf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING().key(), false);
+    hadoopConf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP().key(), true);
+    // Using string value of this conf to preserve compatibility across spark 
versions.
+    hadoopConf.setBoolean("spark.sql.legacy.parquet.nanosAsLong", false);
+    if (HoodieSparkUtils.gteqSpark3_4()) {
+      // PARQUET_INFER_TIMESTAMP_NTZ_ENABLED is required from Spark 3.4.0 or 
above
+      hadoopConf.setBoolean("spark.sql.parquet.inferTimestampNTZ.enabled", 
false);
+    }
+    configurationBroadcast = jsc.broadcast(new 
SerializableConfiguration(hadoopConf));
     // Spark parquet reader has to be instantiated on the driver and broadcast 
to the executors
     parquetReaderOpt = 
Option.of(SparkAdapterSupport$.MODULE$.sparkAdapter().createParquetFileReader(
         false, sqlConfBroadcast.getValue(), options, 
configurationBroadcast.getValue().value()));
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
index 6df203d3718..a195f161e1d 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
@@ -96,7 +96,7 @@ public abstract class BaseSparkInternalRowReaderContext 
extends HoodieReaderCont
           HoodieRecord.HoodieRecordType.SPARK);
     }
 
-    Schema schema = (Schema) metadataMap.get(INTERNAL_META_SCHEMA);
+    Schema schema = 
getSchemaHandler().decodeAvroSchema(metadataMap.get(INTERNAL_META_SCHEMA_ID));
     InternalRow row = rowOption.get();
     return new HoodieSparkRecord(row, 
HoodieInternalRowUtils.getCachedSchema(schema));
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
index a0d779cd40e..9a49acaf671 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/engine/HoodieReaderContext.java
@@ -142,7 +142,7 @@ public abstract class HoodieReaderContext<T> {
   public static final String INTERNAL_META_ORDERING_FIELD = "_2";
   public static final String INTERNAL_META_OPERATION = "_3";
   public static final String INTERNAL_META_INSTANT_TIME = "_4";
-  public static final String INTERNAL_META_SCHEMA = "_5";
+  public static final String INTERNAL_META_SCHEMA_ID = "_5";
 
   /**
    * Gets the record iterator based on the type of engine-specific record 
representation from the
@@ -295,7 +295,7 @@ public abstract class HoodieReaderContext<T> {
   public Map<String, Object> generateMetadataForRecord(T record, Schema 
schema) {
     Map<String, Object> meta = new HashMap<>();
     meta.put(INTERNAL_META_RECORD_KEY, getRecordKey(record, schema));
-    meta.put(INTERNAL_META_SCHEMA, schema);
+    meta.put(INTERNAL_META_SCHEMA_ID, 
this.schemaHandler.encodeAvroSchema(schema));
     return meta;
   }
 
@@ -309,7 +309,7 @@ public abstract class HoodieReaderContext<T> {
   public Map<String, Object> 
updateSchemaAndResetOrderingValInMetadata(Map<String, Object> meta,
                                                                        Schema 
schema) {
     meta.remove(INTERNAL_META_ORDERING_FIELD);
-    meta.put(INTERNAL_META_SCHEMA, schema);
+    meta.put(INTERNAL_META_SCHEMA_ID, 
this.schemaHandler.encodeAvroSchema(schema));
     return meta;
   }
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
index e4e0299ec86..96a74860e8a 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieBaseFileGroupRecordBuffer.java
@@ -71,9 +71,9 @@ import static 
org.apache.hudi.common.config.HoodieMemoryConfig.MAX_MEMORY_FOR_ME
 import static 
org.apache.hudi.common.config.HoodieMemoryConfig.SPILLABLE_MAP_BASE_PATH;
 import static 
org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_PARTITION_PATH;
 import static 
org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_RECORD_KEY;
-import static 
org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_SCHEMA;
 import static 
org.apache.hudi.common.model.HoodieRecord.HOODIE_IS_DELETED_FIELD;
 import static 
org.apache.hudi.common.model.HoodieRecord.OPERATION_METADATA_FIELD;
+import static 
org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_SCHEMA_ID;
 import static 
org.apache.hudi.common.model.HoodieRecordMerger.PAYLOAD_BASED_MERGE_STRATEGY_UUID;
 import static 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;
 
@@ -184,6 +184,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
   @Override
   public void close() {
     records.clear();
+    readerContext.getSchemaHandler().close();
   }
 
   /**
@@ -208,10 +209,10 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
         // the `older` in the merge API
         Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt = 
recordMerger.get().partialMerge(
             readerContext.constructHoodieRecord(Option.of(record), metadata),
-            (Schema) metadata.get(INTERNAL_META_SCHEMA),
+            
readerContext.getSchemaHandler().decodeAvroSchema(metadata.get(INTERNAL_META_SCHEMA_ID)),
             readerContext.constructHoodieRecord(
                 existingRecordMetadataPair.getLeft(), 
existingRecordMetadataPair.getRight()),
-            (Schema) 
existingRecordMetadataPair.getRight().get(INTERNAL_META_SCHEMA),
+            
readerContext.getSchemaHandler().decodeAvroSchema(existingRecordMetadataPair.getRight().get(INTERNAL_META_SCHEMA_ID)),
             readerSchema,
             props);
         if (!combinedRecordAndSchemaOpt.isPresent()) {
@@ -241,7 +242,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
             Comparable incomingOrderingValue = readerContext.getOrderingValue(
                 Option.of(record), metadata, readerSchema, orderingFieldName, 
orderingFieldTypeOpt, orderingFieldDefault);
             if (incomingOrderingValue.compareTo(existingOrderingValue) > 0) {
-              return Option.of(Pair.of(isDeleteRecord(Option.of(record), 
(Schema) metadata.get(INTERNAL_META_SCHEMA))
+              return Option.of(Pair.of(isDeleteRecord(Option.of(record), 
readerContext.getSchemaHandler().decodeAvroSchema(metadata.get(INTERNAL_META_SCHEMA_ID)))
                   ? Option.empty() : Option.of(record), metadata));
             }
             return Option.empty();
@@ -265,10 +266,10 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
             } else {
               Option<Pair<HoodieRecord, Schema>> combinedRecordAndSchemaOpt = 
recordMerger.get().merge(
                   readerContext.constructHoodieRecord(Option.of(record), 
metadata),
-                  (Schema) metadata.get(INTERNAL_META_SCHEMA),
+                  
readerContext.getSchemaHandler().decodeAvroSchema(metadata.get(INTERNAL_META_SCHEMA_ID)),
                   readerContext.constructHoodieRecord(
                       existingRecordMetadataPair.getLeft(), 
existingRecordMetadataPair.getRight()),
-                  (Schema) 
existingRecordMetadataPair.getRight().get(INTERNAL_META_SCHEMA),
+                  
readerContext.getSchemaHandler().decodeAvroSchema(existingRecordMetadataPair.getRight().get(INTERNAL_META_SCHEMA_ID)),
                   props);
 
               if (!combinedRecordAndSchemaOpt.isPresent()) {
@@ -391,15 +392,15 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
   protected Option<T> merge(Option<T> older, Map<String, Object> olderInfoMap,
                             Option<T> newer, Map<String, Object> newerInfoMap) 
throws IOException {
     if (!older.isPresent()) {
-      return isDeleteRecord(newer, (Schema) 
newerInfoMap.get(INTERNAL_META_SCHEMA)) ? Option.empty() : newer;
+      return isDeleteRecord(newer, 
readerContext.getSchemaHandler().decodeAvroSchema(newerInfoMap.get(INTERNAL_META_SCHEMA_ID)))
 ? Option.empty() : newer;
     }
 
     if (enablePartialMerging) {
       // TODO(HUDI-7843): decouple the merging logic from the merger
       //  and use the record merge mode to control how to merge partial updates
       Option<Pair<HoodieRecord, Schema>> mergedRecord = 
recordMerger.get().partialMerge(
-          readerContext.constructHoodieRecord(older, olderInfoMap), (Schema) 
olderInfoMap.get(INTERNAL_META_SCHEMA),
-          readerContext.constructHoodieRecord(newer, newerInfoMap), (Schema) 
newerInfoMap.get(INTERNAL_META_SCHEMA),
+          readerContext.constructHoodieRecord(older, olderInfoMap), 
readerContext.getSchemaHandler().decodeAvroSchema(olderInfoMap.get(INTERNAL_META_SCHEMA_ID)),
+          readerContext.constructHoodieRecord(newer, newerInfoMap), 
readerContext.getSchemaHandler().decodeAvroSchema(newerInfoMap.get(INTERNAL_META_SCHEMA_ID)),
           readerSchema, props);
 
       if (mergedRecord.isPresent()
@@ -413,7 +414,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
     } else {
       switch (recordMergeMode) {
         case COMMIT_TIME_ORDERING:
-          return isDeleteRecord(newer, (Schema) 
newerInfoMap.get(INTERNAL_META_SCHEMA)) ? Option.empty() : newer;
+          return isDeleteRecord(newer, 
readerContext.getSchemaHandler().decodeAvroSchema(newerInfoMap.get(INTERNAL_META_SCHEMA_ID)))
 ? Option.empty() : newer;
         case EVENT_TIME_ORDERING:
           Comparable newOrderingValue = readerContext.getOrderingValue(
               newer, newerInfoMap, readerSchema, orderingFieldName, 
orderingFieldTypeOpt, orderingFieldDefault);
@@ -424,9 +425,9 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
               older, olderInfoMap, readerSchema, orderingFieldName, 
orderingFieldTypeOpt, orderingFieldDefault);
           if (!isDeleteRecordWithNaturalOrder(older, oldOrderingValue)
               && oldOrderingValue.compareTo(newOrderingValue) > 0) {
-            return isDeleteRecord(older, (Schema) 
olderInfoMap.get(INTERNAL_META_SCHEMA)) ? Option.empty() : older;
+            return isDeleteRecord(older, 
readerContext.getSchemaHandler().decodeAvroSchema(olderInfoMap.get(INTERNAL_META_SCHEMA_ID)))
 ? Option.empty() : older;
           }
-          return isDeleteRecord(newer, (Schema) 
newerInfoMap.get(INTERNAL_META_SCHEMA)) ? Option.empty() : newer;
+          return isDeleteRecord(newer, 
readerContext.getSchemaHandler().decodeAvroSchema(newerInfoMap.get(INTERNAL_META_SCHEMA_ID)))
 ? Option.empty() : newer;
         case CUSTOM:
         default:
           if (payloadClass.isPresent()) {
@@ -446,8 +447,8 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
             return Option.empty();
           } else {
             Option<Pair<HoodieRecord, Schema>> mergedRecord = 
recordMerger.get().merge(
-                readerContext.constructHoodieRecord(older, olderInfoMap), 
(Schema) olderInfoMap.get(INTERNAL_META_SCHEMA),
-                readerContext.constructHoodieRecord(newer, newerInfoMap), 
(Schema) newerInfoMap.get(INTERNAL_META_SCHEMA), props);
+                readerContext.constructHoodieRecord(older, olderInfoMap), 
readerContext.getSchemaHandler().decodeAvroSchema(olderInfoMap.get(INTERNAL_META_SCHEMA_ID)),
+                readerContext.constructHoodieRecord(newer, newerInfoMap), 
readerContext.getSchemaHandler().decodeAvroSchema(newerInfoMap.get(INTERNAL_META_SCHEMA_ID)),
 props);
             if (mergedRecord.isPresent()
                 && 
!mergedRecord.get().getLeft().isDelete(mergedRecord.get().getRight(), props)) {
               if (!mergedRecord.get().getRight().equals(readerSchema)) {
@@ -485,7 +486,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
     Schema recordSchema = readerSchema;
     GenericRecord record = null;
     if (recordOption.isPresent()) {
-      recordSchema = (Schema) metadataMap.get(INTERNAL_META_SCHEMA);
+      recordSchema = 
readerContext.getSchemaHandler().decodeAvroSchema(metadataMap.get(INTERNAL_META_SCHEMA_ID));
       record = readerContext.convertToAvroRecord(recordOption.get(), 
recordSchema);
     }
     HoodieKey hoodieKey = new HoodieKey((String) 
metadataMap.get(INTERNAL_META_RECORD_KEY), (String) 
metadataMap.get(INTERNAL_META_PARTITION_PATH));
@@ -498,7 +499,7 @@ public abstract class HoodieBaseFileGroupRecordBuffer<T> 
implements HoodieFileGr
     if (record.isDelete(readerSchema, props)) {
       return readerSchema;
     }
-    return (Schema) infoMap.get(INTERNAL_META_SCHEMA);
+    return 
readerContext.getSchemaHandler().decodeAvroSchema(infoMap.get(INTERNAL_META_SCHEMA_ID));
   }
 
   protected boolean hasNextBaseRecord(T baseRecord, Pair<Option<T>, 
Map<String, Object>> logRecordInfo) throws IOException {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java
index de2cd920289..6e19a8e7d31 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupReaderSchemaHandler.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.engine.HoodieReaderContext;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.AvroSchemaCache;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.VisibleForTesting;
@@ -34,6 +35,9 @@ import 
org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;
 
 import org.apache.avro.Schema;
 
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -51,7 +55,7 @@ import static 
org.apache.hudi.avro.AvroSchemaUtils.findNestedField;
 /**
  * This class is responsible for handling the schema for the file group reader.
  */
-public class HoodieFileGroupReaderSchemaHandler<T> {
+public class HoodieFileGroupReaderSchemaHandler<T> implements Closeable {
 
   protected final Schema dataSchema;
 
@@ -78,6 +82,8 @@ public class HoodieFileGroupReaderSchemaHandler<T> {
 
   protected final boolean needsMORMerge;
 
+  private final AvroSchemaCache avroSchemaCache;
+
   public HoodieFileGroupReaderSchemaHandler(HoodieReaderContext<T> 
readerContext,
                                             Schema dataSchema,
                                             Schema requestedSchema,
@@ -96,6 +102,7 @@ public class HoodieFileGroupReaderSchemaHandler<T> {
     this.internalSchema = pruneInternalSchema(requiredSchema, 
internalSchemaOpt);
     this.internalSchemaOpt = getInternalSchemaOpt(internalSchemaOpt);
     readerContext.setNeedsBootstrapMerge(this.needsBootstrapMerge);
+    this.avroSchemaCache = AvroSchemaCache.getInstance();
   }
 
   public Schema getDataSchema() {
@@ -237,4 +244,24 @@ public class HoodieFileGroupReaderSchemaHandler<T> {
     }
     return createNewSchemaFromFieldsWithReference(dataSchema, fields);
   }
+
+  /**
+   * Encodes the given avro schema for efficient serialization.
+   */
+  public Integer encodeAvroSchema(Schema schema) {
+    return this.avroSchemaCache.cacheSchema(schema);
+  }
+
+  /**
+   * Decodes the avro schema with given version ID.
+   */
+  @Nullable
+  public Schema decodeAvroSchema(Object versionId) {
+    return this.avroSchemaCache.getSchema((Integer) versionId).orElse(null);
+  }
+
+  @Override
+  public void close() {
+    this.avroSchemaCache.close();
+  }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/util/AvroSchemaCache.java 
b/hudi-common/src/main/java/org/apache/hudi/common/util/AvroSchemaCache.java
new file mode 100644
index 00000000000..538b8ed1d49
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/AvroSchemaCache.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.avro.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.NotThreadSafe;
+
+import java.io.Closeable;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * An Avro schema cache implementation for managing different versions of 
schemas.
+ * This is a local cache; a versionId is only valid for the local thread within 
one container/executor.
+ * A map of {version_id, schema} is maintained.
+ */
+@NotThreadSafe
+public class AvroSchemaCache implements Closeable {
+  private static final Logger LOG = 
LoggerFactory.getLogger(AvroSchemaCache.class);
+  private Map<Integer, Schema> versionIdToSchema; // the mapping from 
version_id -> schema
+  private Map<Schema, Integer> schemaToVersionId; // the mapping from schema 
-> version_id
+
+  private int nextVersionId = 0;
+
+  private AvroSchemaCache() {
+    this.versionIdToSchema = new HashMap<>();
+    this.schemaToVersionId = new HashMap<>();
+  }
+
+  public static AvroSchemaCache getInstance() {
+    return new AvroSchemaCache();
+  }
+
+  public Integer cacheSchema(Schema schema) {
+    Integer versionId = this.schemaToVersionId.get(schema);
+    if (versionId == null) {
+      versionId = nextVersionId++;
+      this.schemaToVersionId.put(schema, versionId);
+      this.versionIdToSchema.put(versionId, schema);
+    }
+    return versionId;
+  }
+
+  public Option<Schema> getSchema(Integer versionId) {
+    return Option.ofNullable(this.versionIdToSchema.get(versionId));
+  }
+
+  @Override
+  public void close() {
+    this.schemaToVersionId.clear();
+    this.versionIdToSchema.clear();
+  }
+}
diff --git 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
index 85b632707e8..ef40b8624f9 100644
--- 
a/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
+++ 
b/hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java
@@ -207,7 +207,7 @@ public class HiveHoodieReaderContext extends 
HoodieReaderContext<ArrayWritable>
     if (!recordOption.isPresent()) {
       return new HoodieEmptyRecord<>(new HoodieKey((String) 
metadataMap.get(INTERNAL_META_RECORD_KEY), (String) 
metadataMap.get(INTERNAL_META_PARTITION_PATH)), 
HoodieRecord.HoodieRecordType.HIVE);
     }
-    Schema schema = (Schema) metadataMap.get(INTERNAL_META_SCHEMA);
+    Schema schema = 
getSchemaHandler().decodeAvroSchema(metadataMap.get(INTERNAL_META_SCHEMA_ID));
     ArrayWritable writable = recordOption.get();
     return new HoodieHiveRecord(new HoodieKey((String) 
metadataMap.get(INTERNAL_META_RECORD_KEY), (String) 
metadataMap.get(INTERNAL_META_PARTITION_PATH)), writable, schema, 
objectInspectorCache);
   }
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReaderBase.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReaderBase.scala
index 56738595c26..b2932d46e1b 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReaderBase.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/SparkParquetReaderBase.scala
@@ -21,10 +21,9 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import org.apache.hudi.common.util
 import org.apache.hudi.internal.schema.InternalSchema
-
 import org.apache.hadoop.conf.Configuration
+import org.apache.hudi.HoodieSparkUtils
 import org.apache.hudi.storage.StorageConfiguration
-
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.execution.datasources.PartitionedFile
 import org.apache.spark.sql.internal.SQLConf
@@ -64,6 +63,19 @@ abstract class 
SparkParquetReaderBase(enableVectorizedReader: Boolean,
     val conf = storageConf.unwrapCopy()
     conf.set(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA, 
requiredSchema.json)
     conf.set(ParquetWriteSupport.SPARK_ROW_SCHEMA, requiredSchema.json)
+
+    conf.setBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key, false);
+    conf.setBoolean(SQLConf.CASE_SENSITIVE.key, false);
+    // Sets flags for `ParquetToSparkSchemaConverter`
+    conf.setBoolean(SQLConf.PARQUET_BINARY_AS_STRING.key, false)
+    conf.setBoolean(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, true)
+    // Using string value of this conf to preserve compatibility across spark 
versions.
+    conf.setBoolean(SQLConf.LEGACY_PARQUET_NANOS_AS_LONG.key, false)
+    if (HoodieSparkUtils.gteqSpark3_4) {
+      // PARQUET_INFER_TIMESTAMP_NTZ_ENABLED is required from Spark 3.4.0 or 
above
+      conf.setBoolean("spark.sql.parquet.inferTimestampNTZ.enabled", false)
+    }
+
     ParquetWriteSupport.setSchema(requiredSchema, conf)
     doRead(file, requiredSchema, partitionSchema, internalSchemaOpt, filters, 
conf)
   }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
index 05d50e3c60d..b41ef4d16c5 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/ddl/TestSpark3DDL.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.hudi.ddl
 
 import org.apache.hudi.{DataSourceWriteOptions, DefaultSparkRecordMerger, 
HoodieSparkUtils, QuickstartUtils}
-import org.apache.hudi.common.config.HoodieStorageConfig
+import org.apache.hudi.common.config.{HoodieReaderConfig, HoodieStorageConfig}
 import org.apache.hudi.common.model.HoodieRecord
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
 import org.apache.hudi.common.table.TableSchemaResolver
@@ -27,7 +27,6 @@ import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.hudi.index.inmemory.HoodieInMemoryHashIndex
 import org.apache.hudi.testutils.DataSourceTestUtils
 import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
-
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.{Row, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -670,7 +669,9 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
             .mode(SaveMode.Overwrite)
             .save(tablePath)
 
-          val oldView = 
spark.read.format("hudi").options(readOpt).load(tablePath)
+          val oldView = spark.read.format("hudi").options(readOpt)
+            .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),"false")
+            .load(tablePath)
           oldView.show(5, false)
 
           val records2 = 
RawTripTestPayload.recordsToStrings(dataGen.generateUpdatesAsPerSchema("002", 
100, schema)).asScala.toList
@@ -685,7 +686,9 @@ class TestSpark3DDL extends HoodieSparkSqlTestBase {
             .option("hoodie.datasource.write.reconcile.schema", "true")
             .mode(SaveMode.Append)
             .save(tablePath)
-          
spark.read.format("hudi").options(readOpt).load(tablePath).registerTempTable("newView")
+          spark.read.format("hudi").options(readOpt)
+            .option(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(),"false")
+            .load(tablePath).registerTempTable("newView")
           val checkResult = spark.sql(s"select 
tip_history.amount,city_to_state,distance_in_meters,fare,height from newView 
where _row_key='$checkRowKey' ")
             .collect().map(row => (row.isNullAt(0), row.isNullAt(1), 
row.isNullAt(2), row.isNullAt(3), row.isNullAt(4)))
           assertResult((false, false, false, true, true))(checkResult(0))
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala
index 3d0074e913b..99c92843923 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/dml/TestPartialUpdateForMergeInto.scala
@@ -29,10 +29,9 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline
 import org.apache.hudi.common.table.view.{FileSystemViewManager, 
FileSystemViewStorageConfig, SyncableFileSystemView}
 import org.apache.hudi.common.testutils.HoodieTestUtils
 import org.apache.hudi.common.util.CompactionUtils
-import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, 
HoodieWriteConfig}
+import org.apache.hudi.config.{HoodieClusteringConfig, HoodieCompactionConfig, 
HoodieIndexConfig, HoodieWriteConfig}
 import org.apache.hudi.exception.HoodieNotSupportedException
 import org.apache.hudi.metadata.HoodieTableMetadata
-
 import org.apache.avro.Schema
 import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertTrue}
@@ -216,10 +215,10 @@ class TestPartialUpdateForMergeInto extends 
HoodieSparkSqlTestBase {
       spark.sql(
         s"""
            |merge into $tableName t0
-           |using ( select 1 as id, 'a1' as name, 12 as price, 1001 as ts
-           |union select 3 as id, 'a3' as name, 25 as price, 1260 as ts) s0
+           |using ( select 1 as id, 'a1' as name, 12.0 as price, 1001 as _ts
+           |union select 3 as id, 'a3' as name, 25.0 as price, 1260 as _ts) s0
            |on t0.id = s0.id
-           |when matched then update set price = s0.price, _ts = s0.ts
+           |when matched then update set price = s0.price, _ts = s0._ts
            |""".stripMargin)
 
       checkAnswer(s"select id, name, price, _ts, description from $tableName")(
@@ -236,10 +235,10 @@ class TestPartialUpdateForMergeInto extends 
HoodieSparkSqlTestBase {
       spark.sql(
         s"""
            |merge into $tableName t0
-           |using ( select 1 as id, 'a1' as name, 'a1: updated desc1' as 
description, 1023 as ts
-           |union select 2 as id, 'a2' as name, 'a2: updated desc2' as 
description, 1270 as ts) s0
+           |using ( select 1 as id, 'a1' as name, 'a1: updated desc1' as 
description, 1023 as _ts
+           |union select 2 as id, 'a2' as name, 'a2: updated desc2' as 
description, 1270 as _ts) s0
            |on t0.id = s0.id
-           |when matched then update set description = s0.description, _ts = 
s0.ts
+           |when matched then update set description = s0.description, _ts = 
s0._ts
            |""".stripMargin)
 
       checkAnswer(s"select id, name, price, _ts, description from $tableName")(
@@ -256,10 +255,10 @@ class TestPartialUpdateForMergeInto extends 
HoodieSparkSqlTestBase {
         spark.sql(
           s"""
              |merge into $tableName t0
-             |using ( select 2 as id, '_a2' as name, 18.0 as price, 1275 as ts
-             |union select 3 as id, '_a3' as name, 28.0 as price, 1280 as ts) 
s0
+             |using ( select 2 as id, '_a2' as name, 18.0 as price, 1275 as _ts
+             |union select 3 as id, '_a3' as name, 28.0 as price, 1280 as _ts) 
s0
              |on t0.id = s0.id
-             |when matched then update set price = s0.price, _ts = s0.ts
+             |when matched then update set price = s0.price, _ts = s0._ts
              |""".stripMargin)
         validateCompactionExecuted(basePath)
         checkAnswer(s"select id, name, price, _ts, description from 
$tableName")(
@@ -267,13 +266,40 @@ class TestPartialUpdateForMergeInto extends 
HoodieSparkSqlTestBase {
           Seq(2, "a2", 18.0, 1275, "a2: updated desc2"),
           Seq(3, "a3", 28.0, 1280, "a3: desc3")
         )
+
+        // trigger one more MERGE INTO (MIT) and run inline clustering
+        spark.sql(s"set ${HoodieClusteringConfig.INLINE_CLUSTERING.key} = 
true")
+        spark.sql(s"set 
${HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key} = 3")
+        spark.sql(
+          s"""
+             |merge into $tableName t0
+             |using ( select 2 as id, '_a2' as name, 48.0 as price, 1275 as _ts
+             |union select 3 as id, '_a3' as name, 58.0 as price, 1280 as _ts) 
s0
+             |on t0.id = s0.id
+             |when matched then update set price = s0.price, _ts = s0._ts
+             |""".stripMargin)
+
+        validateClusteringExecuted(basePath)
+        checkAnswer(s"select id, name, price, _ts, description from 
$tableName")(
+          Seq(1, "a1", 12.0, 1023, "a1: updated desc1"),
+          Seq(2, "a2", 48.0, 1275, "a2: updated desc2"),
+          Seq(3, "a3", 58.0, 1280, "a3: desc3")
+        )
+
+        // revert the config overrides.
         spark.sql(s"set 
${HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key}"
           + s" = 
${HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.defaultValue()}")
+
+        spark.sql(s"set 
${HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key}"
+          + s" = 
${HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.defaultValue()}")
+        spark.sql(s"set ${HoodieClusteringConfig.INLINE_CLUSTERING.key}"
+          + s" = ${HoodieClusteringConfig.INLINE_CLUSTERING.defaultValue()}")
       }
 
       if (tableType.equals("cow")) {
         // No preCombine field
         val tableName2 = generateTableName
+        val basePath2 = tmp.getCanonicalPath + "/" + tableName2
         spark.sql(
           s"""
              |create table $tableName2 (
@@ -285,21 +311,31 @@ class TestPartialUpdateForMergeInto extends 
HoodieSparkSqlTestBase {
              | type ='$tableType',
              | primaryKey = 'id'
              |)
-             |location '${tmp.getCanonicalPath}/$tableName2'
+             |location '$basePath2'
         """.stripMargin)
         spark.sql(s"insert into $tableName2 values(1, 'a1', 10)")
 
+        spark.sql(s"set ${HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key} = 
true")
+        spark.sql(s"set ${HoodieClusteringConfig.INLINE_CLUSTERING.key} = 
true")
+        spark.sql(s"set 
${HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key} = 1")
+
         spark.sql(
           s"""
              |merge into $tableName2 t0
-             |using ( select 1 as id, 'a1' as name, 12 as price) s0
+             |using ( select 1 as id, 'a2' as name, 12.0 as price) s0
              |on t0.id = s0.id
              |when matched then update set price = s0.price
              |""".stripMargin)
 
+        validateClusteringExecuted(basePath2)
         checkAnswer(s"select id, name, price from $tableName2")(
           Seq(1, "a1", 12.0)
         )
+
+        spark.sql(s"set 
${HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.key}"
+          + s" = 
${HoodieClusteringConfig.INLINE_CLUSTERING_MAX_COMMITS.defaultValue()}")
+        spark.sql(s"set ${HoodieClusteringConfig.INLINE_CLUSTERING.key}"
+          + s" = ${HoodieClusteringConfig.INLINE_CLUSTERING.defaultValue()}")
       }
     }
   }
@@ -454,11 +490,19 @@ class TestPartialUpdateForMergeInto extends 
HoodieSparkSqlTestBase {
     }
   }
 
+  def validateClusteringExecuted(basePath: String): Unit = {
+    val storageConf = HoodieTestUtils.getDefaultStorageConf
+    val metaClient: HoodieTableMetaClient =
+      
HoodieTableMetaClient.builder.setConf(storageConf).setBasePath(basePath).build
+    val lastCommit = 
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant().get()
+    assertEquals(HoodieTimeline.REPLACE_COMMIT_ACTION, lastCommit.getAction)
+  }
+
   def validateCompactionExecuted(basePath: String): Unit = {
     val storageConf = HoodieTestUtils.getDefaultStorageConf
     val metaClient: HoodieTableMetaClient =
       
HoodieTableMetaClient.builder.setConf(storageConf).setBasePath(basePath).build
-    val lastCommit = 
metaClient.getActiveTimeline.getCommitsTimeline.lastInstant().get()
+    val lastCommit = 
metaClient.getActiveTimeline.getCommitsTimeline.filterCompletedInstants().lastInstant().get()
     assertEquals(HoodieTimeline.COMMIT_ACTION, lastCommit.getAction)
     CompactionUtils.getCompactionPlan(metaClient, lastCommit.requestedTime())
   }

Reply via email to