This is an automated email from the ASF dual-hosted git repository.

vhs pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 46738d00bdf3 feat(schema): Migrate hudi spark client to use 
HoodieSchema (#17743)
46738d00bdf3 is described below

commit 46738d00bdf3c476e300247176cceb2900ddd871
Author: Rahil C <[email protected]>
AuthorDate: Tue Dec 30 21:13:56 2025 -0500

    feat(schema): Migrate hudi spark client to use HoodieSchema (#17743)
    
    * spark client changes so far
    
    * feat(schema): Migrate hudi spark client to use HoodieSchema
    
    * fix compilation issues
    
    * fix tests
    
    * address comments
    
    * fix merge conflict
---
 .../bootstrap/HoodieBootstrapSchemaProvider.java   | 12 ++++----
 .../HoodieSparkBootstrapSchemaProvider.java        | 16 +++++------
 .../MultipleSparkJobExecutionStrategy.java         |  6 ++--
 ...SparkJobConsistentHashingExecutionStrategy.java | 13 ++++-----
 ...onsistentBucketClusteringExecutionStrategy.java |  3 +-
 .../SparkSingleFileSortExecutionStrategy.java      |  5 ++--
 .../SparkSortAndSizeExecutionStrategy.java         |  5 ++--
 .../client/utils/SparkMetadataWriterUtils.java     | 10 +++----
 .../hudi/common/model/HoodieSparkRecord.java       | 33 ++++++++++++----------
 .../hudi/execution/SparkLazyInsertIterable.java    | 15 +++++-----
 .../bulkinsert/RDDBucketIndexPartitioner.java      | 11 ++++----
 .../bulkinsert/RDDSpatialCurveSortPartitioner.java |  2 +-
 .../io/storage/HoodieSparkFileWriterFactory.java   |  4 +--
 .../hudi/io/storage/HoodieSparkParquetReader.java  | 12 ++++----
 .../apache/hudi/merge/SparkRecordMergingUtils.java |  4 +--
 .../bootstrap/BaseBootstrapMetadataHandler.java    | 11 ++++----
 .../bootstrap/OrcBootstrapMetadataHandler.java     | 12 ++++----
 .../bootstrap/ParquetBootstrapMetadataHandler.java | 16 +++++------
 .../hudi/BaseSparkInternalRecordContext.java       | 13 ++++-----
 .../hudi/BaseSparkInternalRowReaderContext.java    |  2 +-
 .../hudi/HoodieDatasetBulkInsertHelper.scala       |  7 ++---
 .../scala/org/apache/hudi/HoodieSparkUtils.scala   |  5 ++--
 .../SparkFileFormatInternalRecordContext.scala     | 13 ++++-----
 .../SparkFileFormatInternalRowReaderContext.scala  |  4 +--
 .../apache/spark/sql/HoodieInternalRowUtils.scala  | 23 ++++++++-------
 .../callback/TestHoodieClientInitCallback.java     | 16 +++++------
 .../hudi/client/TestUpdateSchemaEvolution.java     |  5 ++--
 .../functional/TestExternalPathHandling.java       | 12 ++++----
 .../functional/TestHoodieBackedTableMetadata.java  |  7 ++---
 .../hudi/keygen/KeyGeneratorTestUtilities.java     |  8 +++---
 .../SparkClientFunctionalTestHarness.java          |  4 +--
 .../org/apache/hudi/HoodieCreateRecordUtils.scala  |  4 +--
 .../org/apache/hudi/HoodieSparkSqlWriter.scala     |  2 +-
 .../org/apache/hudi/cdc/CDCFileGroupIterator.scala |  2 +-
 .../hudi/functional/TestBufferedRecordMerger.java  |  2 +-
 .../read/TestHoodieFileGroupReaderOnSpark.scala    |  2 +-
 .../functional/TestParquetColumnProjection.scala   |  6 ++--
 .../hudi/common/TestHoodieInternalRowUtils.scala   |  8 +++---
 38 files changed, 161 insertions(+), 174 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/HoodieBootstrapSchemaProvider.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/HoodieBootstrapSchemaProvider.java
index ce4c42ba1665..34848df31591 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/HoodieBootstrapSchemaProvider.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/bootstrap/HoodieBootstrapSchemaProvider.java
@@ -18,14 +18,12 @@
 
 package org.apache.hudi.client.bootstrap;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieFileStatus;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 
-import org.apache.avro.Schema;
-
 import java.util.List;
 
 /**
@@ -45,11 +43,11 @@ public abstract class HoodieBootstrapSchemaProvider {
    * @param partitions  List of partitions with files within them
    * @return Avro Schema
    */
-  public final Schema getBootstrapSchema(HoodieEngineContext context, 
List<Pair<String, List<HoodieFileStatus>>> partitions) {
+  public final HoodieSchema getBootstrapSchema(HoodieEngineContext context, 
List<Pair<String, List<HoodieFileStatus>>> partitions) {
     if (writeConfig.getSchema() != null) {
       // Use schema specified by user if set
-      Schema userSchema = new Schema.Parser().parse(writeConfig.getSchema());
-      if (!HoodieAvroUtils.getNullSchema().equals(userSchema)) {
+      HoodieSchema userSchema = HoodieSchema.parse(writeConfig.getSchema());
+      if (!HoodieSchema.NULL_SCHEMA.equals(userSchema)) {
         return userSchema;
       }
     }
@@ -63,7 +61,7 @@ public abstract class HoodieBootstrapSchemaProvider {
    * @param partitions  List of partitions with files within them
    * @return Avro Schema
    */
-  protected abstract Schema getBootstrapSourceSchema(HoodieEngineContext 
context,
+  protected abstract HoodieSchema getBootstrapSourceSchema(HoodieEngineContext 
context,
       List<Pair<String, List<HoodieFileStatus>>> partitions);
 
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java
index b8aa55975a17..efdd945c2940 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/bootstrap/HoodieSparkBootstrapSchemaProvider.java
@@ -18,19 +18,19 @@
 
 package org.apache.hudi.client.bootstrap;
 
-import org.apache.hudi.AvroConversionUtils;
+import org.apache.hudi.HoodieSchemaConversionUtils;
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieFileStatus;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.util.AvroOrcUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 
-import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.orc.OrcFile;
@@ -52,8 +52,8 @@ public class HoodieSparkBootstrapSchemaProvider extends 
HoodieBootstrapSchemaPro
   }
 
   @Override
-  protected Schema getBootstrapSourceSchema(HoodieEngineContext context, 
List<Pair<String, List<HoodieFileStatus>>> partitions) {
-    Schema schema = partitions.stream().flatMap(p -> 
p.getValue().stream()).map(fs -> {
+  protected HoodieSchema getBootstrapSourceSchema(HoodieEngineContext context, 
List<Pair<String, List<HoodieFileStatus>>> partitions) {
+    HoodieSchema schema = partitions.stream().flatMap(p -> 
p.getValue().stream()).map(fs -> {
           Path filePath = HadoopFSUtils.toPath(fs.getPath());
           String extension = FSUtils.getFileExtension(filePath.getName());
           if (PARQUET.getFileExtension().equals(extension)) {
@@ -69,7 +69,7 @@ public class HoodieSparkBootstrapSchemaProvider extends 
HoodieBootstrapSchemaPro
     return schema;
   }
 
-  private static Schema getBootstrapSourceSchemaParquet(HoodieWriteConfig 
writeConfig, HoodieEngineContext context, Path filePath) {
+  private static HoodieSchema 
getBootstrapSourceSchemaParquet(HoodieWriteConfig writeConfig, 
HoodieEngineContext context, Path filePath) {
     // NOTE: The type inference of partition column in the parquet table is 
turned off explicitly,
     // to be consistent with the existing bootstrap behavior, where the 
partition column is String
     // typed in Hudi table.
@@ -85,10 +85,10 @@ public class HoodieSparkBootstrapSchemaProvider extends 
HoodieBootstrapSchemaPro
     String structName = tableName + "_record";
     String recordNamespace = "hoodie." + tableName;
 
-    return AvroConversionUtils.convertStructTypeToAvroSchema(parquetSchema, 
structName, recordNamespace);
+    return 
HoodieSchemaConversionUtils.convertStructTypeToHoodieSchema(parquetSchema, 
structName, recordNamespace);
   }
 
-  private static Schema getBootstrapSourceSchemaOrc(HoodieWriteConfig 
writeConfig, HoodieEngineContext context, Path filePath) {
+  private static HoodieSchema getBootstrapSourceSchemaOrc(HoodieWriteConfig 
writeConfig, HoodieEngineContext context, Path filePath) {
     Reader orcReader = null;
     try {
       orcReader = OrcFile.createReader(filePath, 
OrcFile.readerOptions(context.getStorageConf().unwrapAs(Configuration.class)));
@@ -99,7 +99,7 @@ public class HoodieSparkBootstrapSchemaProvider extends 
HoodieBootstrapSchemaPro
     String tableName = 
HoodieAvroUtils.sanitizeName(writeConfig.getTableName());
     String structName = tableName + "_record";
     String recordNamespace = "hoodie." + tableName;
-    return AvroOrcUtils.createAvroSchemaWithDefaultValue(orcSchema, 
structName, recordNamespace, true);
+    return 
HoodieSchema.fromAvroSchema(AvroOrcUtils.createAvroSchemaWithDefaultValue(orcSchema,
 structName, recordNamespace, true));
   }
   
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
index 2c409a50bf1b..f378ef46a5a3 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/MultipleSparkJobExecutionStrategy.java
@@ -20,7 +20,6 @@ package org.apache.hudi.client.clustering.run.strategy;
 
 import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.SparkAdapterSupport$;
-import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.avro.model.HoodieClusteringGroup;
 import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.client.SparkTaskContextSupplier;
@@ -65,7 +64,6 @@ import org.apache.hudi.table.action.HoodieWriteMetadata;
 import 
org.apache.hudi.table.action.cluster.strategy.ClusteringExecutionStrategy;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.FlatMapFunction;
@@ -172,7 +170,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
                                                                           
final int numOutputGroups,
                                                                           
final String instantTime,
                                                                           
final Map<String, String> strategyParams,
-                                                                          
final Schema schema,
+                                                                          
final HoodieSchema schema,
                                                                           
final List<HoodieFileGroupId> fileGroupIdList,
                                                                           
final boolean shouldPreserveHoodieMetadata,
                                                                           
final Map<String, String> extraMetadata);
@@ -230,7 +228,7 @@ public abstract class MultipleSparkJobExecutionStrategy<T>
     return CompletableFuture.supplyAsync(() -> {
       JavaSparkContext jsc = 
HoodieSparkEngineContext.getSparkContext(getEngineContext());
       HoodieData<HoodieRecord<T>> inputRecords = readRecordsForGroup(jsc, 
clusteringGroup, instantTime);
-      Schema readerSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(getWriteConfig().getSchema()));
+      HoodieSchema readerSchema = 
HoodieSchemaUtils.addMetadataFields(HoodieSchema.parse(getWriteConfig().getSchema()));
       // NOTE: Record have to be cloned here to make sure if it holds 
low-level engine-specific
       //       payload pointing into a shared, mutable (underlying) buffer we 
get a clean copy of
       //       it since these records will be shuffled later.
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobConsistentHashingExecutionStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobConsistentHashingExecutionStrategy.java
index 5dccf0637b9f..960107d06547 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobConsistentHashingExecutionStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SingleSparkJobConsistentHashingExecutionStrategy.java
@@ -18,7 +18,6 @@
 
 package org.apache.hudi.client.clustering.run.strategy;
 
-import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.SparkTaskContextSupplier;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.utils.LazyConcatenatingIterator;
@@ -32,6 +31,7 @@ import org.apache.hudi.common.model.ConsistentHashingNode;
 import org.apache.hudi.common.model.HoodieConsistentHashingMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.ValidationUtils;
@@ -49,7 +49,6 @@ import 
org.apache.hudi.table.action.cluster.strategy.BaseConsistentHashingBucket
 import org.apache.hudi.util.ExecutorFactory;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
 
 import java.util.ArrayList;
 import java.util.Collections;
@@ -71,13 +70,13 @@ import java.util.function.Supplier;
 public class SingleSparkJobConsistentHashingExecutionStrategy<T> extends 
SingleSparkJobExecutionStrategy<T> {
 
   private final String indexKeyFields;
-  private final Schema readerSchema;
+  private final HoodieSchema readerSchema;
 
   public SingleSparkJobConsistentHashingExecutionStrategy(HoodieTable table, 
HoodieEngineContext engineContext,
                                                           HoodieWriteConfig 
writeConfig) {
     super(table, engineContext, writeConfig);
     this.indexKeyFields = table.getConfig().getBucketIndexHashField();
-    this.readerSchema = HoodieAvroUtils.addMetadataFields(new 
Schema.Parser().parse(writeConfig.getSchema()));
+    this.readerSchema = 
HoodieSchemaUtils.addMetadataFields(HoodieSchema.parse(writeConfig.getSchema()));
   }
 
   @Override
@@ -147,10 +146,10 @@ public class 
SingleSparkJobConsistentHashingExecutionStrategy<T> extends SingleS
     private final boolean recordsSorted;
     private final Map<String/*fileIdPrefix*/, HoodieWriteHandle> writeHandles;
     private final Function<HoodieRecord, String> fileIdPrefixExtractor;
-    private final Schema schema;
+    private final HoodieSchema schema;
 
     public InsertHandler(HoodieWriteConfig config, String instantTime, 
HoodieTable hoodieTable, TaskContextSupplier taskContextSupplier,
-                         WriteHandleFactory writeHandleFactory, boolean 
recordsSorted, Function<HoodieRecord, String> fileIdPrefixExtractor, Schema 
schema) {
+                         WriteHandleFactory writeHandleFactory, boolean 
recordsSorted, Function<HoodieRecord, String> fileIdPrefixExtractor, 
HoodieSchema schema) {
       this.config = config;
       this.instantTime = instantTime;
       this.hoodieTable = hoodieTable;
@@ -175,7 +174,7 @@ public class 
SingleSparkJobConsistentHashingExecutionStrategy<T> extends SingleS
         handle = writeHandleFactory.create(config, instantTime, hoodieTable, 
record.getPartitionPath(), fileIdPrefix, taskContextSupplier);
         writeHandles.put(fileIdPrefix, handle);
       }
-      handle.write(record, HoodieSchema.fromAvroSchema(schema), 
config.getProps());
+      handle.write(record, schema, config.getProps());
     }
 
     @Override
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java
index 3b63e84bda07..379f2cb4b858 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkConsistentBucketClusteringExecutionStrategy.java
@@ -37,7 +37,6 @@ import 
org.apache.hudi.table.action.cluster.strategy.BaseConsistentHashingBucket
 import org.apache.hudi.table.action.commit.SparkBulkInsertHelper;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 
@@ -84,7 +83,7 @@ public class 
SparkConsistentBucketClusteringExecutionStrategy<T extends HoodieRe
 
   @Override
   public HoodieData<WriteStatus> 
performClusteringWithRecordsRDD(HoodieData<HoodieRecord<T>> inputRecords, int 
numOutputGroups, String instantTime,
-                                                                 Map<String, 
String> strategyParams, Schema schema, List<HoodieFileGroupId> fileGroupIdList,
+                                                                 Map<String, 
String> strategyParams, HoodieSchema schema, List<HoodieFileGroupId> 
fileGroupIdList,
                                                                  boolean 
preserveHoodieMetadata, Map<String, String> extraMetadata) {
 
     log.info("Starting clustering for a group, parallelism:{} commit:{}", 
numOutputGroups, instantTime);
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java
index 6f201e5b4c70..ea778ce4fc90 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSingleFileSortExecutionStrategy.java
@@ -35,7 +35,6 @@ import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.commit.SparkBulkInsertHelper;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 
@@ -89,7 +88,7 @@ public class SparkSingleFileSortExecutionStrategy<T>
                                                                  int 
numOutputGroups,
                                                                  String 
instantTime,
                                                                  Map<String, 
String> strategyParams,
-                                                                 Schema schema,
+                                                                 HoodieSchema 
schema,
                                                                  
List<HoodieFileGroupId> fileGroupIdList,
                                                                  boolean 
shouldPreserveHoodieMetadata,
                                                                  Map<String, 
String> extraMetadata) {
@@ -105,7 +104,7 @@ public class SparkSingleFileSortExecutionStrategy<T>
     newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, 
String.valueOf(Long.MAX_VALUE));
 
     return (HoodieData<WriteStatus>) 
SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, 
getHoodieTable(), newConfig,
-        false, getRDDPartitioner(strategyParams, 
HoodieSchema.fromAvroSchema(schema)), true, numOutputGroups,
+        false, getRDDPartitioner(strategyParams, schema), true, 
numOutputGroups,
         new SingleFileHandleCreateFactory(fileGroupIdList.get(0).getFileId(), 
shouldPreserveHoodieMetadata));
   }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
index d66691d77fdd..05d9b23a8934 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/clustering/run/strategy/SparkSortAndSizeExecutionStrategy.java
@@ -33,7 +33,6 @@ import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.commit.SparkBulkInsertHelper;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 
@@ -82,7 +81,7 @@ public class SparkSortAndSizeExecutionStrategy<T>
                                                                  final int 
numOutputGroups,
                                                                  final String 
instantTime,
                                                                  final 
Map<String, String> strategyParams,
-                                                                 final Schema 
schema,
+                                                                 final 
HoodieSchema schema,
                                                                  final 
List<HoodieFileGroupId> fileGroupIdList,
                                                                  final boolean 
shouldPreserveHoodieMetadata,
                                                                  final 
Map<String, String> extraMetadata) {
@@ -95,6 +94,6 @@ public class SparkSortAndSizeExecutionStrategy<T>
     newConfig.setValue(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE, 
String.valueOf(getWriteConfig().getClusteringTargetFileMaxBytes()));
 
     return (HoodieData<WriteStatus>) 
SparkBulkInsertHelper.newInstance().bulkInsert(inputRecords, instantTime, 
getHoodieTable(),
-        newConfig, false, getRDDPartitioner(strategyParams, 
HoodieSchema.fromAvroSchema(schema)), true, numOutputGroups, new 
CreateHandleFactory(shouldPreserveHoodieMetadata));
+        newConfig, false, getRDDPartitioner(strategyParams, schema), true, 
numOutputGroups, new CreateHandleFactory(shouldPreserveHoodieMetadata));
   }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
index 9b1f36908b59..b86fa9612d1e 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkMetadataWriterUtils.java
@@ -72,7 +72,6 @@ import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.util.JavaScalaConverters;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
 import org.apache.spark.api.java.function.FlatMapGroupsFunction;
 import org.apache.spark.api.java.function.MapFunction;
 import org.apache.spark.sql.Column;
@@ -104,7 +103,6 @@ import java.util.stream.Stream;
 
 import scala.collection.immutable.Seq;
 
-import static org.apache.hudi.avro.HoodieAvroUtils.addMetadataFields;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
@@ -351,7 +349,7 @@ public class SparkMetadataWriterUtils {
         .build();
     try {
       ClosableIterator<InternalRow> rowsForFilePath = 
fileGroupReader.getClosableIterator();
-      SparkRowSerDe sparkRowSerDe = 
HoodieSparkUtils.getCatalystRowSerDe(HoodieInternalRowUtils.getCachedSchema(readerSchema.toAvroSchema()));
+      SparkRowSerDe sparkRowSerDe = 
HoodieSparkUtils.getCatalystRowSerDe(HoodieInternalRowUtils.getCachedSchema(readerSchema));
       return getRowsWithExpressionIndexMetadata(rowsForFilePath, 
sparkRowSerDe, partition, relativeFilePath, fileSize);
     } catch (IOException ex) {
       throw new HoodieIOException("Error reading file " + filePath, ex);
@@ -385,14 +383,14 @@ public class SparkMetadataWriterUtils {
     HoodieIndexDefinition indexDefinition = 
HoodieTableMetadataUtil.getHoodieIndexDefinition(indexPartition, 
dataMetaClient);
     List<String> columnsToIndex = 
Collections.singletonList(indexDefinition.getSourceFields().get(0));
     try {
-      Option<Schema> writerSchema =
+      Option<HoodieSchema> writerSchema =
           
Option.ofNullable(commitMetadata.getMetadata(HoodieCommitMetadata.SCHEMA_KEY))
               .flatMap(writerSchemaStr ->
                   isNullOrEmpty(writerSchemaStr)
                       ? Option.empty()
-                      : Option.of(new Schema.Parser().parse(writerSchemaStr)));
+                      : Option.of(HoodieSchema.parse(writerSchemaStr)));
       HoodieTableConfig tableConfig = dataMetaClient.getTableConfig();
-      HoodieSchema tableSchema = writerSchema.map(schema -> 
tableConfig.populateMetaFields() ? addMetadataFields(schema) : 
schema).map(HoodieSchema::fromAvroSchema)
+      HoodieSchema tableSchema = writerSchema.map(schema -> 
tableConfig.populateMetaFields() ? HoodieSchemaUtils.addMetadataFields(schema) 
: schema)
           .orElseThrow(() -> new IllegalStateException(String.format("Expected 
writer schema in commit metadata %s", commitMetadata)));
       List<Pair<String, HoodieSchema>> columnsToIndexSchemaMap = 
columnsToIndex.stream()
           .map(columnToIndex -> HoodieSchemaUtils.getNestedField(tableSchema, 
columnToIndex))
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
index 9606f255eef0..6747e1512a38 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/common/model/HoodieSparkRecord.java
@@ -22,6 +22,7 @@ import org.apache.hudi.AvroConversionUtils;
 import org.apache.hudi.SparkAdapterSupport$;
 import org.apache.hudi.SparkFileFormatInternalRecordContext;
 import org.apache.hudi.client.model.HoodieInternalRow;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.read.DeleteContext;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.OrderingValues;
@@ -171,7 +172,7 @@ public class HoodieSparkRecord extends 
HoodieRecord<InternalRow> {
     if (key != null) {
       return getRecordKey();
     }
-    StructType structType = 
HoodieInternalRowUtils.getCachedSchema(recordSchema);
+    StructType structType = 
HoodieInternalRowUtils.getCachedSchema(HoodieSchema.fromAvroSchema(recordSchema));
     return keyGeneratorOpt.isPresent()
         ? ((SparkKeyGeneratorInterface) 
keyGeneratorOpt.get()).getRecordKey(data, structType).toString()
         : 
data.getString(HoodieMetadataField.RECORD_KEY_METADATA_FIELD.ordinal());
@@ -182,7 +183,7 @@ public class HoodieSparkRecord extends 
HoodieRecord<InternalRow> {
     if (key != null) {
       return getRecordKey();
     }
-    StructType structType = 
HoodieInternalRowUtils.getCachedSchema(recordSchema);
+    StructType structType = 
HoodieInternalRowUtils.getCachedSchema(HoodieSchema.fromAvroSchema(recordSchema));
     DataType dataType = structType.apply(keyFieldName).dataType();
     int pos = structType.fieldIndex(keyFieldName);
     return data.get(pos, dataType).toString();
@@ -195,7 +196,7 @@ public class HoodieSparkRecord extends 
HoodieRecord<InternalRow> {
 
   @Override
   public Object[] getColumnValues(Schema recordSchema, String[] columns, 
boolean consistentLogicalTimestampEnabled) {
-    StructType structType = 
HoodieInternalRowUtils.getCachedSchema(recordSchema);
+    StructType structType = 
HoodieInternalRowUtils.getCachedSchema(HoodieSchema.fromAvroSchema(recordSchema));
     Object[] objects = new Object[columns.length];
     for (int i = 0; i < objects.length; i++) {
       objects[i] = getValue(structType, columns[i], data);
@@ -205,12 +206,12 @@ public class HoodieSparkRecord extends 
HoodieRecord<InternalRow> {
 
   @Override
   public Object getColumnValueAsJava(Schema recordSchema, String column, 
Properties props) {
-    return getFieldValueFromInternalRowAsJava(data, recordSchema, column);
+    return getFieldValueFromInternalRowAsJava(data, 
HoodieSchema.fromAvroSchema(recordSchema), column);
   }
 
   @Override
   public HoodieRecord joinWith(HoodieRecord other, Schema targetSchema) {
-    StructType targetStructType = 
HoodieInternalRowUtils.getCachedSchema(targetSchema);
+    StructType targetStructType = 
HoodieInternalRowUtils.getCachedSchema(HoodieSchema.fromAvroSchema(targetSchema));
     InternalRow mergeRow = new JoinedRow(data, (InternalRow) other.getData());
     UnsafeProjection projection =
         getCachedUnsafeProjection(targetStructType, targetStructType);
@@ -219,8 +220,8 @@ public class HoodieSparkRecord extends 
HoodieRecord<InternalRow> {
 
   @Override
   public HoodieRecord prependMetaFields(Schema recordSchema, Schema 
targetSchema, MetadataValues metadataValues, Properties props) {
-    StructType structType = 
HoodieInternalRowUtils.getCachedSchema(recordSchema);
-    StructType targetStructType = 
HoodieInternalRowUtils.getCachedSchema(targetSchema);
+    StructType structType = 
HoodieInternalRowUtils.getCachedSchema(HoodieSchema.fromAvroSchema(recordSchema));
+    StructType targetStructType = 
HoodieInternalRowUtils.getCachedSchema(HoodieSchema.fromAvroSchema(targetSchema));
 
     HoodieInternalRow updatableRow = wrapIntoUpdatableOverlay(this.data, 
structType);
     updateMetadataValuesInternal(updatableRow, metadataValues);
@@ -230,7 +231,7 @@ public class HoodieSparkRecord extends 
HoodieRecord<InternalRow> {
 
   @Override
   public HoodieRecord updateMetaField(Schema recordSchema, int ordinal, String 
value) {
-    StructType structType = 
HoodieInternalRowUtils.getCachedSchema(recordSchema);
+    StructType structType = 
HoodieInternalRowUtils.getCachedSchema(HoodieSchema.fromAvroSchema(recordSchema));
     HoodieInternalRow updatableRow = wrapIntoUpdatableOverlay(this.data, 
structType);
     updatableRow.update(ordinal, 
CatalystTypeConverters.convertToCatalyst(value));
     return new HoodieSparkRecord(getKey(), updatableRow, structType, 
getOperation(), this.currentLocation, this.newLocation, false);
@@ -238,8 +239,8 @@ public class HoodieSparkRecord extends 
HoodieRecord<InternalRow> {
 
   @Override
   public HoodieRecord rewriteRecordWithNewSchema(Schema recordSchema, 
Properties props, Schema newSchema, Map<String, String> renameCols) {
-    StructType structType = 
HoodieInternalRowUtils.getCachedSchema(recordSchema);
-    StructType newStructType = 
HoodieInternalRowUtils.getCachedSchema(newSchema);
+    StructType structType = 
HoodieInternalRowUtils.getCachedSchema(HoodieSchema.fromAvroSchema(recordSchema));
+    StructType newStructType = 
HoodieInternalRowUtils.getCachedSchema(HoodieSchema.fromAvroSchema(newSchema));
 
     Function1<InternalRow, UnsafeRow> unsafeRowWriter =
         HoodieInternalRowUtils.getCachedUnsafeRowWriter(structType, 
newStructType, renameCols, Collections.emptyMap());
@@ -251,7 +252,7 @@ public class HoodieSparkRecord extends 
HoodieRecord<InternalRow> {
 
   @Override
   public HoodieRecord truncateRecordKey(Schema recordSchema, Properties props, 
String keyFieldName) {
-    StructType structType = 
HoodieInternalRowUtils.getCachedSchema(recordSchema);
+    StructType structType = 
HoodieInternalRowUtils.getCachedSchema(HoodieSchema.fromAvroSchema(recordSchema));
     int pos = structType.fieldIndex(keyFieldName);
     data.update(pos, 
CatalystTypeConverters.convertToCatalyst(StringUtils.EMPTY_STRING));
     return this;
@@ -279,8 +280,10 @@ public class HoodieSparkRecord extends 
HoodieRecord<InternalRow> {
       Option<String> partitionNameOp,
       Boolean populateMetaFields,
       Option<Schema> schemaWithoutMetaFields) {
-    StructType structType = 
HoodieInternalRowUtils.getCachedSchema(recordSchema);
-    Option<StructType> structTypeWithoutMetaFields = 
schemaWithoutMetaFields.map(HoodieInternalRowUtils::getCachedSchema);
+    StructType structType = 
HoodieInternalRowUtils.getCachedSchema(HoodieSchema.fromAvroSchema(recordSchema));
+    Option<StructType> structTypeWithoutMetaFields = schemaWithoutMetaFields
+        .map(HoodieSchema::fromAvroSchema)
+        .map(HoodieInternalRowUtils::getCachedSchema);
     if (populateMetaFields) {
       return convertToHoodieSparkRecord(structType, this, withOperation);
     } else if (simpleKeyGenFieldsOpt.isPresent()) {
@@ -292,7 +295,7 @@ public class HoodieSparkRecord extends 
HoodieRecord<InternalRow> {
 
   @Override
   public HoodieRecord wrapIntoHoodieRecordPayloadWithKeyGen(Schema 
recordSchema, Properties props, Option<BaseKeyGenerator> keyGen) {
-    StructType structType = 
HoodieInternalRowUtils.getCachedSchema(recordSchema);
+    StructType structType = 
HoodieInternalRowUtils.getCachedSchema(HoodieSchema.fromAvroSchema(recordSchema));
     String key;
     String partition;
     boolean populateMetaFields = 
Boolean.parseBoolean(props.getOrDefault(POPULATE_META_FIELDS.key(),
@@ -341,7 +344,7 @@ public class HoodieSparkRecord extends 
HoodieRecord<InternalRow> {
 
   @Override
   protected Comparable<?> doGetOrderingValue(Schema recordSchema, Properties 
props, String[] orderingFields) {
-    StructType structType = 
HoodieInternalRowUtils.getCachedSchema(recordSchema);
+    StructType structType = 
HoodieInternalRowUtils.getCachedSchema(HoodieSchema.fromAvroSchema(recordSchema));
     if (orderingFields != null) {
       return OrderingValues.create(orderingFields, field -> {
         scala.Option<NestedFieldPath> cachedNestedFieldPath =
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
index 45e29a18110d..29370697ddc8 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/SparkLazyInsertIterable.java
@@ -18,11 +18,12 @@
 
 package org.apache.hudi.execution;
 
-import org.apache.hudi.avro.AvroSchemaCache;
-import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaCache;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.util.queue.HoodieExecutor;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
@@ -30,8 +31,6 @@ import org.apache.hudi.io.WriteHandleFactory;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.util.ExecutorFactory;
 
-import org.apache.avro.Schema;
-
 import java.util.Iterator;
 import java.util.List;
 
@@ -69,14 +68,14 @@ public class SparkLazyInsertIterable<T> extends 
HoodieLazyInsertIterable<T> {
     HoodieExecutor<List<WriteStatus>> bufferedIteratorExecutor = null;
     try {
       // config.getSchema is not canonicalized, while config.getWriteSchema is 
canonicalized. So, we have to use the canonicalized schema to read the existing 
data.
-      Schema schema = new Schema.Parser().parse(hoodieConfig.getWriteSchema());
+      HoodieSchema schema = HoodieSchema.parse(hoodieConfig.getWriteSchema());
       if (useWriterSchema) {
-        schema = HoodieAvroUtils.addMetadataFields(schema);
+        schema = HoodieSchemaUtils.addMetadataFields(schema);
       }
-      schema = AvroSchemaCache.intern(schema);
+      schema = HoodieSchemaCache.intern(schema);
 
       bufferedIteratorExecutor = ExecutorFactory.create(hoodieConfig, 
inputItr, getInsertHandler(),
-          getTransformer(schema, hoodieConfig), 
hoodieTable.getPreExecuteRunnable());
+          getTransformer(schema.getAvroSchema(), hoodieConfig), 
hoodieTable.getPreExecuteRunnable());
 
       final List<WriteStatus> result = bufferedIteratorExecutor.execute();
       return result;
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDBucketIndexPartitioner.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDBucketIndexPartitioner.java
index fdbfae12ce32..f8f34d188c8b 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDBucketIndexPartitioner.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDBucketIndexPartitioner.java
@@ -19,15 +19,14 @@
 package org.apache.hudi.execution.bulkinsert;
 
 import org.apache.hudi.SparkAdapterSupport$;
-import org.apache.hudi.avro.HoodieAvroUtils;
-import org.apache.hudi.common.config.SerializableSchema;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.util.collection.FlatLists;
 import org.apache.hudi.table.BucketIndexBulkInsertPartitioner;
 import org.apache.hudi.table.HoodieTable;
 
-import org.apache.avro.Schema;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.spark.Partitioner;
@@ -87,12 +86,12 @@ public abstract class RDDBucketIndexPartitioner<T> extends 
BucketIndexBulkInsert
    */
   private JavaRDD<HoodieRecord<T>> 
doPartitionAndCustomColumnSort(JavaRDD<HoodieRecord<T>> records, Partitioner 
partitioner) {
     final String[] sortColumns = sortColumnNames;
-    final SerializableSchema schema = new 
SerializableSchema(HoodieAvroUtils.addMetadataFields((new 
Schema.Parser().parse(table.getConfig().getSchema()))));
+    final HoodieSchema schema = 
HoodieSchemaUtils.addMetadataFields((HoodieSchema.parse(table.getConfig().getSchema())));
     Comparator<HoodieRecord<T>> comparator = (Comparator<HoodieRecord<T>> & 
Serializable) (t1, t2) -> {
       FlatLists.ComparableList obj1 = 
FlatLists.ofComparableArray(utf8StringFactory.wrapArrayOfObjects(
-          t1.getColumnValues(schema.get(), sortColumns, 
consistentLogicalTimestampEnabled)));
+          t1.getColumnValues(schema.getAvroSchema(), sortColumns, 
consistentLogicalTimestampEnabled)));
       FlatLists.ComparableList obj2 = 
FlatLists.ofComparableArray(utf8StringFactory.wrapArrayOfObjects(
-          t2.getColumnValues(schema.get(), sortColumns, 
consistentLogicalTimestampEnabled)));
+          t2.getColumnValues(schema.getAvroSchema(), sortColumns, 
consistentLogicalTimestampEnabled)));
       return obj1.compareTo(obj2);
     };
 
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
index 327eb8919529..b85c5f52a0b0 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/execution/bulkinsert/RDDSpatialCurveSortPartitioner.java
@@ -89,7 +89,7 @@ public class RDDSpatialCurveSortPartitioner<T>
             return hoodieRecord;
           });
     } else if (recordType == HoodieRecordType.SPARK) {
-      StructType structType = 
HoodieInternalRowUtils.getCachedSchema(schema.toAvroSchema());
+      StructType structType = HoodieInternalRowUtils.getCachedSchema(schema);
       Dataset<Row> sourceDataset = 
SparkConversionUtils.createDataFrame(records.rdd(),
           sparkEngineContext.getSqlContext().sparkSession(), structType);
       Dataset<Row> sortedDataset = reorder(sourceDataset, 
outputSparkPartitions);
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
index 4b5264ab759f..c251eae33e2e 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
@@ -109,7 +109,7 @@ public class HoodieSparkFileWriterFactory extends 
HoodieFileWriterFactory {
   protected HoodieFileWriter newLanceFileWriter(String instantTime, 
StoragePath path, HoodieConfig config, HoodieSchema schema,
                                                 TaskContextSupplier 
taskContextSupplier) throws IOException {
     boolean populateMetaFields = 
config.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS);
-    StructType structType = 
HoodieInternalRowUtils.getCachedSchema(schema.getAvroSchema());
+    StructType structType = HoodieInternalRowUtils.getCachedSchema(schema);
 
     return new HoodieSparkLanceWriter(path, structType, instantTime, 
taskContextSupplier, storage, populateMetaFields);
   }
@@ -117,7 +117,7 @@ public class HoodieSparkFileWriterFactory extends 
HoodieFileWriterFactory {
   private static HoodieRowParquetWriteSupport 
getHoodieRowParquetWriteSupport(StorageConfiguration<?> conf, HoodieSchema 
schema,
                                                                               
HoodieConfig config, boolean enableBloomFilter) {
     Option<BloomFilter> filter = enableBloomFilter ? 
Option.of(createBloomFilter(config)) : Option.empty();
-    StructType structType = 
HoodieInternalRowUtils.getCachedSchema(schema.getAvroSchema());
+    StructType structType = HoodieInternalRowUtils.getCachedSchema(schema);
     return 
HoodieRowParquetWriteSupport.getHoodieRowParquetWriteSupport(conf.unwrapAs(Configuration.class),
 structType, filter, config);
   }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
index af1e7a8c45f0..553d4c79f6cf 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkParquetReader.java
@@ -36,7 +36,6 @@ import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 
-import org.apache.avro.Schema;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.filter2.predicate.FilterApi;
@@ -46,7 +45,7 @@ import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.SchemaRepair;
 import org.apache.spark.sql.HoodieInternalRowUtils;
-import org.apache.spark.sql.avro.HoodieSparkAvroSchemaConverters;
+import org.apache.spark.sql.avro.HoodieSparkSchemaConverters;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
@@ -82,7 +81,7 @@ public class HoodieSparkParquetReader implements 
HoodieSparkFileReader {
   private final List<ClosableIterator> readerIterators = new ArrayList<>();
   private Option<MessageType> fileSchemaOption = Option.empty();
   private Option<StructType> structTypeOption = Option.empty();
-  private Option<Schema> schemaOption = Option.empty();
+  private Option<HoodieSchema> schemaOption = Option.empty();
 
   public HoodieSparkParquetReader(HoodieStorage storage, StoragePath path) {
     this.path = path;
@@ -141,7 +140,7 @@ public class HoodieSparkParquetReader implements 
HoodieSparkFileReader {
    */
   public ClosableIterator<UnsafeRow> getUnsafeRowIterator(HoodieSchema 
requestedSchema, List<Filter> readFilters) throws IOException {
     HoodieSchema nonNullSchema = 
HoodieSchemaUtils.getNonNullTypeFromUnion(requestedSchema);
-    StructType structSchema = 
HoodieInternalRowUtils.getCachedSchema(nonNullSchema.toAvroSchema());
+    StructType structSchema = 
HoodieInternalRowUtils.getCachedSchema(nonNullSchema);
     Option<MessageType> messageSchema = 
Option.of(getAvroSchemaConverter(storage.getConf().unwrapAs(Configuration.class)).convert(nonNullSchema.toAvroSchema()));
     boolean enableTimestampFieldRepair = 
storage.getConf().getBoolean(ENABLE_LOGICAL_TIMESTAMP_REPAIR, true);
     StructType dataStructType = convertToStruct(enableTimestampFieldRepair ? 
SchemaRepair.repairLogicalTypes(getFileSchema(), messageSchema) : 
getFileSchema());
@@ -204,11 +203,10 @@ public class HoodieSparkParquetReader implements 
HoodieSparkFileReader {
       // and therefore if we convert to Avro directly we'll lose logical 
type-info.
       MessageType messageType = getFileSchema();
       StructType structType = getStructSchema();
-      schemaOption = Option.of(HoodieSparkAvroSchemaConverters.toAvroType(
+      schemaOption = Option.of(HoodieSparkSchemaConverters.toHoodieType(
           structType, true, messageType.getName(), StringUtils.EMPTY_STRING));
     }
-    //TODO boundary to revisit using HoodieSchema directly
-    return HoodieSchema.fromAvroSchema(schemaOption.get());
+    return schemaOption.get();
   }
 
   protected StructType getStructSchema() {
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java
index 9dd1f5c71fc0..a838e1378d63 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/merge/SparkRecordMergingUtils.java
@@ -145,7 +145,7 @@ public class SparkRecordMergingUtils {
    */
   public static Map<Integer, StructField> 
getCachedFieldIdToFieldMapping(HoodieSchema providedSchema) {
     return FIELD_ID_TO_FIELD_MAPPING_CACHE.computeIfAbsent(providedSchema, 
schema -> {
-      StructType structType = 
HoodieInternalRowUtils.getCachedSchema(schema.toAvroSchema());
+      StructType structType = HoodieInternalRowUtils.getCachedSchema(schema);
       Map<Integer, StructField> schemaFieldIdMapping = new HashMap<>();
       int fieldId = 0;
 
@@ -164,7 +164,7 @@ public class SparkRecordMergingUtils {
    */
   public static Map<String, Integer> 
getCachedFieldNameToIdMapping(HoodieSchema providedSchema) {
     return FIELD_NAME_TO_ID_MAPPING_CACHE.computeIfAbsent(providedSchema, 
schema -> {
-      StructType structType = 
HoodieInternalRowUtils.getCachedSchema(schema.toAvroSchema());
+      StructType structType = HoodieInternalRowUtils.getCachedSchema(schema);
       Map<String, Integer> schemaFieldIdMapping = new HashMap<>();
       int fieldId = 0;
 
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/BaseBootstrapMetadataHandler.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/BaseBootstrapMetadataHandler.java
index a36111c83419..368c04aa89c9 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/BaseBootstrapMetadataHandler.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/BaseBootstrapMetadataHandler.java
@@ -24,6 +24,8 @@ import org.apache.hudi.avro.model.HoodiePath;
 import org.apache.hudi.client.bootstrap.BootstrapWriteStatus;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.BootstrapFileMapping;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
@@ -32,7 +34,6 @@ import org.apache.hudi.keygen.KeyGeneratorInterface;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
 
-import org.apache.avro.Schema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -58,11 +59,11 @@ public abstract class BaseBootstrapMetadataHandler 
implements BootstrapMetadataH
     HoodieBootstrapHandle<?, ?, ?, ?> bootstrapHandle = new 
HoodieBootstrapHandle(config, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS,
         table, partitionPath, FSUtils.createNewFileIdPfx(), 
table.getTaskContextSupplier());
     try {
-      Schema avroSchema = getAvroSchema(sourceFilePath);
+      HoodieSchema schema = getSchema(sourceFilePath);
       List<String> recordKeyColumns = 
keyGenerator.getRecordKeyFieldNames().stream()
           .map(HoodieAvroUtils::getRootLevelFieldName)
           .collect(Collectors.toList());
-      Schema recordKeySchema = 
HoodieAvroUtils.generateProjectionSchema(avroSchema, recordKeyColumns);
+      HoodieSchema recordKeySchema = 
HoodieSchemaUtils.generateProjectionSchema(schema, recordKeyColumns);
 
       LOG.info("Schema to be used for reading record keys: " + 
recordKeySchema);
 
@@ -79,8 +80,8 @@ public abstract class BaseBootstrapMetadataHandler implements 
BootstrapMetadataH
     return writeStatus;
   }
 
-  abstract Schema getAvroSchema(StoragePath sourceFilePath) throws IOException;
+  abstract HoodieSchema getSchema(StoragePath sourceFilePath) throws 
IOException;
 
   abstract void executeBootstrap(HoodieBootstrapHandle<?, ?, ?, ?> 
bootstrapHandle,
-                                 StoragePath sourceFilePath, 
KeyGeneratorInterface keyGenerator, String partitionPath, Schema avroSchema) 
throws Exception;
+                                 StoragePath sourceFilePath, 
KeyGeneratorInterface keyGenerator, String partitionPath, HoodieSchema schema) 
throws Exception;
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java
index 1a02941c2daa..9e6475a090cb 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/OrcBootstrapMetadataHandler.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.util.AvroOrcUtils;
 import org.apache.hudi.common.util.queue.HoodieExecutor;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -57,28 +58,29 @@ class OrcBootstrapMetadataHandler extends 
BaseBootstrapMetadataHandler {
   }
 
   @Override
-  Schema getAvroSchema(StoragePath sourceFilePath) throws IOException {
+  HoodieSchema getSchema(StoragePath sourceFilePath) throws IOException {
     Reader orcReader = OrcFile.createReader(
         new Path(sourceFilePath.toUri()), 
OrcFile.readerOptions((Configuration) table.getStorageConf().unwrap()));
     TypeDescription orcSchema = orcReader.getSchema();
-    return AvroOrcUtils.createAvroSchema(orcSchema);
+    Schema schema = AvroOrcUtils.createAvroSchema(orcSchema);
+    return HoodieSchema.fromAvroSchema(schema);
   }
 
   @Override
   void executeBootstrap(HoodieBootstrapHandle<?, ?, ?, ?> bootstrapHandle,
                         StoragePath sourceFilePath, KeyGeneratorInterface 
keyGenerator,
-                        String partitionPath, Schema avroSchema) throws 
Exception {
+                        String partitionPath, HoodieSchema schema) throws 
Exception {
     // TODO support spark orc reader
     if (config.getRecordMerger().getRecordType() == HoodieRecordType.SPARK) {
       throw new UnsupportedOperationException();
     }
     Reader orcReader = OrcFile.createReader(
         new Path(sourceFilePath.toUri()), 
OrcFile.readerOptions((Configuration) table.getStorageConf().unwrap()));
-    TypeDescription orcSchema = AvroOrcUtils.createOrcSchema(avroSchema);
+    TypeDescription orcSchema = 
AvroOrcUtils.createOrcSchema(schema.getAvroSchema());
     HoodieExecutor<Void> executor = null;
     RecordReader reader = orcReader.rows(new Reader.Options((Configuration) 
table.getStorageConf().unwrap()).schema(orcSchema));
     try {
-      executor = ExecutorFactory.create(config, new 
OrcReaderIterator<GenericRecord>(reader, avroSchema, orcSchema),
+      executor = ExecutorFactory.create(config, new 
OrcReaderIterator<GenericRecord>(reader, schema.getAvroSchema(), orcSchema),
           new BootstrapRecordConsumer(bootstrapHandle), inp -> {
             String recKey = keyGenerator.getKey(inp).getRecordKey();
             GenericRecord gr = new 
GenericData.Record(METADATA_BOOTSTRAP_RECORD_SCHEMA.toAvroSchema());
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
index 5414305104fd..d0c75bd8ae8e 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/ParquetBootstrapMetadataHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.table.action.bootstrap;
 
+import org.apache.avro.Schema;
 import org.apache.hudi.avro.model.HoodieFileStatus;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
 import org.apache.hudi.common.model.HoodieKey;
@@ -36,7 +37,6 @@ import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.util.ExecutorFactory;
 
-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
@@ -66,12 +66,13 @@ class ParquetBootstrapMetadataHandler extends 
BaseBootstrapMetadataHandler {
   }
 
   @Override
-  Schema getAvroSchema(StoragePath sourceFilePath) throws IOException {
+  HoodieSchema getSchema(StoragePath sourceFilePath) throws IOException {
     ParquetMetadata readFooter = ParquetFileReader.readFooter(
         (Configuration) table.getStorageConf().unwrap(), new 
Path(sourceFilePath.toUri()),
         ParquetMetadataConverter.NO_FILTER);
     MessageType parquetSchema = readFooter.getFileMetaData().getSchema();
-    return getAvroSchemaConverter((Configuration) 
table.getStorageConf().unwrap()).convert(parquetSchema);
+    Schema schema = getAvroSchemaConverter((Configuration) 
table.getStorageConf().unwrap()).convert(parquetSchema);
+    return HoodieSchema.fromAvroSchema(schema);
   }
 
   @Override
@@ -79,7 +80,7 @@ class ParquetBootstrapMetadataHandler extends 
BaseBootstrapMetadataHandler {
                                   StoragePath sourceFilePath,
                                   KeyGeneratorInterface keyGenerator,
                                   String partitionPath,
-                                  Schema schema) throws Exception {
+                                  HoodieSchema schema) throws Exception {
     HoodieRecord.HoodieRecordType recordType = 
table.getConfig().getRecordMerger().getRecordType();
 
     HoodieFileReader reader = 
getHoodieSparkIOFactory(table.getStorage()).getReaderFactory(recordType)
@@ -88,15 +89,14 @@ class ParquetBootstrapMetadataHandler extends 
BaseBootstrapMetadataHandler {
     HoodieExecutor<Void> executor = null;
     try {
       Function<HoodieRecord, HoodieRecord> transformer = record -> {
-        String recordKey = record.getRecordKey(schema, 
Option.of(keyGenerator));
+        String recordKey = record.getRecordKey(schema.getAvroSchema(), 
Option.of(keyGenerator));
         return createNewMetadataBootstrapRecord(recordKey, partitionPath, 
recordType)
             // NOTE: Record have to be cloned here to make sure if it holds 
low-level engine-specific
             //       payload pointing into a shared, mutable (underlying) 
buffer we get a clean copy of
             //       it since these records will be inserted into the queue 
later.
             .copy();
       };
-      //TODO boundary to reivisit in later pr to use HoodieSchema directly
-      ClosableIterator<HoodieRecord> recordIterator = 
reader.getRecordIterator(HoodieSchema.fromAvroSchema(schema));
+      ClosableIterator<HoodieRecord> recordIterator = 
reader.getRecordIterator(schema);
       executor = ExecutorFactory.create(config, recordIterator,
           new BootstrapRecordConsumer(bootstrapHandle), transformer, 
table.getPreExecuteRunnable());
       executor.execute();
@@ -124,7 +124,7 @@ class ParquetBootstrapMetadataHandler extends 
BaseBootstrapMetadataHandler {
         return new HoodieAvroIndexedRecord(hoodieKey, avroRecord);
 
       case SPARK:
-        StructType schema = 
HoodieInternalRowUtils$.MODULE$.getCachedSchema(METADATA_BOOTSTRAP_RECORD_SCHEMA.toAvroSchema());
+        StructType schema = 
HoodieInternalRowUtils$.MODULE$.getCachedSchema(METADATA_BOOTSTRAP_RECORD_SCHEMA);
         UnsafeProjection unsafeProjection = 
HoodieInternalRowUtils$.MODULE$.getCachedUnsafeProjection(schema, schema);
 
         GenericInternalRow row = new 
GenericInternalRow(METADATA_BOOTSTRAP_RECORD_SCHEMA.getFields().size());
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRecordContext.java
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRecordContext.java
index 1af96a2bf335..6bd5420af014 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRecordContext.java
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRecordContext.java
@@ -31,7 +31,6 @@ import org.apache.hudi.common.table.read.BufferedRecord;
 import org.apache.hudi.common.util.DefaultJavaTypeConverter;
 import org.apache.hudi.util.OrderingValueEngineTypeConverter;
 
-import org.apache.avro.Schema;
 import org.apache.spark.sql.HoodieInternalRowUtils;
 import org.apache.spark.sql.HoodieUnsafeRowUtils;
 import org.apache.spark.sql.catalyst.InternalRow;
@@ -65,15 +64,15 @@ public abstract class BaseSparkInternalRecordContext 
extends RecordContext<Inter
     super(new DefaultJavaTypeConverter());
   }
 
-  public static Object getFieldValueFromInternalRow(InternalRow row, Schema 
recordSchema, String fieldName) {
+  public static Object getFieldValueFromInternalRow(InternalRow row, 
HoodieSchema recordSchema, String fieldName) {
     return getFieldValueFromInternalRowInternal(row, recordSchema, fieldName, 
false);
   }
 
-  public static Object getFieldValueFromInternalRowAsJava(InternalRow row, 
Schema recordSchema, String fieldName) {
+  public static Object getFieldValueFromInternalRowAsJava(InternalRow row, 
HoodieSchema recordSchema, String fieldName) {
     return getFieldValueFromInternalRowInternal(row, recordSchema, fieldName, 
true);
   }
 
-  private static Object getFieldValueFromInternalRowInternal(InternalRow row, 
Schema recordSchema, String fieldName, boolean convertToJavaType) {
+  private static Object getFieldValueFromInternalRowInternal(InternalRow row, 
HoodieSchema recordSchema, String fieldName, boolean convertToJavaType) {
     StructType structType = getCachedSchema(recordSchema);
     scala.Option<HoodieUnsafeRowUtils.NestedFieldPath> cachedNestedFieldPath =
         HoodieInternalRowUtils.getCachedPosList(structType, fieldName);
@@ -107,7 +106,7 @@ public abstract class BaseSparkInternalRecordContext 
extends RecordContext<Inter
 
   @Override
   public Object getValue(InternalRow row, HoodieSchema schema, String 
fieldName) {
-    return getFieldValueFromInternalRow(row, schema.toAvroSchema(), fieldName);
+    return getFieldValueFromInternalRow(row, schema, fieldName);
   }
 
   @Override
@@ -128,7 +127,7 @@ public abstract class BaseSparkInternalRecordContext 
extends RecordContext<Inter
 
     HoodieSchema schema = getSchemaFromBufferRecord(bufferedRecord);
     InternalRow row = bufferedRecord.getRecord();
-    return new HoodieSparkRecord(hoodieKey, row, 
HoodieInternalRowUtils.getCachedSchema(schema.toAvroSchema()),
+    return new HoodieSparkRecord(hoodieKey, row, 
HoodieInternalRowUtils.getCachedSchema(schema),
         false, bufferedRecord.getHoodieOperation(), 
bufferedRecord.getOrderingValue(), bufferedRecord.isDelete());
   }
 
@@ -223,7 +222,7 @@ public abstract class BaseSparkInternalRecordContext 
extends RecordContext<Inter
   @Override
   public UnaryOperator<InternalRow> projectRecord(HoodieSchema from, 
HoodieSchema to, Map<String, String> renamedColumns) {
     Function1<InternalRow, UnsafeRow> unsafeRowWriter =
-        
HoodieInternalRowUtils.getCachedUnsafeRowWriter(getCachedSchema(from.toAvroSchema()),
 getCachedSchema(to.toAvroSchema()), renamedColumns, Collections.emptyMap());
+        HoodieInternalRowUtils.getCachedUnsafeRowWriter(getCachedSchema(from), 
getCachedSchema(to), renamedColumns, Collections.emptyMap());
     return row -> (InternalRow) unsafeRowWriter.apply(row);
   }
 
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
index a7b3dd9a7421..e4a34ec60acf 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/BaseSparkInternalRowReaderContext.java
@@ -90,7 +90,7 @@ public abstract class BaseSparkInternalRowReaderContext 
extends HoodieReaderCont
     Map<Integer, Object> partitionValuesByIndex = 
partitionFieldAndValues.stream()
         .collect(Collectors.toMap(pair -> 
to.getField(pair.getKey()).orElseThrow(() -> new 
IllegalArgumentException("Missing field: " + pair.getKey())).pos(), 
Pair::getRight));
     Function1<InternalRow, UnsafeRow> unsafeRowWriter =
-        
HoodieInternalRowUtils.getCachedUnsafeRowWriter(getCachedSchema(from.toAvroSchema()),
 getCachedSchema(to.toAvroSchema()), Collections.emptyMap(), 
partitionValuesByIndex);
+        HoodieInternalRowUtils.getCachedUnsafeRowWriter(getCachedSchema(from), 
getCachedSchema(to), Collections.emptyMap(), partitionValuesByIndex);
     return row -> (InternalRow) unsafeRowWriter.apply(row);
   }
 
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
index 24a37582399d..0ee2abd847a5 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
@@ -24,6 +24,7 @@ import org.apache.hudi.common.config.TypedProperties
 import org.apache.hudi.common.data.HoodieData
 import org.apache.hudi.common.engine.TaskContextSupplier
 import org.apache.hudi.common.model.HoodieRecord
+import org.apache.hudi.common.schema.HoodieSchema
 import org.apache.hudi.common.table.HoodieTableConfig
 import org.apache.hudi.common.util.{OrderingValues, ReflectionUtils}
 import org.apache.hudi.config.HoodieWriteConfig
@@ -36,13 +37,11 @@ import 
org.apache.hudi.keygen.factory.HoodieSparkKeyGeneratorFactory
 import org.apache.hudi.table.{BulkInsertPartitioner, HoodieTable}
 import 
org.apache.hudi.table.action.commit.{BucketBulkInsertDataInternalWriterHelper, 
BulkInsertDataInternalWriterHelper, 
ConsistentBucketBulkInsertDataInternalWriterHelper, ParallelismHelper}
 import org.apache.hudi.util.JFunction.toJavaSerializableFunctionUnchecked
-
-import org.apache.avro.Schema
 import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Dataset, Row}
-import org.apache.spark.sql.HoodieUnsafeRowUtils.{composeNestedFieldPath, 
getNestedInternalRowValue, NestedFieldPath}
+import org.apache.spark.sql.HoodieUnsafeRowUtils.{NestedFieldPath, 
composeNestedFieldPath, getNestedInternalRowValue}
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.Project
@@ -157,7 +156,7 @@ object HoodieDatasetBulkInsertHelper
                  writeConfig: HoodieWriteConfig,
                  arePartitionRecordsSorted: Boolean,
                  shouldPreserveHoodieMetadata: Boolean): 
HoodieData[WriteStatus] = {
-    val schema = AvroConversionUtils.alignFieldsNullability(dataset.schema, 
new Schema.Parser().parse(writeConfig.getSchema))
+    val schema = 
HoodieSchemaConversionUtils.alignFieldsNullability(dataset.schema, 
HoodieSchema.parse(writeConfig.getSchema))
     HoodieJavaRDD.of(
       injectSQLConf(dataset.queryExecution.toRdd.mapPartitions(iter => {
         val taskContextSupplier: TaskContextSupplier = 
table.getTaskContextSupplier
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index 153aca96a154..daf5a0a9417f 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -29,7 +29,6 @@ import org.apache.hudi.keygen.constant.KeyGeneratorType
 import org.apache.hudi.storage.StoragePath
 import org.apache.hudi.util.ExceptionWrappingIterator
 
-import org.apache.avro.Schema
 import org.apache.avro.generic.GenericRecord
 import org.apache.hadoop.fs.Path
 import org.apache.spark.SPARK_VERSION
@@ -202,9 +201,9 @@ object HoodieSparkUtils extends SparkAdapterSupport with 
SparkVersionsSupport wi
       if (recs.isEmpty) {
         Iterator.empty
       } else {
-        val schema = new Schema.Parser().parse(serializedTargetSchema)
+        val schema = HoodieSchema.parse(serializedTargetSchema)
         val transform: GenericRecord => Either[GenericRecord, String] = record 
=> try {
-          Left(HoodieAvroUtils.rewriteRecordDeep(record, schema, true))
+          Left(HoodieAvroUtils.rewriteRecordDeep(record, schema.toAvroSchema, 
true))
         } catch {
           case _: Throwable => 
Right(HoodieAvroUtils.safeAvroToJsonString(record))
         }
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRecordContext.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRecordContext.scala
index eac6bf6d0239..2d8a7276e681 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRecordContext.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRecordContext.scala
@@ -19,7 +19,6 @@
 
 package org.apache.hudi
 
-import org.apache.avro.Schema
 import org.apache.avro.generic.{GenericRecord, IndexedRecord}
 import org.apache.hudi.avro.AvroSchemaUtils.isNullable
 import org.apache.hudi.common.engine.RecordContext
@@ -36,8 +35,8 @@ import scala.collection.mutable
 trait SparkFileFormatInternalRecordContext extends 
BaseSparkInternalRecordContext {
 
   lazy val sparkAdapter: SparkAdapter = SparkAdapterSupport.sparkAdapter
-  private val deserializerMap: mutable.Map[Schema, HoodieAvroDeserializer] = 
mutable.Map()
-  private val serializerMap: mutable.Map[Schema, HoodieAvroSerializer] = 
mutable.Map()
+  private val deserializerMap: mutable.Map[HoodieSchema, 
HoodieAvroDeserializer] = mutable.Map()
+  private val serializerMap: mutable.Map[HoodieSchema, HoodieAvroSerializer] = 
mutable.Map()
 
   override def supportsParquetRowIndex: Boolean = {
     HoodieSparkUtils.gteqSpark3_5
@@ -50,17 +49,17 @@ trait SparkFileFormatInternalRecordContext extends 
BaseSparkInternalRecordContex
    * @return An [[InternalRow]].
    */
   override def convertAvroRecord(avroRecord: IndexedRecord): InternalRow = {
-    val schema = avroRecord.getSchema
+    val schema = HoodieSchema.fromAvroSchema(avroRecord.getSchema)
     val structType = HoodieInternalRowUtils.getCachedSchema(schema)
     val deserializer = deserializerMap.getOrElseUpdate(schema, {
-      sparkAdapter.createAvroDeserializer(HoodieSchema.fromAvroSchema(schema), 
structType)
+      sparkAdapter.createAvroDeserializer(schema, structType)
     })
     deserializer.deserialize(avroRecord).get.asInstanceOf[InternalRow]
   }
 
   override def convertToAvroRecord(record: InternalRow, schema: HoodieSchema): 
GenericRecord = {
-    val structType = 
HoodieInternalRowUtils.getCachedSchema(schema.toAvroSchema)
-    val serializer = serializerMap.getOrElseUpdate(schema.toAvroSchema, {
+    val structType = HoodieInternalRowUtils.getCachedSchema(schema)
+    val serializer = serializerMap.getOrElseUpdate(schema, {
       sparkAdapter.createAvroSerializer(structType, schema, schema.isNullable)
     })
     serializer.serialize(record).asInstanceOf[GenericRecord]
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
index 96092fe22be8..5a0f796d377b 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/SparkFileFormatInternalRowReaderContext.scala
@@ -80,7 +80,7 @@ class SparkFileFormatInternalRowReaderContext(baseFileReader: 
SparkColumnarFileR
     if (hasRowIndexField) {
       assert(getRecordContext.supportsParquetRowIndex())
     }
-    val structType = 
HoodieInternalRowUtils.getCachedSchema(requiredSchema.toAvroSchema)
+    val structType = HoodieInternalRowUtils.getCachedSchema(requiredSchema)
     val (readSchema, readFilters) = getSchemaAndFiltersForRead(structType, 
hasRowIndexField)
     if (FSUtils.isLogFile(filePath)) {
       // NOTE: now only primary key based filtering is supported for log files
@@ -290,4 +290,4 @@ object SparkFileFormatInternalRowReaderContext {
     field.name.equals(ROW_INDEX_TEMPORARY_COLUMN_NAME)
   }
 
-}
\ No newline at end of file
+}
diff --git 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
index e2b782a424a8..3b0320d6877d 100644
--- 
a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
+++ 
b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala
@@ -18,11 +18,10 @@
 
 package org.apache.spark.sql
 
-import org.apache.avro.Schema
-import org.apache.hudi.AvroConversionUtils.convertAvroSchemaToStructType
 import org.apache.hudi.avro.HoodieAvroUtils.{createFullName, toJavaDate}
 import org.apache.hudi.exception.HoodieException
 import org.apache.hudi.common.schema.HoodieSchema
+import 
org.apache.hudi.HoodieSchemaConversionUtils.convertHoodieSchemaToStructType
 import 
org.apache.spark.sql.HoodieCatalystExpressionUtils.generateUnsafeProjection
 import org.apache.spark.sql.HoodieUnsafeRowUtils.{NestedFieldPath, 
composeNestedFieldPath}
 import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, 
UnsafeArrayData, UnsafeProjection, UnsafeRow}
@@ -59,13 +58,13 @@ object HoodieInternalRowUtils {
         new mutable.HashMap[(StructType, StructType), UnsafeProjection]
     })
 
-  private val identicalUnsafeProjectionThreadLocal: 
ThreadLocal[mutable.HashMap[Schema, UnsafeProjection]] =
-    ThreadLocal.withInitial(new Supplier[mutable.HashMap[Schema, 
UnsafeProjection]] {
-      override def get(): mutable.HashMap[Schema, UnsafeProjection] =
-        new mutable.HashMap[Schema, UnsafeProjection]
+  private val identicalUnsafeProjectionThreadLocal: 
ThreadLocal[mutable.HashMap[HoodieSchema, UnsafeProjection]] =
+    ThreadLocal.withInitial(new Supplier[mutable.HashMap[HoodieSchema, 
UnsafeProjection]] {
+      override def get(): mutable.HashMap[HoodieSchema, UnsafeProjection] =
+        new mutable.HashMap[HoodieSchema, UnsafeProjection]
     })
 
-  private val schemaMap = new ConcurrentHashMap[Schema, StructType]
+  private val schemaMap = new ConcurrentHashMap[HoodieSchema, StructType]
   private val orderPosListMap = new ConcurrentHashMap[(StructType, String), 
Option[NestedFieldPath]]
 
   /**
@@ -85,7 +84,7 @@ object HoodieInternalRowUtils {
    */
   def getCachedUnsafeProjection(schema: HoodieSchema): UnsafeProjection = {
     identicalUnsafeProjectionThreadLocal.get()
-      .getOrElseUpdate(schema.getAvroSchema, 
UnsafeProjection.create(getCachedSchema(schema.getAvroSchema)))
+      .getOrElseUpdate(schema, 
UnsafeProjection.create(getCachedSchema(schema)))
   }
 
   /**
@@ -127,16 +126,16 @@ object HoodieInternalRowUtils {
    * @param schema [[Schema]] to convert to [[StructType]], NOTE: It is best 
that the schema passed in is cached through 
[[org.apache.hudi.avro.AvroSchemaCache]], so that we can reduce the overhead of 
schema lookup in the map
    * @return [[StructType]] for provided [[Schema]]
    */
-  def getCachedSchema(schema: Schema): StructType = {
+  def getCachedSchema(schema: HoodieSchema): StructType = {
     val structType = schemaMap.get(schema)
     // NOTE: This specifically designed to do 2 lookups (in case of 
cache-miss) to avoid
     //       allocating the closure when using [[computeIfAbsent]] on more 
frequent cache-hit path
     if (structType != null) {
       structType
     } else {
-      schemaMap.computeIfAbsent(schema, new JFunction[Schema, StructType] {
-        override def apply(t: Schema): StructType =
-          convertAvroSchemaToStructType(schema)
+      schemaMap.computeIfAbsent(schema, new JFunction[HoodieSchema, 
StructType] {
+        override def apply(t: HoodieSchema): StructType =
+          convertHoodieSchemaToStructType(schema)
       })
     }
   }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/callback/TestHoodieClientInitCallback.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/callback/TestHoodieClientInitCallback.java
index 56a88a96861f..05814e446417 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/callback/TestHoodieClientInitCallback.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/callback/TestHoodieClientInitCallback.java
@@ -23,13 +23,13 @@ import 
org.apache.hudi.callback.impl.HoodieWriteCommitHttpCallback;
 import org.apache.hudi.client.BaseHoodieClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.execution.bulkinsert.NonSortPartitionerWithRows;
 import org.apache.hudi.storage.StorageConfiguration;
 
-import org.apache.avro.Schema;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -98,20 +98,20 @@ public class TestHoodieClientInitCallback {
             WRITE_SCHEMA_OVERRIDE.key(), TRIP_NESTED_EXAMPLE_SCHEMA))
         .build(false);
     assertFalse(config.contains(CUSTOM_CONFIG_KEY1));
-    assertFalse(new Schema.Parser().parse(config.getWriteSchema())
+    assertFalse(HoodieSchema.parse(config.getWriteSchema())
         .getObjectProps().containsKey(CUSTOM_CONFIG_KEY2));
 
     try (SparkRDDWriteClient<Object> writeClient = new 
SparkRDDWriteClient<>(engineContext, config)) {
 
       HoodieWriteConfig updatedConfig = writeClient.getConfig();
       assertFalse(updatedConfig.contains(CUSTOM_CONFIG_KEY1));
-      Schema actualSchema = new 
Schema.Parser().parse(updatedConfig.getWriteSchema());
+      HoodieSchema actualSchema = 
HoodieSchema.parse(updatedConfig.getWriteSchema());
       
assertTrue(actualSchema.getObjectProps().containsKey(CUSTOM_CONFIG_KEY2));
       assertEquals(CUSTOM_CONFIG_VALUE2, 
actualSchema.getObjectProps().get(CUSTOM_CONFIG_KEY2));
 
       updatedConfig = writeClient.getTableServiceClient().getConfig();
       assertFalse(updatedConfig.contains(CUSTOM_CONFIG_KEY1));
-      actualSchema = new Schema.Parser().parse(updatedConfig.getWriteSchema());
+      actualSchema = HoodieSchema.parse(updatedConfig.getWriteSchema());
       
assertTrue(actualSchema.getObjectProps().containsKey(CUSTOM_CONFIG_KEY2));
       assertEquals(CUSTOM_CONFIG_VALUE2, 
actualSchema.getObjectProps().get(CUSTOM_CONFIG_KEY2));
     }
@@ -129,7 +129,7 @@ public class TestHoodieClientInitCallback {
             WRITE_SCHEMA_OVERRIDE.key(), TRIP_NESTED_EXAMPLE_SCHEMA))
         .build(false);
     assertFalse(config.contains(CUSTOM_CONFIG_KEY1));
-    assertFalse(new Schema.Parser().parse(config.getWriteSchema())
+    assertFalse(HoodieSchema.parse(config.getWriteSchema())
         .getObjectProps().containsKey(CUSTOM_CONFIG_KEY2));
 
     try (SparkRDDWriteClient<Object> writeClient = new 
SparkRDDWriteClient<>(engineContext, config)) {
@@ -137,14 +137,14 @@ public class TestHoodieClientInitCallback {
       HoodieWriteConfig updatedConfig = writeClient.getConfig();
       assertTrue(updatedConfig.contains(CUSTOM_CONFIG_KEY1));
       assertEquals(CUSTOM_CONFIG_VALUE1, 
updatedConfig.getString(CUSTOM_CONFIG_KEY1));
-      Schema actualSchema = new 
Schema.Parser().parse(updatedConfig.getWriteSchema());
+      HoodieSchema actualSchema = 
HoodieSchema.parse(updatedConfig.getWriteSchema());
       
assertTrue(actualSchema.getObjectProps().containsKey(CUSTOM_CONFIG_KEY2));
       assertEquals(CUSTOM_CONFIG_VALUE2, 
actualSchema.getObjectProps().get(CUSTOM_CONFIG_KEY2));
 
       updatedConfig = writeClient.getTableServiceClient().getConfig();
       assertTrue(updatedConfig.contains(CUSTOM_CONFIG_KEY1));
       assertEquals(CUSTOM_CONFIG_VALUE1, 
updatedConfig.getString(CUSTOM_CONFIG_KEY1));
-      actualSchema = new Schema.Parser().parse(updatedConfig.getWriteSchema());
+      actualSchema = HoodieSchema.parse(updatedConfig.getWriteSchema());
       
assertTrue(actualSchema.getObjectProps().containsKey(CUSTOM_CONFIG_KEY2));
       assertEquals(CUSTOM_CONFIG_VALUE2, 
actualSchema.getObjectProps().get(CUSTOM_CONFIG_KEY2));
     }
@@ -217,7 +217,7 @@ public class TestHoodieClientInitCallback {
     @Override
     public void call(BaseHoodieClient hoodieClient) {
       HoodieWriteConfig config = hoodieClient.getConfig();
-      Schema schema = new Schema.Parser().parse(config.getWriteSchema());
+      HoodieSchema schema = HoodieSchema.parse(config.getWriteSchema());
       if (!schema.getObjectProps().containsKey(CUSTOM_CONFIG_KEY2)) {
         schema.addProp(CUSTOM_CONFIG_KEY2, CUSTOM_CONFIG_VALUE2);
       }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
index 3593377ce1f8..e97d45fa88cc 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestUpdateSchemaEvolution.java
@@ -41,7 +41,6 @@ import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.testutils.HoodieSparkClientTestHarness;
 
-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.fs.Path;
@@ -145,8 +144,8 @@ public class TestUpdateSchemaEvolution extends 
HoodieSparkClientTestHarness impl
   }
 
   private List<HoodieRecord> buildUpdateRecords(String recordStr, String 
insertFileId, String schema) throws IOException {
-    Schema avroSchema = new Schema.Parser().parse(schema);
-    GenericRecord data = new GenericData.Record(avroSchema);
+    HoodieSchema hoodieSchema = HoodieSchema.parse(schema);
+    GenericRecord data = new GenericData.Record(hoodieSchema.getAvroSchema());
     Map<String, Object> json = 
JsonUtils.getObjectMapper().readValue(recordStr, Map.class);
     json.forEach(data::put);
     String key = json.get("_row_key").toString();
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java
index b348a1c33589..2eb39df74f01 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestExternalPathHandling.java
@@ -34,6 +34,9 @@ import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieDeltaWriteStat;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -53,7 +56,6 @@ import org.apache.hudi.stats.ValueMetadata;
 import org.apache.hudi.table.action.clean.CleanPlanner;
 import org.apache.hudi.testutils.HoodieClientTestBase;
 
-import org.apache.avro.Schema;
 import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -88,10 +90,10 @@ public class TestExternalPathHandling extends 
HoodieClientTestBase {
     metaClient = HoodieTableMetaClient.reload(metaClient);
     Properties properties = new Properties();
     properties.setProperty(HoodieMetadataConfig.AUTO_INITIALIZE.key(), 
"false");
-    List<Schema.Field> fields = new ArrayList<>();
-    fields.add(new Schema.Field(FIELD_1, Schema.create(Schema.Type.STRING), 
null, null));
-    fields.add(new Schema.Field(FIELD_2, Schema.create(Schema.Type.STRING), 
null, null));
-    Schema simpleSchema = Schema.createRecord("simpleSchema", null, null, 
false, fields);
+    List<HoodieSchemaField> fields = new ArrayList<>();
+    fields.add(HoodieSchemaField.of(FIELD_1, 
HoodieSchema.create(HoodieSchemaType.STRING)));
+    fields.add(HoodieSchemaField.of(FIELD_2, 
HoodieSchema.create(HoodieSchemaType.STRING)));
+    HoodieSchema simpleSchema = HoodieSchema.createRecord("simpleSchema", 
null, null, false, fields);
 
     writeConfig = HoodieWriteConfig.newBuilder()
         
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(INMEMORY).build())
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
index 62b7a2495557..974e717a211f 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
@@ -62,7 +62,6 @@ import org.apache.hudi.table.HoodieSparkTable;
 import org.apache.hudi.table.HoodieTable;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -520,15 +519,15 @@ public class TestHoodieBackedTableMetadata extends 
TestHoodieMetadataBase {
   private void verifyMetadataRawRecords(HoodieTable table, List<HoodieLogFile> 
logFiles) throws IOException {
     for (HoodieLogFile logFile : logFiles) {
       List<StoragePathInfo> pathInfoList = 
storage.listDirectEntries(logFile.getPath());
-      Schema writerSchema  =
-          TableSchemaResolver.readSchemaFromLogFile(storage, 
logFile.getPath());
+      HoodieSchema writerSchema  =
+          
HoodieSchema.fromAvroSchema(TableSchemaResolver.readSchemaFromLogFile(storage, 
logFile.getPath()));
       if (writerSchema == null) {
         // not a data block
         continue;
       }
 
       try (HoodieLogFormat.Reader logFileReader = 
HoodieLogFormat.newReader(storage,
-          new HoodieLogFile(pathInfoList.get(0).getPath()), 
HoodieSchema.fromAvroSchema(writerSchema))) {
+          new HoodieLogFile(pathInfoList.get(0).getPath()), writerSchema)) {
         while (logFileReader.hasNext()) {
           HoodieLogBlock logBlock = logFileReader.next();
           if (logBlock instanceof HoodieDataBlock) {
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/KeyGeneratorTestUtilities.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/KeyGeneratorTestUtilities.java
index 29ee6af5a7a9..4be9fea94840 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/KeyGeneratorTestUtilities.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/keygen/KeyGeneratorTestUtilities.java
@@ -19,9 +19,9 @@
 
 package org.apache.hudi.keygen;
 
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.util.JavaScalaConverters;
 
-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.util.Utf8;
@@ -101,14 +101,14 @@ class KeyGeneratorTestUtilities {
   }
 
   public static GenericRecord getNestedColRecord(String prop1Value, Long 
prop2Value) {
-    GenericRecord nestedColRecord = new GenericData.Record(new 
Schema.Parser().parse(NESTED_COL_SCHEMA));
+    GenericRecord nestedColRecord = new 
GenericData.Record(HoodieSchema.parse(NESTED_COL_SCHEMA).getAvroSchema());
     nestedColRecord.put("prop1", prop1Value);
     nestedColRecord.put("prop2", prop2Value);
     return nestedColRecord;
   }
 
   public static GenericRecord getRecord(GenericRecord nestedColRecord) {
-    GenericRecord avroRecord = new GenericData.Record(new 
Schema.Parser().parse(EXAMPLE_SCHEMA));
+    GenericRecord avroRecord = new 
GenericData.Record(HoodieSchema.parse(EXAMPLE_SCHEMA).getAvroSchema());
     avroRecord.put("timestamp", 4357686L);
     avroRecord.put("_row_key", "key1");
     avroRecord.put("ts_ms", "2020-03-21");
@@ -197,4 +197,4 @@ class KeyGeneratorTestUtilities {
     }
     return value;
   }
-}
\ No newline at end of file
+}
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
index 9a67f723fdbe..ec2228be15c9 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
@@ -34,6 +34,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.SerializableIndexedRecord;
 import org.apache.hudi.common.model.WriteConcurrencyMode;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -63,7 +64,6 @@ import 
org.apache.hudi.testutils.providers.HoodieWriteClientProvider;
 import org.apache.hudi.testutils.providers.SparkProvider;
 import org.apache.hudi.timeline.service.TimelineService;
 
-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -424,7 +424,7 @@ public class SparkClientFunctionalTestHarness implements 
SparkProvider, HoodieMe
             .build());
   }
 
-  protected Dataset<Row> toDataset(List<HoodieRecord> records, Schema schema) {
+  protected Dataset<Row> toDataset(List<HoodieRecord> records, HoodieSchema 
schema) {
     List<GenericRecord> avroRecords = records.stream()
         .map(r -> (GenericRecord) ((SerializableIndexedRecord) 
r.getData()).getData())
         .collect(Collectors.toList());
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
index 25151cda0008..2798121af3d2 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieCreateRecordUtils.scala
@@ -167,8 +167,8 @@ object HoodieCreateRecordUtils {
 
       case HoodieRecord.HoodieRecordType.SPARK =>
         val dataFileSchema = 
HoodieSchemaCache.intern(HoodieSchema.parse(dataFileSchemaStr))
-        val dataFileStructType = 
HoodieInternalRowUtils.getCachedSchema(dataFileSchema.getAvroSchema)
-        val writerStructType = 
HoodieInternalRowUtils.getCachedSchema(AvroSchemaCache.intern(writerSchema.getAvroSchema))
+        val dataFileStructType = 
HoodieInternalRowUtils.getCachedSchema(dataFileSchema)
+        val writerStructType = 
HoodieInternalRowUtils.getCachedSchema(HoodieSchemaCache.intern(writerSchema))
         val sourceStructType = df.schema
 
         df.queryExecution.toRdd.mapPartitions { it =>
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
index a9fb6a287639..f17d641edb11 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieSparkSqlWriter.scala
@@ -364,7 +364,7 @@ class HoodieSparkSqlWriterInternal {
       //       Avro's [[Schema]] we're preserving corresponding "record-name" 
and "record-namespace" that
       //       play crucial role in establishing compatibility b/w schemas
       val (avroRecordName, avroRecordNamespace) = latestTableSchemaOpt.map(s =>
-        (s.getName, toScalaOption(s.getNamespace).orNull(null)))
+        (s.getName, toScalaOption(s.getNamespace).orNull))
         .getOrElse(getRecordNameAndNamespace(tblName))
 
       val sourceSchema = convertStructTypeToHoodieSchema(df.schema, 
avroRecordName, avroRecordNamespace)
diff --git 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
index 8b95dbc27098..7585d9d93e65 100644
--- 
a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
+++ 
b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/CDCFileGroupIterator.scala
@@ -561,7 +561,7 @@ class CDCFileGroupIterator(split: HoodieCDCFileGroupSplit,
    */
   private def convertBufferedRecordToJsonString(record: 
BufferedRecord[InternalRow]): UTF8String = {
     internalRowToJsonStringConverterMap.getOrElseUpdate(record.getSchemaId,
-      new 
InternalRowToJsonStringConverter(HoodieInternalRowUtils.getCachedSchema(readerContext.getRecordContext.decodeAvroSchema(record.getSchemaId).toAvroSchema)))
+      new 
InternalRowToJsonStringConverter(HoodieInternalRowUtils.getCachedSchema(readerContext.getRecordContext.decodeAvroSchema(record.getSchemaId))))
       .convert(record.getRecord)
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBufferedRecordMerger.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBufferedRecordMerger.java
index 7bf3e2d3195f..a8b83671724b 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBufferedRecordMerger.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestBufferedRecordMerger.java
@@ -931,7 +931,7 @@ class TestBufferedRecordMerger extends 
SparkClientFunctionalTestHarness {
 
     @Override
     public Object getValue(InternalRow record, HoodieSchema schema, String 
fieldName) {
-      return getFieldValueFromInternalRow(record, schema.toAvroSchema(), 
fieldName);
+      return getFieldValueFromInternalRow(record, schema, fieldName);
     }
 
     @Override
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
index eeceb209f0fc..74c547d991b6 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/table/read/TestHoodieFileGroupReaderOnSpark.scala
@@ -399,7 +399,7 @@ class TestHoodieFileGroupReaderOnSpark extends 
TestHoodieFileGroupReaderBase[Int
   }
 
   override def assertRecordMatchesSchema(schema: HoodieSchema, record: 
InternalRow): Unit = {
-    val structType = 
HoodieInternalRowUtils.getCachedSchema(schema.toAvroSchema)
+    val structType = HoodieInternalRowUtils.getCachedSchema(schema)
     assertRecordMatchesSchema(structType, record)
   }
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
index e4e499a150c1..743a0734bfd4 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestParquetColumnProjection.scala
@@ -340,7 +340,7 @@ class TestParquetColumnProjection extends 
SparkClientFunctionalTestHarness with
       else HoodieTestDataGenerator.HOODIE_SCHEMA
 
     val records = dataGen.generateInserts("001", recordCount)
-    val inputDF: Dataset[Row] = toDataset(records, 
HoodieTestDataGenerator.HOODIE_SCHEMA.getAvroSchema)
+    val inputDF: Dataset[Row] = toDataset(records, 
HoodieTestDataGenerator.HOODIE_SCHEMA)
 
     inputDF.write.format("org.apache.hudi")
       .options(opts)
@@ -372,7 +372,7 @@ class TestParquetColumnProjection extends 
SparkClientFunctionalTestHarness with
       val updatedRecords = dataGen.generateUpdates("002", 
recordsToUpdate.asJava)
 
       // Step 2: Update M records out of those (t/h update)
-      val inputDF = toDataset(updatedRecords, 
HoodieTestDataGenerator.AVRO_SCHEMA)
+      val inputDF = toDataset(updatedRecords, 
HoodieTestDataGenerator.HOODIE_SCHEMA)
 
       inputDF.write.format("org.apache.hudi")
         .options(opts)
@@ -402,7 +402,7 @@ class TestParquetColumnProjection extends 
SparkClientFunctionalTestHarness with
       val updatedRecords = dataGen.generateUpdates("%03d".format(i), 
recordsToUpdate.asJava)
 
       // Step 2: Update M records out of those (t/h update)
-      val inputDF = toDataset(updatedRecords, 
HoodieTestDataGenerator.AVRO_SCHEMA)
+      val inputDF = toDataset(updatedRecords, 
HoodieTestDataGenerator.HOODIE_SCHEMA)
 
       val compactScheduleInline = if (inlineCompact) "false" else "true"
       val compactInline = if (inlineCompact) "true" else "false"
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieInternalRowUtils.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieInternalRowUtils.scala
index 681e2fb6c4e2..b4ef5fe44a42 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieInternalRowUtils.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/TestHoodieInternalRowUtils.scala
@@ -311,8 +311,8 @@ class TestHoodieInternalRowUtils extends FunSuite with 
Matchers with BeforeAndAf
     val newRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecord, 
newHoodieSchema.toAvroSchema, new HashMap[String, String])
     assert(GenericData.get.validate(newHoodieSchema.toAvroSchema, newRecord))
     // Convert avro to internalRow
-    val structTypeSchema = HoodieInternalRowUtils.getCachedSchema(avroSchema)
-    val newStructTypeSchema = 
HoodieInternalRowUtils.getCachedSchema(newHoodieSchema.toAvroSchema)
+    val structTypeSchema = 
HoodieInternalRowUtils.getCachedSchema(HoodieSchema.fromAvroSchema(avroSchema))
+    val newStructTypeSchema = 
HoodieInternalRowUtils.getCachedSchema(newHoodieSchema)
     val row = AvroConversionUtils.createAvroToInternalRowConverter(avroSchema, 
structTypeSchema).apply(avroRecord).get
     val newRowExpected = 
AvroConversionUtils.createAvroToInternalRowConverter(newHoodieSchema.toAvroSchema,
 newStructTypeSchema)
       .apply(newRecord).get
@@ -367,8 +367,8 @@ class TestHoodieInternalRowUtils extends FunSuite with 
Matchers with BeforeAndAf
    // test the correctness of rewrite
     assert(GenericData.get.validate(newAvroSchema, newAvroRecord))
     // Convert avro to internalRow
-    val structTypeSchema = HoodieInternalRowUtils.getCachedSchema(schema)
-    val newStructTypeSchema = 
HoodieInternalRowUtils.getCachedSchema(newAvroSchema)
+    val structTypeSchema = 
HoodieInternalRowUtils.getCachedSchema(HoodieSchema.fromAvroSchema(schema))
+    val newStructTypeSchema = 
HoodieInternalRowUtils.getCachedSchema(HoodieSchema.fromAvroSchema(newAvroSchema))
     val row = AvroConversionUtils.createAvroToInternalRowConverter(schema, 
structTypeSchema).apply(avroRecord).get
     val newRowExpected = 
AvroConversionUtils.createAvroToInternalRowConverter(newAvroSchema, 
newStructTypeSchema).apply(newAvroRecord).get
 

Reply via email to