This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d272d283658f feat(schema): Remove direct usage of Avro schema in Flink-client path (#17739)
d272d283658f is described below

commit d272d283658f5dc124f144c52ee7adc02b848db5
Author: Tim Brown <[email protected]>
AuthorDate: Mon Jan 12 22:16:08 2026 -0500

    feat(schema): Remove direct usage of Avro schema in Flink-client path (#17739)
---
 .../hudi/execution/HoodieLazyInsertIterable.java   |   2 +-
 .../hudi/client/model/HoodieFlinkRecord.java       |  33 +-
 .../model/PartialUpdateFlinkRecordMerger.java      |   8 +-
 .../hudi/execution/FlinkLazyInsertIterable.java    |   4 +-
 .../row/HoodieRowDataFileWriterFactory.java        |   6 +-
 .../hudi/table/format/FlinkRecordContext.java      |  16 +-
 .../org/apache/hudi/util/AvroSchemaConverter.java  | 399 ---------------------
 .../apache/hudi/util/AvroToRowDataConverters.java  |   2 +-
 .../apache/hudi/util/HoodieSchemaConverter.java    |  25 +-
 ...ueryContexts.java => RowDataQueryContexts.java} |  34 +-
 .../apache/hudi/util/RowDataToAvroConverters.java  |  61 ++--
 .../hudi/merge/TestHoodieFlinkRecordMerger.java    |  12 +-
 .../hudi/util/TestHoodieSchemaConverter.java       | 118 +++---
 .../hudi/source/stats/ColumnStatsSchemas.java      |   5 +-
 .../org/apache/hudi/table/HoodieTableFactory.java  |  10 +-
 .../org/apache/hudi/table/HoodieTableSource.java   |   4 +-
 .../apache/hudi/table/catalog/HoodieCatalog.java   |  16 +-
 .../hudi/table/catalog/HoodieCatalogUtil.java      |   5 +-
 .../hudi/table/catalog/TableOptionProperties.java  |   4 +-
 .../table/format/FlinkRowDataReaderContext.java    |   6 +-
 .../hudi/table/format/InternalSchemaManager.java   |   6 +-
 .../hudi/table/format/cdc/CdcInputFormat.java      |  12 +-
 .../table/format/mor/MergeOnReadInputFormat.java   |   4 +-
 .../table/format/mor/MergeOnReadTableState.java    |  12 +-
 .../apache/hudi/source/TestStreamReadOperator.java |   6 +-
 .../apache/hudi/table/ITTestSchemaEvolution.java   |   6 +-
 .../table/TestHoodieFileGroupReaderOnFlink.java    |  12 +-
 .../apache/hudi/table/TestHoodieTableFactory.java  |   6 +-
 .../apache/hudi/table/format/TestInputFormat.java  |   6 +-
 .../apache/hudi/utils/TestAvroSchemaConverter.java |  94 -----
 .../org/apache/hudi/utils/TestConfigurations.java  |   4 +-
 .../test/java/org/apache/hudi/utils/TestData.java  |  16 +-
 .../utils/TestRecordKeyToRowDataConverter.java     |   6 +-
 .../hudi/utils/TestRowDataToAvroConverters.java    |   6 +-
 34 files changed, 248 insertions(+), 718 deletions(-)
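
Illustrative note: at most call sites the change is mechanical; code that unwrapped a HoodieSchema into an Avro Schema before building a RowData query context now passes the HoodieSchema straight through to the renamed RowDataQueryContexts utility. A minimal before/after sketch of the pattern (class and method names are taken from the diff below; the schema, utcTimezone, fieldName, and row variables are illustrative placeholders):

    // Before: unwrap the HoodieSchema into an Avro Schema first.
    RowDataQueryContext avroCtx =
        RowDataAvroQueryContexts.fromAvroSchema(schema.toAvroSchema(), utcTimezone);

    // After: pass the HoodieSchema through directly.
    RowDataQueryContext ctx =
        RowDataQueryContexts.fromSchema(schema, utcTimezone);

    // Field access through the context is unchanged by the migration.
    Object value = ctx.getFieldQueryContext(fieldName).getFieldGetter().getFieldOrNull(row);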

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java
index 9655fdca5b13..8487a1f01ee2 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/execution/HoodieLazyInsertIterable.java
@@ -97,7 +97,7 @@ public abstract class HoodieLazyInsertIterable<T>
   }
 
   public static <T> Function<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>> getTransformerInternal(HoodieSchema schema,
-                                                                                                                HoodieWriteConfig writeConfig) {
+                                                                                                               HoodieWriteConfig writeConfig) {
     // NOTE: Whether record have to be cloned here is determined based on the executor type used
     //       for writing: executors relying on an inner queue, will be keeping references to the records
     //       and therefore in the environments where underlying buffer holding the record could be
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
index 5d7917fc25a9..7f4fdd0e5caf 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/HoodieFlinkRecord.java
@@ -34,14 +34,13 @@ import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.keygen.BaseKeyGenerator;
 import org.apache.hudi.table.format.FlinkRecordContext;
-import org.apache.hudi.util.RowDataAvroQueryContexts;
-import org.apache.hudi.util.RowDataAvroQueryContexts.RowDataQueryContext;
+import org.apache.hudi.util.RowDataQueryContexts;
+import org.apache.hudi.util.RowDataQueryContexts.RowDataQueryContext;
 import org.apache.hudi.util.RowProjection;
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
-import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.GenericRowData;
@@ -103,7 +102,7 @@ public class HoodieFlinkRecord extends HoodieRecord<RowData> {
         if (recordSchema.getField(field).isEmpty()) {
           return OrderingValues.getDefault();
         }
-        return (Comparable<?>) getColumnValue(recordSchema.toAvroSchema(), field, props);
+        return (Comparable<?>) getColumnValue(recordSchema, field, props);
       });
     }
   }
@@ -141,7 +140,7 @@ public class HoodieFlinkRecord extends HoodieRecord<RowData> {
   @Override
   public String getRecordKey(HoodieSchema recordSchema, String keyFieldName) {
     if (key == null) {
-      String recordKey = Objects.toString(RowDataAvroQueryContexts.fromAvroSchema(recordSchema.toAvroSchema()).getFieldQueryContext(keyFieldName).getFieldGetter().getFieldOrNull(data));
+      String recordKey = Objects.toString(RowDataQueryContexts.fromSchema(recordSchema).getFieldQueryContext(keyFieldName).getFieldGetter().getFieldOrNull(data));
       key = new HoodieKey(recordKey, null);
     }
     return getRecordKey();
@@ -164,9 +163,9 @@ public class HoodieFlinkRecord extends HoodieRecord<RowData> {
     if (fieldValue == null) {
       return null;
     }
-    
+
     HoodieSchemaType schemaType = fieldSchema.getType();
-    
+
     if (schemaType == HoodieSchemaType.DATE) {
       return LocalDate.ofEpochDay(((Integer) fieldValue).longValue());
     } else if (schemaType == HoodieSchemaType.TIMESTAMP && keepConsistentLogicalTimestamp) {
@@ -190,20 +189,20 @@ public class HoodieFlinkRecord extends HoodieRecord<RowData> {
 
   @Override
   public Object getColumnValueAsJava(HoodieSchema recordSchema, String column, Properties props) {
-    return getColumnValueAsJava(recordSchema.toAvroSchema(), column, props, true);
+    return getColumnValueAsJava(recordSchema, column, props, true);
   }
 
-  private Object getColumnValueAsJava(Schema recordSchema, String column, Properties props, boolean allowsNull) {
+  private Object getColumnValueAsJava(HoodieSchema recordSchema, String column, Properties props, boolean allowsNull) {
     boolean utcTimezone = Boolean.parseBoolean(props.getProperty(
         HoodieStorageConfig.WRITE_UTC_TIMEZONE.key(), HoodieStorageConfig.WRITE_UTC_TIMEZONE.defaultValue().toString()));
-    RowDataQueryContext rowDataQueryContext = RowDataAvroQueryContexts.fromAvroSchema(recordSchema, utcTimezone);
+    RowDataQueryContext rowDataQueryContext = RowDataQueryContexts.fromSchema(recordSchema, utcTimezone);
     return rowDataQueryContext.getFieldQueryContext(column).getValAsJava(data, allowsNull);
   }
 
-  private Object getColumnValue(Schema recordSchema, String column, Properties props) {
+  private Object getColumnValue(HoodieSchema recordSchema, String column, Properties props) {
     boolean utcTimezone = Boolean.parseBoolean(props.getProperty(
         HoodieStorageConfig.WRITE_UTC_TIMEZONE.key(), HoodieStorageConfig.WRITE_UTC_TIMEZONE.defaultValue().toString()));
-    RowDataQueryContext rowDataQueryContext = RowDataAvroQueryContexts.fromAvroSchema(recordSchema, utcTimezone);
+    RowDataQueryContext rowDataQueryContext = RowDataQueryContexts.fromSchema(recordSchema, utcTimezone);
     return rowDataQueryContext.getFieldQueryContext(column).getFieldGetter().getFieldOrNull(data);
   }
 
@@ -236,7 +235,7 @@ public class HoodieFlinkRecord extends HoodieRecord<RowData> {
 
   @Override
   public HoodieRecord rewriteRecordWithNewSchema(HoodieSchema recordSchema, Properties props, HoodieSchema newSchema, Map<String, String> renameCols) {
-    RowProjection rowProjection = RowDataAvroQueryContexts.getRowProjection(recordSchema.toAvroSchema(), newSchema.toAvroSchema(), renameCols);
+    RowProjection rowProjection = RowDataQueryContexts.getRowProjection(recordSchema, newSchema, renameCols);
     RowData newRow = rowProjection.project(getData());
     return new HoodieFlinkRecord(getKey(), getOperation(), newRow);
   }
@@ -285,8 +284,8 @@ public class HoodieFlinkRecord extends HoodieRecord<RowData> {
   public Option<HoodieAvroIndexedRecord> toIndexedRecord(HoodieSchema recordSchema, Properties props) {
     boolean utcTimezone = Boolean.parseBoolean(props.getProperty(
         HoodieStorageConfig.WRITE_UTC_TIMEZONE.key(), HoodieStorageConfig.WRITE_UTC_TIMEZONE.defaultValue().toString()));
-    RowDataQueryContext rowDataQueryContext = RowDataAvroQueryContexts.fromAvroSchema(recordSchema.toAvroSchema(), utcTimezone);
-    IndexedRecord indexedRecord = (IndexedRecord) rowDataQueryContext.getRowDataToAvroConverter().convert(recordSchema.toAvroSchema(), getData());
+    RowDataQueryContext rowDataQueryContext = RowDataQueryContexts.fromSchema(recordSchema, utcTimezone);
+    IndexedRecord indexedRecord = (IndexedRecord) rowDataQueryContext.getRowDataToAvroConverter().convert(recordSchema, getData());
     return Option.of(new HoodieAvroIndexedRecord(getKey(), indexedRecord, getOperation(), getMetadata(), orderingValue, isDelete));
   }
 
@@ -294,8 +293,8 @@ public class HoodieFlinkRecord extends HoodieRecord<RowData> {
   public ByteArrayOutputStream getAvroBytes(HoodieSchema recordSchema, Properties props) {
     boolean utcTimezone = Boolean.parseBoolean(props.getProperty(
         HoodieStorageConfig.WRITE_UTC_TIMEZONE.key(), HoodieStorageConfig.WRITE_UTC_TIMEZONE.defaultValue().toString()));
-    RowDataQueryContext rowDataQueryContext = RowDataAvroQueryContexts.fromAvroSchema(recordSchema.toAvroSchema(), utcTimezone);
-    IndexedRecord indexedRecord = (IndexedRecord) rowDataQueryContext.getRowDataToAvroConverter().convert(recordSchema.toAvroSchema(), getData());
+    RowDataQueryContext rowDataQueryContext = RowDataQueryContexts.fromSchema(recordSchema, utcTimezone);
+    IndexedRecord indexedRecord = (IndexedRecord) rowDataQueryContext.getRowDataToAvroConverter().convert(recordSchema, getData());
     return HoodieAvroUtils.avroToBytesStream(indexedRecord);
   }
 }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/PartialUpdateFlinkRecordMerger.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/PartialUpdateFlinkRecordMerger.java
index a08c7f39a549..6bae6c2e8eac 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/PartialUpdateFlinkRecordMerger.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/model/PartialUpdateFlinkRecordMerger.java
@@ -24,7 +24,7 @@ import org.apache.hudi.common.engine.RecordContext;
 import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.read.BufferedRecord;
 import org.apache.hudi.common.table.read.BufferedRecords;
-import org.apache.hudi.util.RowDataAvroQueryContexts;
+import org.apache.hudi.util.RowDataQueryContexts;
 
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
@@ -129,7 +129,7 @@ public class PartialUpdateFlinkRecordMerger extends HoodieFlinkRecordMerger {
     // later in the file writer.
     int mergedArity = newSchema.getFields().size();
     boolean utcTimezone = Boolean.parseBoolean(props.getProperty("read.utc-timezone", "true"));
-    RowData.FieldGetter[] fieldGetters = RowDataAvroQueryContexts.fromAvroSchema(newSchema.toAvroSchema(), utcTimezone).fieldGetters();
+    RowData.FieldGetter[] fieldGetters = RowDataQueryContexts.fromSchema(newSchema, utcTimezone).fieldGetters();
 
     int lowOrderIdx = 0;
     int highOrderIdx = 0;
@@ -138,10 +138,10 @@ public class PartialUpdateFlinkRecordMerger extends HoodieFlinkRecordMerger {
     // shift start index for merging if there is schema discrepancy
     if (lowOrderArity != mergedArity) {
       lowOrderIdx += lowOrderArity - mergedArity;
-      lowOrderFieldGetters = RowDataAvroQueryContexts.fromAvroSchema(lowOrderSchema.toAvroSchema(), utcTimezone).fieldGetters();
+      lowOrderFieldGetters = RowDataQueryContexts.fromSchema(lowOrderSchema, utcTimezone).fieldGetters();
     } else if (highOrderArity != mergedArity) {
       highOrderIdx += highOrderArity - mergedArity;
-      highOrderFieldGetters = RowDataAvroQueryContexts.fromAvroSchema(highOrderSchema.toAvroSchema(), utcTimezone).fieldGetters();
+      highOrderFieldGetters = RowDataQueryContexts.fromSchema(highOrderSchema, utcTimezone).fieldGetters();
     }
 
     RowData lowOrderRow = (RowData) lowOrderRecord.getRecord();
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
index dd98d8886e66..dbeb24c550ec 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/execution/FlinkLazyInsertIterable.java
@@ -18,11 +18,11 @@
 
 package org.apache.hudi.execution;
 
-import org.apache.hudi.common.schema.HoodieSchema;
-import org.apache.hudi.common.schema.HoodieSchemaCache;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaCache;
 import org.apache.hudi.common.util.queue.HoodieExecutor;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java
index 39ec2d08eb18..3dbc65c3290e 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataFileWriterFactory.java
@@ -32,7 +32,7 @@ import org.apache.hudi.io.storage.HoodieFileWriterFactory;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.hadoop.HadoopStorageConfiguration;
-import org.apache.hudi.util.RowDataAvroQueryContexts;
+import org.apache.hudi.util.RowDataQueryContexts;
 
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.hadoop.conf.Configuration;
@@ -67,7 +67,7 @@ public class HoodieRowDataFileWriterFactory extends HoodieFileWriterFactory {
       HoodieConfig config,
       HoodieSchema schema) throws IOException {
     //TODO boundary to revisit in follow up to use HoodieSchema directly
-    final RowType rowType = (RowType) RowDataAvroQueryContexts.fromAvroSchema(schema.getAvroSchema()).getRowType().getLogicalType();
+    final RowType rowType = (RowType) RowDataQueryContexts.fromSchema(schema).getRowType().getLogicalType();
     HoodieRowDataParquetWriteSupport writeSupport =
         new HoodieRowDataParquetWriteSupport(
             storage.getConf().unwrapAs(Configuration.class), rowType, null);
@@ -94,7 +94,7 @@ public class HoodieRowDataFileWriterFactory extends HoodieFileWriterFactory {
       HoodieSchema schema,
       TaskContextSupplier taskContextSupplier) throws IOException {
     //TODO boundary to revisit in follow up to use HoodieSchema directly
-    final RowType rowType = (RowType) RowDataAvroQueryContexts.fromAvroSchema(schema.getAvroSchema()).getRowType().getLogicalType();
+    final RowType rowType = (RowType) RowDataQueryContexts.fromSchema(schema).getRowType().getLogicalType();
     return newParquetFileWriter(instantTime, storagePath, config, rowType, taskContextSupplier);
   }
 
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java
index 927ff5093b86..d242dcbfb9d6 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/format/FlinkRecordContext.java
@@ -34,7 +34,7 @@ import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.util.AvroToRowDataConverters;
 import org.apache.hudi.util.OrderingValueEngineTypeConverter;
 import org.apache.hudi.util.RecordKeyToRowDataConverter;
-import org.apache.hudi.util.RowDataAvroQueryContexts;
+import org.apache.hudi.util.RowDataQueryContexts;
 import org.apache.hudi.util.RowDataUtils;
 import org.apache.hudi.util.RowProjection;
 import org.apache.hudi.util.SchemaEvolvingRowDataProjection;
@@ -82,8 +82,8 @@ public class FlinkRecordContext extends RecordContext<RowData> {
 
   @Override
   public Object getValue(RowData record, HoodieSchema schema, String fieldName) {
-    RowDataAvroQueryContexts.FieldQueryContext fieldQueryContext =
-        RowDataAvroQueryContexts.fromAvroSchema(schema.toAvroSchema(), utcTimezone).getFieldQueryContext(fieldName);
+    RowDataQueryContexts.FieldQueryContext fieldQueryContext =
+        RowDataQueryContexts.fromSchema(schema, utcTimezone).getFieldQueryContext(fieldName);
     if (fieldQueryContext == null) {
       return null;
     } else {
@@ -108,7 +108,7 @@ public class FlinkRecordContext extends RecordContext<RowData> {
 
   @Override
   public GenericRecord convertToAvroRecord(RowData record, HoodieSchema schema) {
-    return (GenericRecord) RowDataAvroQueryContexts.fromAvroSchema(schema.toAvroSchema()).getRowDataToAvroConverter().convert(schema.toAvroSchema(), record);
+    return (GenericRecord) RowDataQueryContexts.fromSchema(schema).getRowDataToAvroConverter().convert(schema, record);
   }
 
   @Override
@@ -125,7 +125,7 @@ public class FlinkRecordContext extends RecordContext<RowData> {
   @Override
   public RowData convertAvroRecord(IndexedRecord avroRecord) {
     Schema recordSchema = avroRecord.getSchema();
-    AvroToRowDataConverters.AvroToRowDataConverter converter = RowDataAvroQueryContexts.fromAvroSchema(recordSchema, utcTimezone).getAvroToRowDataConverter();
+    AvroToRowDataConverters.AvroToRowDataConverter converter = RowDataQueryContexts.fromSchema(HoodieSchema.fromAvroSchema(recordSchema), utcTimezone).getAvroToRowDataConverter();
     RowData rowData = (RowData) converter.convert(avroRecord);
     Schema.Field operationField = recordSchema.getField(HoodieRecord.OPERATION_METADATA_FIELD);
     if (operationField != null) {
@@ -180,7 +180,7 @@ public class FlinkRecordContext extends RecordContext<RowData> {
     if (record instanceof BinaryRowData) {
       return record;
     }
-    RowDataSerializer rowDataSerializer = RowDataAvroQueryContexts.getRowDataSerializer(schema.toAvroSchema());
+    RowDataSerializer rowDataSerializer = RowDataQueryContexts.getRowDataSerializer(schema);
     return rowDataSerializer.toBinaryRow(record);
   }
 
@@ -197,8 +197,8 @@ public class FlinkRecordContext extends RecordContext<RowData> {
    */
   @Override
   public UnaryOperator<RowData> projectRecord(HoodieSchema from, HoodieSchema to, Map<String, String> renamedColumns) {
-    RowType fromType = (RowType) RowDataAvroQueryContexts.fromAvroSchema(from.toAvroSchema()).getRowType().getLogicalType();
-    RowType toType =  (RowType) RowDataAvroQueryContexts.fromAvroSchema(to.toAvroSchema()).getRowType().getLogicalType();
+    RowType fromType = (RowType) RowDataQueryContexts.fromSchema(from).getRowType().getLogicalType();
+    RowType toType =  (RowType) RowDataQueryContexts.fromSchema(to).getRowType().getLogicalType();
     RowProjection rowProjection = SchemaEvolvingRowDataProjection.instance(fromType, toType, renamedColumns);
     return rowProjection::project;
   }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java
deleted file mode 100644
index 1729044a1464..000000000000
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/AvroSchemaConverter.java
+++ /dev/null
@@ -1,399 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.util;
-
-import org.apache.hudi.common.util.ReflectionUtils;
-
-import org.apache.avro.LogicalTypes;
-import org.apache.avro.Schema;
-import org.apache.avro.SchemaBuilder;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeinfo.Types;
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.types.DataType;
-import org.apache.flink.table.types.logical.ArrayType;
-import org.apache.flink.table.types.logical.DecimalType;
-import org.apache.flink.table.types.logical.IntType;
-import org.apache.flink.table.types.logical.LocalZonedTimestampType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.flink.table.types.logical.LogicalTypeFamily;
-import org.apache.flink.table.types.logical.MapType;
-import org.apache.flink.table.types.logical.MultisetType;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.flink.table.types.logical.TimeType;
-import org.apache.flink.table.types.logical.TimestampType;
-
-import java.util.List;
-import java.util.stream.Collectors;
-
-/**
- * Converts an Avro schema into Flink's type information. It uses {@link org.apache.flink.api.java.typeutils.RowTypeInfo} for
- * representing objects and converts Avro types into types that are compatible with Flink's Table &
- * SQL API.
- *
- * <p>Note: Changes in this class need to be kept in sync with the corresponding runtime classes
- * {@code org.apache.flink.formats.avro.AvroRowDeserializationSchema} and {@code org.apache.flink.formats.avro.AvroRowSerializationSchema}.
- *
- * <p>NOTE: reference from Flink release 1.12.0, should remove when Flink version upgrade to that.
- */
-public class AvroSchemaConverter {
-
-  /**
-   * Converts an Avro schema {@code schema} into a nested row structure with deterministic field order and
-   * data types that are compatible with Flink's Table & SQL API.
-   *
-   * @param schema Avro schema definition
-   * @return data type matching the schema
-   */
-  public static DataType convertToDataType(Schema schema) {
-    switch (schema.getType()) {
-      case RECORD:
-        final List<Schema.Field> schemaFields = schema.getFields();
-
-        final DataTypes.Field[] fields = new DataTypes.Field[schemaFields.size()];
-        for (int i = 0; i < schemaFields.size(); i++) {
-          final Schema.Field field = schemaFields.get(i);
-          fields[i] = DataTypes.FIELD(field.name(), convertToDataType(field.schema()));
-        }
-        return DataTypes.ROW(fields).notNull();
-      case ENUM:
-      case STRING:
-        // convert Avro's Utf8/CharSequence to String
-        return DataTypes.STRING().notNull();
-      case ARRAY:
-        return DataTypes.ARRAY(convertToDataType(schema.getElementType())).notNull();
-      case MAP:
-        return DataTypes.MAP(
-                DataTypes.STRING().notNull(),
-                convertToDataType(schema.getValueType()))
-            .notNull();
-      case UNION:
-        final Schema actualSchema;
-        final boolean nullable;
-        if (schema.getTypes().size() == 2
-            && schema.getTypes().get(0).getType() == Schema.Type.NULL) {
-          actualSchema = schema.getTypes().get(1);
-          nullable = true;
-        } else if (schema.getTypes().size() == 2
-            && schema.getTypes().get(1).getType() == Schema.Type.NULL) {
-          actualSchema = schema.getTypes().get(0);
-          nullable = true;
-        } else if (schema.getTypes().size() == 1) {
-          actualSchema = schema.getTypes().get(0);
-          nullable = false;
-        } else {
-          List<Schema> nonNullTypes = schema.getTypes().stream()
-              .filter(s -> s.getType() != Schema.Type.NULL)
-              .collect(Collectors.toList());
-          nullable = schema.getTypes().size() > nonNullTypes.size();
-
-          // use Kryo for serialization
-          DataType rawDataType = (DataType) ReflectionUtils.invokeStaticMethod(
-              "org.apache.hudi.utils.DataTypeUtils",
-              "createAtomicRawType",
-              new Object[] {false, Types.GENERIC(Object.class)},
-              Boolean.class,
-              TypeInformation.class);
-
-          if (recordTypesOfSameNumFields(nonNullTypes)) {
-            DataType converted = DataTypes.ROW(
-                    DataTypes.FIELD("wrapper", rawDataType))
-                .notNull();
-            return nullable ? converted.nullable() : converted;
-          }
-          // use Kryo for serialization
-          return nullable ? rawDataType.nullable() : rawDataType;
-        }
-        DataType converted = convertToDataType(actualSchema);
-        return nullable ? converted.nullable() : converted;
-      case FIXED:
-        // logical decimal type
-        if (schema.getLogicalType() instanceof LogicalTypes.Decimal) {
-          final LogicalTypes.Decimal decimalType =
-              (LogicalTypes.Decimal) schema.getLogicalType();
-          return DataTypes.DECIMAL(decimalType.getPrecision(), decimalType.getScale())
-              .notNull();
-        }
-        // convert fixed size binary data to primitive byte arrays
-        return DataTypes.VARBINARY(schema.getFixedSize()).notNull();
-      case BYTES:
-        // logical decimal type
-        if (schema.getLogicalType() instanceof LogicalTypes.Decimal) {
-          final LogicalTypes.Decimal decimalType =
-              (LogicalTypes.Decimal) schema.getLogicalType();
-          return DataTypes.DECIMAL(decimalType.getPrecision(), decimalType.getScale())
-              .notNull();
-        }
-        return DataTypes.BYTES().notNull();
-      case INT:
-        // logical date and time type
-        final org.apache.avro.LogicalType logicalType = schema.getLogicalType();
-        if (logicalType == LogicalTypes.date()) {
-          return DataTypes.DATE().notNull();
-        } else if (logicalType == LogicalTypes.timeMillis()) {
-          return DataTypes.TIME(3).notNull();
-        }
-        return DataTypes.INT().notNull();
-      case LONG:
-        // logical timestamp type
-        if (schema.getLogicalType() == LogicalTypes.timestampMillis()) {
-          return DataTypes.TIMESTAMP(3).notNull();
-        } else if (schema.getLogicalType() == LogicalTypes.localTimestampMillis()) {
-          return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull();
-        } else if (schema.getLogicalType() == LogicalTypes.timestampMicros()) {
-          return DataTypes.TIMESTAMP(6).notNull();
-        } else if (schema.getLogicalType() == LogicalTypes.localTimestampMicros()) {
-          return DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6).notNull();
-        } else if (schema.getLogicalType() == LogicalTypes.timeMillis()) {
-          return DataTypes.TIME(3).notNull();
-        } else if (schema.getLogicalType() == LogicalTypes.timeMicros()) {
-          return DataTypes.TIME(6).notNull();
-        }
-        return DataTypes.BIGINT().notNull();
-      case FLOAT:
-        return DataTypes.FLOAT().notNull();
-      case DOUBLE:
-        return DataTypes.DOUBLE().notNull();
-      case BOOLEAN:
-        return DataTypes.BOOLEAN().notNull();
-      case NULL:
-        return DataTypes.NULL();
-      default:
-        throw new IllegalArgumentException("Unsupported Avro type '" + schema.getType() + "'.");
-    }
-  }
-
-  /**
-   * Returns true if all the types are RECORD type with same number of fields.
-   */
-  private static boolean recordTypesOfSameNumFields(List<Schema> types) {
-    if (types == null || types.size() == 0) {
-      return false;
-    }
-    if (types.stream().anyMatch(s -> s.getType() != Schema.Type.RECORD)) {
-      return false;
-    }
-    int numFields = types.get(0).getFields().size();
-    return types.stream().allMatch(s -> s.getFields().size() == numFields);
-  }
-
-  /**
-   * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro schema.
-   *
-   * <p>Use "record" as the type name.
-   *
-   * @param schema the schema type, usually it should be the top level record type, e.g. not a
-   *               nested type
-   * @return Avro's {@link Schema} matching this logical type.
-   */
-  public static Schema convertToSchema(LogicalType schema) {
-    return convertToSchema(schema, "record");
-  }
-
-  /**
-   * Converts Flink SQL {@link LogicalType} (can be nested) into an Avro schema.
-   *
-   * <p>The "{rowName}." is used as the nested row type name prefix in order to generate the right
-   * schema. Nested record type that only differs with type name is still compatible.
-   *
-   * @param logicalType logical type
-   * @param rowName     the record name
-   * @return Avro's {@link Schema} matching this logical type.
-   */
-  public static Schema convertToSchema(LogicalType logicalType, String rowName) {
-    int precision;
-    boolean nullable = logicalType.isNullable();
-    switch (logicalType.getTypeRoot()) {
-      case NULL:
-        return SchemaBuilder.builder().nullType();
-      case BOOLEAN:
-        Schema bool = SchemaBuilder.builder().booleanType();
-        return nullable ? nullableSchema(bool) : bool;
-      case TINYINT:
-      case SMALLINT:
-      case INTEGER:
-        Schema integer = SchemaBuilder.builder().intType();
-        return nullable ? nullableSchema(integer) : integer;
-      case BIGINT:
-        Schema bigint = SchemaBuilder.builder().longType();
-        return nullable ? nullableSchema(bigint) : bigint;
-      case FLOAT:
-        Schema f = SchemaBuilder.builder().floatType();
-        return nullable ? nullableSchema(f) : f;
-      case DOUBLE:
-        Schema d = SchemaBuilder.builder().doubleType();
-        return nullable ? nullableSchema(d) : d;
-      case CHAR:
-      case VARCHAR:
-        Schema str = SchemaBuilder.builder().stringType();
-        return nullable ? nullableSchema(str) : str;
-      case BINARY:
-      case VARBINARY:
-        Schema binary = SchemaBuilder.builder().bytesType();
-        return nullable ? nullableSchema(binary) : binary;
-      case TIMESTAMP_WITHOUT_TIME_ZONE:
-        // use long to represents Timestamp
-        final TimestampType timestampType = (TimestampType) logicalType;
-        precision = timestampType.getPrecision();
-        org.apache.avro.LogicalType timestampLogicalType;
-        if (precision <= 3) {
-          timestampLogicalType = LogicalTypes.timestampMillis();
-        } else if (precision <= 6) {
-          timestampLogicalType = LogicalTypes.timestampMicros();
-        } else {
-          throw new IllegalArgumentException(
-              "Avro does not support TIMESTAMP type with precision: "
-                  + precision
-                  + ", it only support precisions <= 6.");
-        }
-        Schema timestamp = timestampLogicalType.addToSchema(SchemaBuilder.builder().longType());
-        return nullable ? nullableSchema(timestamp) : timestamp;
-      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-        // use long to represents LocalZonedTimestampType
-        final LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) logicalType;
-        precision = localZonedTimestampType.getPrecision();
-        org.apache.avro.LogicalType localZonedTimestampLogicalType;
-        if (precision <= 3) {
-          localZonedTimestampLogicalType = LogicalTypes.localTimestampMillis();
-        } else if (precision <= 6) {
-          localZonedTimestampLogicalType = LogicalTypes.localTimestampMicros();
-        } else {
-          throw new IllegalArgumentException(
-              "Avro does not support LOCAL TIMESTAMP type with precision: "
-                  + precision
-                  + ", it only support precisions <= 6.");
-        }
-        Schema localZonedTimestamp = localZonedTimestampLogicalType.addToSchema(SchemaBuilder.builder().longType());
-        return nullable ? nullableSchema(localZonedTimestamp) : localZonedTimestamp;
-      case DATE:
-        // use int to represents Date
-        Schema date = LogicalTypes.date().addToSchema(SchemaBuilder.builder().intType());
-        return nullable ? nullableSchema(date) : date;
-      case TIME_WITHOUT_TIME_ZONE:
-        precision = ((TimeType) logicalType).getPrecision();
-        if (precision > 3) {
-          throw new IllegalArgumentException(
-              "Avro does not support TIME type with precision: "
-                  + precision
-                  + ", it only supports precision less than 3.");
-        }
-        // use int to represents Time, we only support millisecond when deserialization
-        Schema time =
-            LogicalTypes.timeMillis().addToSchema(SchemaBuilder.builder().intType());
-        return nullable ? nullableSchema(time) : time;
-      case DECIMAL:
-        DecimalType decimalType = (DecimalType) logicalType;
-        // store BigDecimal as Fixed
-        // for spark compatibility.
-        Schema decimal =
-            LogicalTypes.decimal(decimalType.getPrecision(), decimalType.getScale())
-                .addToSchema(SchemaBuilder
-                    .fixed(String.format("%s.fixed", rowName))
-                    .size(computeMinBytesForDecimalPrecision(decimalType.getPrecision()));
-        return nullable ? nullableSchema(decimal) : decimal;
-      case ROW:
-        RowType rowType = (RowType) logicalType;
-        List<String> fieldNames = rowType.getFieldNames();
-        // we have to make sure the record name is different in a Schema
-        SchemaBuilder.FieldAssembler<Schema> builder =
-            SchemaBuilder.builder().record(rowName).fields();
-        for (int i = 0; i < rowType.getFieldCount(); i++) {
-          String fieldName = fieldNames.get(i);
-          LogicalType fieldType = rowType.getTypeAt(i);
-          SchemaBuilder.GenericDefault<Schema> fieldBuilder =
-              builder.name(fieldName)
-                  .type(convertToSchema(fieldType, rowName + "." + fieldName));
-
-          if (fieldType.isNullable()) {
-            builder = fieldBuilder.withDefault(null);
-          } else {
-            builder = fieldBuilder.noDefault();
-          }
-        }
-        Schema record = builder.endRecord();
-        return nullable ? nullableSchema(record) : record;
-      case MULTISET:
-      case MAP:
-        Schema map =
-            SchemaBuilder.builder()
-                .map()
-                .values(
-                    convertToSchema(
-                        extractValueTypeToAvroMap(logicalType), rowName));
-        return nullable ? nullableSchema(map) : map;
-      case ARRAY:
-        ArrayType arrayType = (ArrayType) logicalType;
-        Schema array =
-            SchemaBuilder.builder()
-                .array()
-                .items(convertToSchema(arrayType.getElementType(), rowName));
-        return nullable ? nullableSchema(array) : array;
-      case RAW:
-      default:
-        throw new UnsupportedOperationException(
-            "Unsupported to derive Schema for type: " + logicalType);
-    }
-  }
-
-  public static LogicalType extractValueTypeToAvroMap(LogicalType type) {
-    LogicalType keyType;
-    LogicalType valueType;
-    if (type instanceof MapType) {
-      MapType mapType = (MapType) type;
-      keyType = mapType.getKeyType();
-      valueType = mapType.getValueType();
-    } else {
-      MultisetType multisetType = (MultisetType) type;
-      keyType = multisetType.getElementType();
-      valueType = new IntType();
-    }
-    if (!isFamily(keyType, LogicalTypeFamily.CHARACTER_STRING)) {
-      throw new UnsupportedOperationException(
-          "Avro format doesn't support non-string as key type of map. "
-              + "The key type is: "
-              + keyType.asSummaryString());
-    }
-    return valueType;
-  }
-
-  /**
-   * Returns whether the given logical type belongs to the family.
-   */
-  public static boolean isFamily(LogicalType logicalType, LogicalTypeFamily family) {
-    return logicalType.getTypeRoot().getFamilies().contains(family);
-  }
-
-  /**
-   * Returns schema with nullable true.
-   */
-  private static Schema nullableSchema(Schema schema) {
-    return schema.isNullable()
-        ? schema
-        : Schema.createUnion(SchemaBuilder.builder().nullType(), schema);
-  }
-
-  private static int computeMinBytesForDecimalPrecision(int precision) {
-    int numBytes = 1;
-    while (Math.pow(2.0, 8 * numBytes - 1) < Math.pow(10.0, precision)) {
-      numBytes += 1;
-    }
-    return numBytes;
-  }
-}
-
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java
index f9107e6df53e..fee24e520382 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/AvroToRowDataConverters.java
@@ -198,7 +198,7 @@ public class AvroToRowDataConverters {
     final AvroToRowDataConverter keyConverter =
         createConverter(DataTypes.STRING().getLogicalType(), utcTimezone);
     final AvroToRowDataConverter valueConverter =
-        createNullableConverter(AvroSchemaConverter.extractValueTypeToAvroMap(type), utcTimezone);
+        createNullableConverter(HoodieSchemaConverter.extractValueTypeToMap(type), utcTimezone);
 
     return avroObject -> {
       final Map<?, ?> map = (Map<?, ?>) avroObject;
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
index 14c59d5789a8..5e211c4851a0 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java
@@ -420,7 +420,7 @@ public class HoodieSchemaConverter {
       return convertToDataType(unionTypes.get(0));
     }
 
-    // Complex multi-type unions - use RAW type (matches AvroSchemaConverter logic)
+    // Complex multi-type unions - use RAW type
     List<HoodieSchema> nonNullTypes = unionTypes.stream()
         .filter(t -> t.getType() != HoodieSchemaType.NULL)
         .collect(Collectors.toList());
@@ -458,4 +458,25 @@ public class HoodieSchemaConverter {
     int numFields = types.get(0).getFields().size();
     return types.stream().allMatch(s -> s.getFields().size() == numFields);
   }
-}
\ No newline at end of file
+
+  public static LogicalType extractValueTypeToMap(LogicalType type) {
+    LogicalType keyType;
+    LogicalType valueType;
+    if (type instanceof MapType) {
+      MapType mapType = (MapType) type;
+      keyType = mapType.getKeyType();
+      valueType = mapType.getValueType();
+    } else {
+      MultisetType multisetType = (MultisetType) type;
+      keyType = multisetType.getElementType();
+      valueType = new IntType();
+    }
+    if (!isFamily(keyType, LogicalTypeFamily.CHARACTER_STRING)) {
+      throw new UnsupportedOperationException(
+          "Avro format doesn't support non-string as key type of map. "
+              + "The key type is: "
+              + keyType.asSummaryString());
+    }
+    return valueType;
+  }
+}
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataAvroQueryContexts.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataQueryContexts.java
similarity index 76%
rename from hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataAvroQueryContexts.java
rename to hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataQueryContexts.java
index ad020b41912d..ace597859a3a 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataAvroQueryContexts.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataQueryContexts.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.util;
 
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.common.util.collection.Triple;
 import org.apache.hudi.exception.HoodieException;
@@ -26,7 +27,6 @@ import org.apache.hudi.util.RowDataToAvroConverters.RowDataToAvroConverter;
 
 import lombok.AllArgsConstructor;
 import lombok.Getter;
-import org.apache.avro.Schema;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
 import org.apache.flink.table.types.DataType;
@@ -41,21 +41,21 @@ import java.util.function.Function;
 /**
  * Maintains auxiliary utilities for row data fields handling.
  */
-public class RowDataAvroQueryContexts {
-  private static final Map<Pair<Schema, Boolean>, RowDataQueryContext> QUERY_CONTEXT_MAP = new ConcurrentHashMap<>();
+public class RowDataQueryContexts {
+  private static final Map<Pair<HoodieSchema, Boolean>, RowDataQueryContext> QUERY_CONTEXT_MAP = new ConcurrentHashMap<>();
 
   // BinaryRowWriter in RowDataSerializer are reused, and it's not thread-safe.
-  private static final ThreadLocal<Map<Schema, RowDataSerializer>> ROWDATA_SERIALIZER_CACHE = ThreadLocal.withInitial(HashMap::new);
+  private static final ThreadLocal<Map<HoodieSchema, RowDataSerializer>> ROWDATA_SERIALIZER_CACHE = ThreadLocal.withInitial(HashMap::new);
 
-  private static final Map<Triple<Schema, Schema, Map<String, String>>, RowProjection> ROW_PROJECTION_CACHE = new ConcurrentHashMap<>();
+  private static final Map<Triple<HoodieSchema, HoodieSchema, Map<String, String>>, RowProjection> ROW_PROJECTION_CACHE = new ConcurrentHashMap<>();
 
-  public static RowDataQueryContext fromAvroSchema(Schema avroSchema) {
-    return fromAvroSchema(avroSchema, true);
+  public static RowDataQueryContext fromSchema(HoodieSchema schema) {
+    return fromSchema(schema, true);
   }
 
-  public static RowDataQueryContext fromAvroSchema(Schema avroSchema, boolean utcTimezone) {
-    return QUERY_CONTEXT_MAP.computeIfAbsent(Pair.of(avroSchema, utcTimezone), k -> {
-      DataType dataType = AvroSchemaConverter.convertToDataType(avroSchema);
+  public static RowDataQueryContext fromSchema(HoodieSchema schema, boolean utcTimezone) {
+    return QUERY_CONTEXT_MAP.computeIfAbsent(Pair.of(schema, utcTimezone), k -> {
+      DataType dataType = HoodieSchemaConverter.convertToDataType(schema);
       RowType rowType = (RowType) dataType.getLogicalType();
       RowType.RowField[] rowFields = rowType.getFields().toArray(new RowType.RowField[0]);
       RowData.FieldGetter[] fieldGetters = new RowData.FieldGetter[rowFields.length];
@@ -72,18 +72,18 @@ public class RowDataAvroQueryContexts {
     });
   }
 
-  public static RowDataSerializer getRowDataSerializer(Schema avroSchema) {
-    return ROWDATA_SERIALIZER_CACHE.get().computeIfAbsent(avroSchema, schema -> {
-      RowType rowType = (RowType) fromAvroSchema(schema).getRowType().getLogicalType();
+  public static RowDataSerializer getRowDataSerializer(HoodieSchema schema) {
+    return ROWDATA_SERIALIZER_CACHE.get().computeIfAbsent(schema, providedSchema -> {
+      RowType rowType = (RowType) fromSchema(providedSchema).getRowType().getLogicalType();
       return new RowDataSerializer(rowType);
     });
   }
 
-  public static RowProjection getRowProjection(Schema from, Schema to, Map<String, String> renameCols) {
-    Triple<Schema, Schema, Map<String, String>> cacheKey = Triple.of(from, to, renameCols);
+  public static RowProjection getRowProjection(HoodieSchema from, HoodieSchema to, Map<String, String> renameCols) {
+    Triple<HoodieSchema, HoodieSchema, Map<String, String>> cacheKey = Triple.of(from, to, renameCols);
     return ROW_PROJECTION_CACHE.computeIfAbsent(cacheKey, key -> {
-      RowType fromType = (RowType) RowDataAvroQueryContexts.fromAvroSchema(from).getRowType().getLogicalType();
-      RowType toType =  (RowType) RowDataAvroQueryContexts.fromAvroSchema(to).getRowType().getLogicalType();
+      RowType fromType = (RowType) RowDataQueryContexts.fromSchema(from).getRowType().getLogicalType();
+      RowType toType =  (RowType) RowDataQueryContexts.fromSchema(to).getRowType().getLogicalType();
       return SchemaEvolvingRowDataProjection.instance(fromType, toType, renameCols);
     });
   }
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java
index bc86e28248f2..9eb2979e19aa 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/RowDataToAvroConverters.java
@@ -18,8 +18,11 @@
 
 package org.apache.hudi.util;
 
+import org.apache.hudi.common.schema.HoodieSchema;
+import org.apache.hudi.common.schema.HoodieSchemaField;
+import org.apache.hudi.common.schema.HoodieSchemaType;
+
 import org.apache.avro.Conversions;
-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.util.Utf8;
@@ -63,7 +66,7 @@ public class RowDataToAvroConverters {
    */
   @FunctionalInterface
   public interface RowDataToAvroConverter extends Serializable {
-    Object convert(Schema schema, Object object);
+    Object convert(HoodieSchema schema, Object object);
   }
 
  // --------------------------------------------------------------------------------
@@ -90,7 +93,7 @@ public class RowDataToAvroConverters {
               private static final long serialVersionUID = 1L;
 
               @Override
-              public Object convert(Schema schema, Object object) {
+              public Object convert(HoodieSchema schema, Object object) {
                 return null;
               }
             };
@@ -101,7 +104,7 @@ public class RowDataToAvroConverters {
               private static final long serialVersionUID = 1L;
 
               @Override
-              public Object convert(Schema schema, Object object) {
+              public Object convert(HoodieSchema schema, Object object) {
                 return ((Byte) object).intValue();
               }
             };
@@ -112,7 +115,7 @@ public class RowDataToAvroConverters {
               private static final long serialVersionUID = 1L;
 
               @Override
-              public Object convert(Schema schema, Object object) {
+              public Object convert(HoodieSchema schema, Object object) {
                 return ((Short) object).intValue();
               }
             };
@@ -131,7 +134,7 @@ public class RowDataToAvroConverters {
               private static final long serialVersionUID = 1L;
 
               @Override
-              public Object convert(Schema schema, Object object) {
+              public Object convert(HoodieSchema schema, Object object) {
                 return object;
               }
             };
@@ -143,7 +146,7 @@ public class RowDataToAvroConverters {
               private static final long serialVersionUID = 1L;
 
               @Override
-              public Object convert(Schema schema, Object object) {
+              public Object convert(HoodieSchema schema, Object object) {
                 return new Utf8(((BinaryStringData) object).toBytes());
               }
             };
@@ -155,7 +158,7 @@ public class RowDataToAvroConverters {
               private static final long serialVersionUID = 1L;
 
               @Override
-              public Object convert(Schema schema, Object object) {
+              public Object convert(HoodieSchema schema, Object object) {
                 return ByteBuffer.wrap((byte[]) object);
               }
             };
@@ -167,7 +170,7 @@ public class RowDataToAvroConverters {
             private static final long serialVersionUID = 1L;
 
             @Override
-            public Object convert(Schema schema, Object object) {
+            public Object convert(HoodieSchema schema, Object object) {
                 return ((TimestampData) object).toInstant().toEpochMilli();
               }
           };
@@ -176,7 +179,7 @@ public class RowDataToAvroConverters {
             private static final long serialVersionUID = 1L;
 
             @Override
-            public Object convert(Schema schema, Object object) {
+            public Object convert(HoodieSchema schema, Object object) {
               Instant instant = ((TimestampData) object).toInstant();
                return Math.addExact(Math.multiplyExact(instant.getEpochSecond(), 1000_000), instant.getNano() / 1000);
             }
@@ -193,7 +196,7 @@ public class RowDataToAvroConverters {
                 private static final long serialVersionUID = 1L;
 
                 @Override
-                public Object convert(Schema schema, Object object) {
+                public Object convert(HoodieSchema schema, Object object) {
                  return utcTimezone ? ((TimestampData) object).toInstant().toEpochMilli() : ((TimestampData) object).toTimestamp().getTime();
                 }
               };
@@ -203,7 +206,7 @@ public class RowDataToAvroConverters {
                 private static final long serialVersionUID = 1L;
 
                 @Override
-                public Object convert(Schema schema, Object object) {
+                public Object convert(HoodieSchema schema, Object object) {
                  Instant instant = utcTimezone ? ((TimestampData) object).toInstant() : ((TimestampData) object).toTimestamp().toInstant();
                  return  Math.addExact(Math.multiplyExact(instant.getEpochSecond(), 1000_000), instant.getNano() / 1000);
                 }
@@ -218,9 +221,9 @@ public class RowDataToAvroConverters {
               private static final long serialVersionUID = 1L;
 
               @Override
-              public Object convert(Schema schema, Object object) {
+              public Object convert(HoodieSchema schema, Object object) {
                 BigDecimal javaDecimal = ((DecimalData) object).toBigDecimal();
-                return DECIMAL_CONVERSION.toFixed(javaDecimal, schema, schema.getLogicalType());
+                return DECIMAL_CONVERSION.toFixed(javaDecimal, schema.toAvroSchema(), schema.toAvroSchema().getLogicalType());
               }
             };
         break;
@@ -244,19 +247,19 @@ public class RowDataToAvroConverters {
       private static final long serialVersionUID = 1L;
 
       @Override
-      public Object convert(Schema schema, Object object) {
+      public Object convert(HoodieSchema schema, Object object) {
         if (object == null) {
           return null;
         }
 
         // get actual schema if it is a nullable schema
-        Schema actualSchema;
-        if (schema.getType() == Schema.Type.UNION) {
-          List<Schema> types = schema.getTypes();
+        HoodieSchema actualSchema;
+        if (schema.getType() == HoodieSchemaType.UNION) {
+          List<HoodieSchema> types = schema.getTypes();
           int size = types.size();
-          if (size == 2 && types.get(1).getType() == Schema.Type.NULL) {
+          if (size == 2 && types.get(1).getType() == HoodieSchemaType.NULL) {
             actualSchema = types.get(0);
-          } else if (size == 2 && types.get(0).getType() == Schema.Type.NULL) {
+          } else if (size == 2 && types.get(0).getType() == HoodieSchemaType.NULL) {
             actualSchema = types.get(1);
           } else {
             throw new IllegalArgumentException(
@@ -289,12 +292,12 @@ public class RowDataToAvroConverters {
       private static final long serialVersionUID = 1L;
 
       @Override
-      public Object convert(Schema schema, Object object) {
+      public Object convert(HoodieSchema schema, Object object) {
         final RowData row = (RowData) object;
-        final List<Schema.Field> fields = schema.getFields();
-        final GenericRecord record = new GenericData.Record(schema);
+        final List<HoodieSchemaField> fields = schema.getFields();
+          final GenericRecord record = new GenericData.Record(schema.toAvroSchema());
         for (int i = 0; i < length; ++i) {
-          final Schema.Field schemaField = fields.get(i);
+          final HoodieSchemaField schemaField = fields.get(i);
           Object avroObject =
               fieldConverters[i].convert(
                   schemaField.schema(), fieldGetters[i].getFieldOrNull(row));
@@ -314,8 +317,8 @@ public class RowDataToAvroConverters {
       private static final long serialVersionUID = 1L;
 
       @Override
-      public Object convert(Schema schema, Object object) {
-        final Schema elementSchema = schema.getElementType();
+      public Object convert(HoodieSchema schema, Object object) {
+        final HoodieSchema elementSchema = schema.getElementType();
         ArrayData arrayData = (ArrayData) object;
         List<Object> list = new ArrayList<>();
         for (int i = 0; i < arrayData.size(); ++i) {
@@ -329,7 +332,7 @@ public class RowDataToAvroConverters {
   }
 
  private static RowDataToAvroConverter createMapConverter(LogicalType type, boolean utcTimezone) {
-    LogicalType valueType = AvroSchemaConverter.extractValueTypeToAvroMap(type);
+    LogicalType valueType = HoodieSchemaConverter.extractValueTypeToMap(type);
    final ArrayData.ElementGetter valueGetter = ArrayData.createElementGetter(valueType);
    final RowDataToAvroConverter valueConverter = createConverter(valueType, utcTimezone);
 
@@ -337,8 +340,8 @@ public class RowDataToAvroConverters {
       private static final long serialVersionUID = 1L;
 
       @Override
-      public Object convert(Schema schema, Object object) {
-        final Schema valueSchema = schema.getValueType();
+      public Object convert(HoodieSchema schema, Object object) {
+        final HoodieSchema valueSchema = schema.getValueType();
         final MapData mapData = (MapData) object;
         final ArrayData keyArray = mapData.keyArray();
         final ArrayData valueArray = mapData.valueArray();
diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/merge/TestHoodieFlinkRecordMerger.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/merge/TestHoodieFlinkRecordMerger.java
index a6c3f5031aff..e932a077da61 100644
--- a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/merge/TestHoodieFlinkRecordMerger.java
+++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/merge/TestHoodieFlinkRecordMerger.java
@@ -28,7 +28,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.read.BufferedRecord;
 import org.apache.hudi.common.table.read.BufferedRecords;
-import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.HoodieSchemaConverter;
 
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.GenericRowData;
@@ -68,7 +68,7 @@ public class TestHoodieFlinkRecordMerger {
 
   @Test
   void testMergingWithNewRecordAsDelete() throws IOException {
-    HoodieSchema schema = 
HoodieSchema.fromAvroSchema(AvroSchemaConverter.convertToSchema(RECORD_ROWTYPE));
+    HoodieSchema schema = 
HoodieSchemaConverter.convertToSchema(RECORD_ROWTYPE);
     HoodieKey key = new HoodieKey(RECORD_KEY, PARTITION);
     RowData oldRow = createRow(key, "001", "001_01", "file1", 1, "str_val1", 
1L);
     BufferedRecord<RowData> oldRecord = 
BufferedRecords.fromEngineRecord(oldRow, schema, recordContext, 1L, RECORD_KEY, 
false);
@@ -82,7 +82,7 @@ public class TestHoodieFlinkRecordMerger {
 
   @Test
   void testMergingWithOldRecordAsDelete() throws IOException {
-    HoodieSchema schema = 
HoodieSchema.fromAvroSchema(AvroSchemaConverter.convertToSchema(RECORD_ROWTYPE));
+    HoodieSchema schema = 
HoodieSchemaConverter.convertToSchema(RECORD_ROWTYPE);
     HoodieKey key = new HoodieKey(RECORD_KEY, PARTITION);
     RowData newRow = createRow(key, "001", "001_01", "file1", 1, "str_val1", 
1L);
     BufferedRecord<RowData> newRecord = 
BufferedRecords.fromEngineRecord(newRow, schema, recordContext, 1L, RECORD_KEY, 
false);
@@ -96,7 +96,7 @@ public class TestHoodieFlinkRecordMerger {
 
   @Test
   void testMergingWithOldRecordAccepted() throws IOException {
-    HoodieSchema schema = 
HoodieSchema.fromAvroSchema(AvroSchemaConverter.convertToSchema(RECORD_ROWTYPE));
+    HoodieSchema schema = 
HoodieSchemaConverter.convertToSchema(RECORD_ROWTYPE);
     HoodieKey key = new HoodieKey(RECORD_KEY, PARTITION);
     RowData oldRow = createRow(key, "001", "001_01", "file1", 1, "str_val1", 
3L);
     BufferedRecord<RowData> oldRecord = 
BufferedRecords.fromEngineRecord(oldRow, schema, recordContext, 3L, RECORD_KEY, 
false);
@@ -111,7 +111,7 @@ public class TestHoodieFlinkRecordMerger {
 
   @Test
   void testMergingWithNewRecordAccepted() throws IOException {
-    HoodieSchema schema = 
HoodieSchema.fromAvroSchema(AvroSchemaConverter.convertToSchema(RECORD_ROWTYPE));
+    HoodieSchema schema = 
HoodieSchemaConverter.convertToSchema(RECORD_ROWTYPE);
     HoodieKey key = new HoodieKey(RECORD_KEY, PARTITION);
     RowData oldRow = createRow(key, "001", "001_01", "file1", 1, "str_val1", 
1L);
     BufferedRecord<RowData> oldRecord = 
BufferedRecords.fromEngineRecord(oldRow, schema, recordContext, 1L, RECORD_KEY, 
false);
@@ -126,7 +126,7 @@ public class TestHoodieFlinkRecordMerger {
 
   @Test
   void testMergingWithCommitTimeRecordMerger() throws IOException {
-    HoodieSchema schema = 
HoodieSchema.fromAvroSchema(AvroSchemaConverter.convertToSchema(RECORD_ROWTYPE));
+    HoodieSchema schema = 
HoodieSchemaConverter.convertToSchema(RECORD_ROWTYPE);
     HoodieKey key = new HoodieKey(RECORD_KEY, PARTITION);
     RowData oldRow = createRow(key, "001", "001_01", "file1", 1, "str_val1", 
2L);
     BufferedRecord<RowData> oldRecord = 
BufferedRecords.fromEngineRecord(oldRow, schema, recordContext, 2L, RECORD_KEY, 
false);
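
[Illustration] Every change in this test file is the same mechanical substitution; sketched once here for reference, with RECORD_ROWTYPE standing for the fixture defined earlier in the file:

    // Before: detour through Avro
    //   HoodieSchema schema = HoodieSchema.fromAvroSchema(
    //       AvroSchemaConverter.convertToSchema(RECORD_ROWTYPE));
    // After: direct conversion, no Avro types on the call site
    HoodieSchema schema = HoodieSchemaConverter.convertToSchema(RECORD_ROWTYPE);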
diff --git 
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
 
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
index c4892c48e696..a6fc063857eb 100644
--- 
a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
+++ 
b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/util/TestHoodieSchemaConverter.java
@@ -19,9 +19,11 @@
 
 package org.apache.hudi.util;
 
+import org.apache.hudi.avro.model.HoodieMetadataRecord;
 import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.schema.HoodieSchemaField;
 import org.apache.hudi.common.schema.HoodieSchemaType;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
 
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.types.DataType;
@@ -248,43 +250,6 @@ public class TestHoodieSchemaConverter {
     assertEquals(2, addressSchema.getFields().size());
   }
 
-  @Test
-  public void testCompareWithAvroConversion() {
-    // Test that HoodieSchemaConverter produces the same result as
-    // AvroSchemaConverter + HoodieSchema.fromAvroSchema()
-
-    RowType flinkRowType = (RowType) DataTypes.ROW(
-        DataTypes.FIELD("id", DataTypes.BIGINT().notNull()),
-        DataTypes.FIELD("name", DataTypes.STRING().nullable()),
-        DataTypes.FIELD("timestamp", DataTypes.TIMESTAMP(3).notNull()),
-        DataTypes.FIELD("decimal_val", DataTypes.DECIMAL(10, 2).notNull())
-    ).notNull().getLogicalType();
-
-    // Method 1: Direct HoodieSchema conversion
-    HoodieSchema directSchema = 
HoodieSchemaConverter.convertToSchema(flinkRowType, "TestRecord");
-
-    // Method 2: Via Avro conversion
-    HoodieSchema viaAvroSchema = HoodieSchema.fromAvroSchema(
-        AvroSchemaConverter.convertToSchema(flinkRowType, "TestRecord"));
-
-    // Both should produce equivalent schemas
-    assertNotNull(directSchema);
-    assertNotNull(viaAvroSchema);
-    assertEquals(HoodieSchemaType.RECORD, directSchema.getType());
-    assertEquals(HoodieSchemaType.RECORD, viaAvroSchema.getType());
-    assertEquals(4, directSchema.getFields().size());
-    assertEquals(4, viaAvroSchema.getFields().size());
-
-    // Verify field types match
-    for (int i = 0; i < 4; i++) {
-      assertEquals(
-          viaAvroSchema.getFields().get(i).schema().getType(),
-          directSchema.getFields().get(i).schema().getType(),
-          "Field " + i + " type mismatch"
-      );
-    }
-  }
-
   @Test
   public void testComplexNestedStructure() {
     LogicalType complexType = DataTypes.ROW(
@@ -317,26 +282,6 @@ public class TestHoodieSchemaConverter {
     assertEquals(2, nestedRecord.getFields().size());
   }
 
-  @Test
-  public void testNativeConversionMatchesAvroPath() {
-    // Verify native conversion produces same result as Avro path
-    RowType originalRowType = (RowType) DataTypes.ROW(
-        DataTypes.FIELD("id", DataTypes.BIGINT().notNull()),
-        DataTypes.FIELD("name", DataTypes.STRING().nullable()),
-        DataTypes.FIELD("age", DataTypes.INT().notNull())
-    ).notNull().getLogicalType();
-
-    HoodieSchema hoodieSchema = 
HoodieSchemaConverter.convertToSchema(originalRowType, "TestRecord");
-
-    // Native conversion
-    DataType nativeResult = 
HoodieSchemaConverter.convertToDataType(hoodieSchema);
-
-    // Avro path (for comparison)
-    DataType avroResult = 
AvroSchemaConverter.convertToDataType(hoodieSchema.getAvroSchema());
-
-    assertEquals(avroResult.getLogicalType(), nativeResult.getLogicalType());
-  }
-
   @Test
   public void testRoundTripConversion() {
     RowType originalRowType = (RowType) DataTypes.ROW(
@@ -528,4 +473,61 @@ public class TestHoodieSchemaConverter {
     DataType dataType = HoodieSchemaConverter.convertToDataType(fixedSchema);
     assertTrue(dataType.getLogicalType() instanceof VarBinaryType);
   }
-}
\ No newline at end of file
+
+  @Test
+  void testUnionSchemaWithMultipleRecordTypes() {
+    HoodieSchema schema = 
HoodieSchema.fromAvroSchema(HoodieMetadataRecord.SCHEMA$);
+    DataType dataType = HoodieSchemaConverter.convertToDataType(schema);
+    int pos = 
HoodieMetadataRecord.SCHEMA$.getField(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).pos();
+    final String expected = "ROW<"
+        + "`fileName` STRING, "
+        + "`columnName` STRING, "
+        + "`minValue` ROW<`wrapper` RAW('java.lang.Object', ?) NOT NULL>, "
+        + "`maxValue` ROW<`wrapper` RAW('java.lang.Object', ?) NOT NULL>, "
+        + "`valueCount` BIGINT, "
+        + "`nullCount` BIGINT, "
+        + "`totalSize` BIGINT, "
+        + "`totalUncompressedSize` BIGINT, "
+        + "`isDeleted` BOOLEAN NOT NULL, "
+        + "`isTightBound` BOOLEAN NOT NULL, "
+        + "`valueType` ROW<`typeOrdinal` INT NOT NULL, `additionalInfo` 
STRING>>";
+    assertEquals(expected, dataType.getChildren().get(pos).toString());
+  }
+
+  @Test
+  void testLocalTimestampType() {
+    DataType dataType = DataTypes.ROW(
+        DataTypes.FIELD("f_localtimestamp_millis", 
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)),
+        DataTypes.FIELD("f_localtimestamp_micros", 
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6))
+    );
+    // convert to avro schema
+    HoodieSchema schema = 
HoodieSchemaConverter.convertToSchema(dataType.getLogicalType());
+    final String expectedSchema = ""
+        + "[ \"null\", {\n"
+        + "  \"type\" : \"record\",\n"
+        + "  \"name\" : \"record\",\n"
+        + "  \"fields\" : [ {\n"
+        + "    \"name\" : \"f_localtimestamp_millis\",\n"
+        + "    \"type\" : [ \"null\", {\n"
+        + "      \"type\" : \"long\",\n"
+        + "      \"logicalType\" : \"local-timestamp-millis\"\n"
+        + "    } ],\n"
+        + "    \"default\" : null\n"
+        + "  }, {\n"
+        + "    \"name\" : \"f_localtimestamp_micros\",\n"
+        + "    \"type\" : [ \"null\", {\n"
+        + "      \"type\" : \"long\",\n"
+        + "      \"logicalType\" : \"local-timestamp-micros\"\n"
+        + "    } ],\n"
+        + "    \"default\" : null\n"
+        + "  } ]\n"
+        + "} ]";
+    assertEquals(expectedSchema, schema.toString(true));
+    // convert it back
+    DataType convertedDataType = 
HoodieSchemaConverter.convertToDataType(schema);
+    final String expectedDataType = "ROW<"
+        + "`f_localtimestamp_millis` TIMESTAMP_LTZ(3), "
+        + "`f_localtimestamp_micros` TIMESTAMP_LTZ(6)>";
+    assertEquals(expectedDataType, convertedDataType.toString());
+  }
+}
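
[Illustration] The added tests pin down both directions of the converter. A condensed round trip under the signatures in this diff, with the expected forms taken from testLocalTimestampType above:

    // Flink DataType -> HoodieSchema -> Flink DataType round trip.
    DataType dataType = DataTypes.ROW(
        DataTypes.FIELD("ts", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)));
    // Flink -> Hudi: the nullable row becomes a ["null", record] union.
    HoodieSchema schema =
        HoodieSchemaConverter.convertToSchema(dataType.getLogicalType());
    System.out.println(schema.toString(true)); // pretty-printed JSON form
    // Hudi -> Flink: unwraps back to ROW<`ts` TIMESTAMP_LTZ(3)>.
    DataType roundTripped = HoodieSchemaConverter.convertToDataType(schema);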
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsSchemas.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsSchemas.java
index 1188c241990c..f4fdf1dbc6e1 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsSchemas.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/stats/ColumnStatsSchemas.java
@@ -19,8 +19,9 @@
 package org.apache.hudi.source.stats;
 
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.metadata.HoodieMetadataPayload;
-import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.HoodieSchemaConverter;
 
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.RowType;
@@ -53,7 +54,7 @@ public class ColumnStatsSchemas {
   public static final int ORD_COL_NAME = 5;
 
   private static DataType getMetadataDataType() {
-    return AvroSchemaConverter.convertToDataType(HoodieMetadataRecord.SCHEMA$);
+    return 
HoodieSchemaConverter.convertToDataType(HoodieSchema.fromAvroSchema(HoodieMetadataRecord.SCHEMA$));
   }
 
   private static DataType getColStatsDataType() {
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 3ca968dc3dbf..87006a03d630 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -18,8 +18,8 @@
 
 package org.apache.hudi.table;
 
-import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.schema.HoodieSchemaUtils;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieIndexConfig;
@@ -31,8 +31,8 @@ import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
 import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator;
 import org.apache.hudi.storage.StoragePath;
-import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.DataTypeUtils;
+import org.apache.hudi.util.HoodieSchemaConverter;
 import org.apache.hudi.util.SerializableSchema;
 import org.apache.hudi.util.StreamerUtil;
 
@@ -434,9 +434,9 @@ public class HoodieTableFactory implements 
DynamicTableSourceFactory, DynamicTab
    * @param rowType The specified table row type
    */
   private static void inferAvroSchema(Configuration conf, LogicalType rowType) 
{
-    if (!conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH).isPresent()
-        && !conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA).isPresent()) {
-      String inferredSchema = AvroSchemaConverter.convertToSchema(rowType, 
AvroSchemaUtils.getAvroRecordQualifiedName(conf.get(FlinkOptions.TABLE_NAME))).toString();
+    if (conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH).isEmpty()
+        && conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA).isEmpty()) {
+      String inferredSchema = HoodieSchemaConverter.convertToSchema(rowType, 
HoodieSchemaUtils.getRecordQualifiedName(conf.get(FlinkOptions.TABLE_NAME))).toString();
       conf.set(FlinkOptions.SOURCE_AVRO_SCHEMA, inferredSchema);
     }
   }
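
[Illustration] One review note on the condition rewrite: Optional.isEmpty() requires Java 11 or newer, whereas the old !isPresent() form also compiles on Java 8. The inference itself, restated as a sketch (the "t1" table name is a placeholder):

    // Only runs when the user supplied neither schema option.
    if (conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA_PATH).isEmpty()
        && conf.getOptional(FlinkOptions.SOURCE_AVRO_SCHEMA).isEmpty()) {
      conf.set(FlinkOptions.SOURCE_AVRO_SCHEMA,
          HoodieSchemaConverter.convertToSchema(
              rowType, HoodieSchemaUtils.getRecordQualifiedName("t1")).toString());
    }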
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
index b9b4c2fff34d..b1bba78f0b07 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSource.java
@@ -544,7 +544,7 @@ public class HoodieTableSource implements
     return CdcInputFormat.builder()
         .config(this.conf)
         .tableState(hoodieTableState)
-        // use the explicit fields' data type because the AvroSchemaConverter
+        // use the explicit fields' data type because the HoodieSchemaConverter
         // is not very stable.
         .fieldTypes(rowDataType.getChildren())
         .predicates(this.predicates)
@@ -569,7 +569,7 @@ public class HoodieTableSource implements
     return MergeOnReadInputFormat.builder()
         .config(this.conf)
         .tableState(hoodieTableState)
-        // use the explicit fields' data type because the AvroSchemaConverter
+        // use the explicit fields' data type because the HoodieSchemaConverter
         // is not very stable.
         .fieldTypes(rowDataType.getChildren())
         .predicates(this.predicates)
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
index 10bdaa7bdf79..b7fbe94fb76d 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.catalog;
 
 import org.apache.hudi.avro.AvroSchemaUtils;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.util.CollectionUtils;
@@ -29,13 +30,12 @@ import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
-import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.DataTypeUtils;
+import org.apache.hudi.util.HoodieSchemaConverter;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.CatalogUtils;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.DataTypes;
@@ -259,12 +259,12 @@ public class HoodieCatalog extends AbstractCatalog {
 
     final String path = inferTablePath(catalogPathStr, tablePath);
     Map<String, String> options = 
TableOptionProperties.loadFromProperties(path, hadoopConf);
-    final Schema latestSchema = getLatestTableSchema(path);
+    final HoodieSchema latestSchema = getLatestTableSchema(path);
     if (latestSchema != null) {
       List<String> pkColumns = TableOptionProperties.getPkColumns(options);
       // if the table is initialized from spark, the write schema is nullable 
for pk columns.
       DataType tableDataType = DataTypeUtils.ensureColumnsAsNonNullable(
-          AvroSchemaConverter.convertToDataType(latestSchema), pkColumns);
+          HoodieSchemaConverter.convertToDataType(latestSchema), pkColumns);
       org.apache.flink.table.api.Schema.Builder builder = 
org.apache.flink.table.api.Schema.newBuilder()
           .fromRowDataType(tableDataType);
       final String pkConstraintName = 
TableOptionProperties.getPkConstraintName(options);
@@ -316,7 +316,7 @@ public class HoodieCatalog extends AbstractCatalog {
     if (!resolvedSchema.getPrimaryKey().isPresent() && 
!conf.containsKey(RECORD_KEY_FIELD.key())) {
       throw new CatalogException("Primary key definition is missing");
     }
-    final String avroSchema = AvroSchemaConverter.convertToSchema(
+    final String avroSchema = HoodieSchemaConverter.convertToSchema(
         resolvedSchema.toPhysicalRowDataType().getLogicalType(),
         
AvroSchemaUtils.getAvroRecordQualifiedName(tablePath.getObjectName())).toString();
     conf.set(FlinkOptions.SOURCE_AVRO_SCHEMA, avroSchema);
@@ -592,11 +592,11 @@ public class HoodieCatalog extends AbstractCatalog {
     throw new UnsupportedOperationException("alterPartitionColumnStatistics is 
not implemented.");
   }
 
-  private @Nullable Schema getLatestTableSchema(String path) {
+  private @Nullable HoodieSchema getLatestTableSchema(String path) {
     if (path != null && StreamerUtil.tableExists(path, hadoopConf)) {
       try {
         HoodieTableMetaClient metaClient = StreamerUtil.createMetaClient(path, 
hadoopConf);
-        return new TableSchemaResolver(metaClient).getTableAvroSchema(false); 
// change log mode is not supported now
+        return new TableSchemaResolver(metaClient).getTableSchema(false); // 
change log mode is not supported now
       } catch (Throwable throwable) {
         log.warn("Failed to resolve the latest table schema.", throwable);
         // ignored
@@ -616,7 +616,7 @@ public class HoodieCatalog extends AbstractCatalog {
   private void refreshTableProperties(ObjectPath tablePath, CatalogBaseTable 
newCatalogTable) {
     Map<String, String> options = newCatalogTable.getOptions();
     ResolvedCatalogTable resolvedTable =  (ResolvedCatalogTable) 
newCatalogTable;
-    final String avroSchema = AvroSchemaConverter.convertToSchema(
+    final String avroSchema = HoodieSchemaConverter.convertToSchema(
         
resolvedTable.getResolvedSchema().toPhysicalRowDataType().getLogicalType(),
         
AvroSchemaUtils.getAvroRecordQualifiedName(tablePath.getObjectName())).toString();
     options.put(FlinkOptions.SOURCE_AVRO_SCHEMA.key(), avroSchema);
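
[Illustration] getLatestTableSchema(..) now hands back a HoodieSchema, so the catalog builds its Flink schema without touching Avro types. The resolution path in isolation, error handling elided, with path and hadoopConf as in the method above:

    HoodieTableMetaClient metaClient =
        StreamerUtil.createMetaClient(path, hadoopConf);
    // false: change log mode is not supported now (see the hunk above)
    HoodieSchema latest = new TableSchemaResolver(metaClient).getTableSchema(false);
    DataType tableDataType = HoodieSchemaConverter.convertToDataType(latest);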
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java
index 3b8aaf48d12b..ab54f3b393e0 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalogUtil.java
@@ -22,7 +22,6 @@ import org.apache.hudi.adapter.Utils;
 import org.apache.hudi.client.HoodieFlinkWriteClient;
 import org.apache.hudi.common.model.PartitionBucketIndexHashingConfig;
 import org.apache.hudi.common.model.WriteOperationType;
-import org.apache.hudi.common.schema.HoodieSchema;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.configuration.FlinkOptions;
@@ -31,8 +30,8 @@ import org.apache.hudi.exception.HoodieCatalogException;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.internal.schema.Type;
 import org.apache.hudi.internal.schema.convert.InternalSchemaConverter;
-import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.HoodieSchemaConverter;
 import org.apache.hudi.util.StreamerUtil;
 
 import lombok.extern.slf4j.Slf4j;
@@ -232,7 +231,7 @@ public class HoodieCatalogUtil {
       HoodieFlinkWriteClient<?> writeClient = createWriteClient(tablePath, 
oldTable, hadoopConf, inferTablePathFunc);
       Pair<InternalSchema, HoodieTableMetaClient> pair = 
writeClient.getInternalSchemaAndMetaClient();
       InternalSchema oldSchema = pair.getLeft();
-      Function<LogicalType, Type> convertFunc = (LogicalType logicalType) -> 
InternalSchemaConverter.convertToField(HoodieSchema.fromAvroSchema(AvroSchemaConverter.convertToSchema(logicalType)));
+      Function<LogicalType, Type> convertFunc = (LogicalType logicalType) -> 
InternalSchemaConverter.convertToField(HoodieSchemaConverter.convertToSchema(logicalType));
       InternalSchema newSchema = Utils.applyTableChange(oldSchema, 
tableChanges, convertFunc);
       if (!oldSchema.equals(newSchema)) {
         writeClient.setOperationType(WriteOperationType.ALTER_SCHEMA);
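
[Illustration] The alter-table conversion function loses its Avro hop as well; the lambda now reads as a straight LogicalType -> HoodieSchema -> internal Type pipeline:

    // Before: convertToField(HoodieSchema.fromAvroSchema(
    //     AvroSchemaConverter.convertToSchema(logicalType)))
    // After:
    Function<LogicalType, Type> convertFunc = logicalType ->
        InternalSchemaConverter.convertToField(
            HoodieSchemaConverter.convertToSchema(logicalType));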
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
index 0c1674247a27..b2f5d428cdaf 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
@@ -27,8 +27,8 @@ import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.sync.common.util.SparkDataSourceTableUtils;
-import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.DataTypeUtils;
+import org.apache.hudi.util.HoodieSchemaConverter;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.flink.table.catalog.CatalogTable;
@@ -203,7 +203,7 @@ public class TableOptionProperties {
       List<String> partitionKeys,
       boolean withOperationField) {
     RowType rowType = 
supplementMetaFields(DataTypeUtils.toRowType(catalogTable.getUnresolvedSchema()),
 withOperationField);
-    HoodieSchema schema = 
HoodieSchema.fromAvroSchema(AvroSchemaConverter.convertToSchema(rowType));
+    HoodieSchema schema = HoodieSchemaConverter.convertToSchema(rowType);
     String sparkVersion = 
catalogTable.getOptions().getOrDefault(SPARK_VERSION, DEFAULT_SPARK_VERSION);
     Map<String, String> sparkTableProperties = 
SparkDataSourceTableUtils.getSparkTableProperties(
         partitionKeys,
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
index 8310dbe4f52a..991f099d8981 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FlinkRowDataReaderContext.java
@@ -47,7 +47,7 @@ import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.util.Lazy;
 import org.apache.hudi.util.RecordKeyToRowDataConverter;
-import org.apache.hudi.util.RowDataAvroQueryContexts;
+import org.apache.hudi.util.RowDataQueryContexts;
 
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.utils.JoinedRowData;
@@ -106,7 +106,7 @@ public class FlinkRowDataReaderContext extends 
HoodieReaderContext<RowData> {
         (HoodieRowDataParquetReader) HoodieIOFactory.getIOFactory(storage)
             .getReaderFactory(HoodieRecord.HoodieRecordType.FLINK)
             .getFileReader(tableConfig, filePath, HoodieFileFormat.PARQUET, 
Option.empty());
-    DataType rowType = 
RowDataAvroQueryContexts.fromAvroSchema(dataSchema.toAvroSchema()).getRowType();
+    DataType rowType = 
RowDataQueryContexts.fromSchema(dataSchema).getRowType();
     return rowDataParquetReader.getRowDataIterator(schemaManager, rowType, 
requiredSchema, getSafePredicates(requiredSchema));
   }
 
@@ -203,7 +203,7 @@ public class FlinkRowDataReaderContext extends 
HoodieReaderContext<RowData> {
     // For e.g, if the pk fields are [a, b] but user only select a, then the pk
     // semantics is lost.
     RecordKeyToRowDataConverter recordKeyRowConverter = new 
RecordKeyToRowDataConverter(
-        pkFieldsPos, (RowType) 
RowDataAvroQueryContexts.fromAvroSchema(requiredSchema.toAvroSchema()).getRowType().getLogicalType());
+        pkFieldsPos, (RowType) 
RowDataQueryContexts.fromSchema(requiredSchema).getRowType().getLogicalType());
     ((FlinkRecordContext) 
recordContext).setRecordKeyRowConverter(recordKeyRowConverter);
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java
index 76d45a390723..93b79750a21e 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/InternalSchemaManager.java
@@ -37,7 +37,7 @@ import 
org.apache.hudi.internal.schema.convert.InternalSchemaConverter;
 import org.apache.hudi.internal.schema.utils.InternalSchemaUtils;
 import org.apache.hudi.storage.HoodieStorageUtils;
 import org.apache.hudi.storage.StorageConfiguration;
-import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.HoodieSchemaConverter;
 
 import lombok.Getter;
 import org.apache.flink.table.types.DataType;
@@ -157,8 +157,8 @@ public class InternalSchemaManager implements Serializable {
     }
     List<Integer> selectedFieldList = 
IntStream.of(selectedFields).boxed().collect(Collectors.toList());
     // mergeSchema is built with useColumnTypeFromFileSchema = true
-    List<DataType> mergeSchemaAsDataTypes = 
AvroSchemaConverter.convertToDataType(
-        InternalSchemaConverter.convert(mergeSchema, 
"tableName").toAvroSchema()).getChildren();
+    List<DataType> mergeSchemaAsDataTypes = 
HoodieSchemaConverter.convertToDataType(
+        InternalSchemaConverter.convert(mergeSchema, 
"tableName")).getChildren();
     DataType[] fileFieldTypes = new DataType[queryFieldTypes.length];
     for (int i = 0; i < queryFieldTypes.length; i++) {
       // position of ChangedType in querySchema
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
index cc9d8dbfa70c..745af94485b1 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/cdc/CdcInputFormat.java
@@ -141,7 +141,7 @@ public class CdcInputFormat extends MergeOnReadInputFormat {
     try {
       // get full schema iterator.
       final HoodieSchema schema = HoodieSchemaCache.intern(
-          HoodieSchema.parse(tableState.getAvroSchema()));
+          HoodieSchema.parse(tableState.getTableSchema()));
       // before/after images have assumption of snapshot scan, so `emitDelete` 
is set as false
       return getSplitRowIterator(split, schema, schema, 
FlinkOptions.REALTIME_PAYLOAD_COMBINE, false);
     } catch (IOException e) {
@@ -181,7 +181,7 @@ public class CdcInputFormat extends MergeOnReadInputFormat {
         MergeOnReadInputSplit inputSplit = fileSlice2Split(tablePath, 
fileSlice, maxCompactionMemoryInBytes);
         return new RemoveBaseFileIterator(tableState, 
getFileSliceIterator(inputSplit));
       case AS_IS:
-        HoodieSchema dataSchema = 
HoodieSchemaUtils.removeMetadataFields(HoodieSchema.parse(tableState.getAvroSchema()));
+        HoodieSchema dataSchema = 
HoodieSchemaUtils.removeMetadataFields(HoodieSchema.parse(tableState.getTableSchema()));
         HoodieSchema cdcSchema = 
HoodieCDCUtils.schemaBySupplementalLoggingMode(mode, dataSchema);
         switch (mode) {
           case DATA_BEFORE_AFTER:
@@ -216,7 +216,7 @@ public class CdcInputFormat extends MergeOnReadInputFormat {
    */
   private ClosableIterator<HoodieRecord<RowData>> 
getSplitRecordIterator(MergeOnReadInputSplit split) throws IOException {
     final HoodieSchema schema = HoodieSchemaCache.intern(
-        HoodieSchema.parse(tableState.getAvroSchema()));
+        HoodieSchema.parse(tableState.getTableSchema()));
     HoodieFileGroupReader<RowData> fileGroupReader =
         createFileGroupReader(split, schema, schema, 
FlinkOptions.REALTIME_PAYLOAD_COMBINE, true);
     return fileGroupReader.getClosableHoodieRecordIterator();
@@ -363,7 +363,7 @@ public class CdcInputFormat extends MergeOnReadInputFormat {
         MergeOnReadTableState tableState,
         ClosableIterator<HoodieRecord<RowData>> logRecordIterator,
         HoodieTableMetaClient metaClient) throws IOException {
-      this.tableSchema = HoodieSchema.parse(tableState.getAvroSchema());
+      this.tableSchema = HoodieSchema.parse(tableState.getTableSchema());
       this.maxCompactionMemoryInBytes = maxCompactionMemoryInBytes;
       this.imageManager = imageManager;
       this.projection = 
tableState.getRequiredRowType().equals(tableState.getRowType())
@@ -495,8 +495,8 @@ public class CdcInputFormat extends MergeOnReadInputFormat {
         MergeOnReadTableState tableState,
         HoodieSchema cdcSchema,
         HoodieCDCFileSplit fileSplit) {
-      this.requiredSchema = 
HoodieSchema.parse(tableState.getRequiredAvroSchema());
-      this.requiredPos = getRequiredPos(tableState.getAvroSchema(), 
this.requiredSchema);
+      this.requiredSchema = HoodieSchema.parse(tableState.getRequiredSchema());
+      this.requiredPos = getRequiredPos(tableState.getTableSchema(), 
this.requiredSchema);
       this.recordBuilder = new 
GenericRecordBuilder(requiredSchema.getAvroSchema());
       this.avroToRowDataConverter = 
AvroToRowDataConverters.createRowConverter(tableState.getRequiredRowType());
       StoragePath hadoopTablePath = new StoragePath(tablePath);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
index 165cc6a7d57a..1fd6b3491128 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadInputFormat.java
@@ -190,9 +190,9 @@ public class MergeOnReadInputFormat
             + "flink partition Index: " + split.getSplitNumber()
             + "merge type: " + split.getMergeType());
     final HoodieSchema tableSchema = HoodieSchemaCache.intern(
-        HoodieSchema.parse(tableState.getAvroSchema()));
+        HoodieSchema.parse(tableState.getTableSchema()));
     final HoodieSchema requiredSchema = HoodieSchemaCache.intern(
-        HoodieSchema.parse(tableState.getRequiredAvroSchema()));
+        HoodieSchema.parse(tableState.getRequiredSchema()));
     return getSplitRowIterator(split, tableSchema, requiredSchema, mergeType, 
emitDelete);
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
index e5e7e99dc24b..868a7f814aba 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/mor/MergeOnReadTableState.java
@@ -36,21 +36,21 @@ public class MergeOnReadTableState implements Serializable {
 
   private final RowType rowType;
   private final RowType requiredRowType;
-  private final String avroSchema;
-  private final String requiredAvroSchema;
+  private final String tableSchema;
+  private final String requiredSchema;
   private final List<MergeOnReadInputSplit> inputSplits;
   private final int operationPos;
 
   public MergeOnReadTableState(
       RowType rowType,
       RowType requiredRowType,
-      String avroSchema,
-      String requiredAvroSchema,
+      String tableSchema,
+      String requiredSchema,
       List<MergeOnReadInputSplit> inputSplits) {
     this.rowType = rowType;
     this.requiredRowType = requiredRowType;
-    this.avroSchema = avroSchema;
-    this.requiredAvroSchema = requiredAvroSchema;
+    this.tableSchema = tableSchema;
+    this.requiredSchema = requiredSchema;
     this.inputSplits = inputSplits;
     this.operationPos = 
rowType.getFieldIndex(HoodieRecord.OPERATION_METADATA_FIELD);
   }
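
[Illustration] After the rename, the two String members carry serialized HoodieSchema JSON rather than "Avro schema" strings, which the field names now reflect. Construction as the updated TestStreamReadOperator further down does it, with rowType and requiredRowType per this class:

    MergeOnReadTableState state = new MergeOnReadTableState(
        rowType,                       // full table row type
        requiredRowType,               // projected row type
        tableSchema.toString(),        // serialized HoodieSchema
        HoodieSchemaConverter.convertToSchema(requiredRowType).toString(),
        Collections.emptyList());      // no input splits in this sketch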
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
index 56082f107bb4..e540cd55b006 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/source/TestStreamReadOperator.java
@@ -26,7 +26,7 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
 import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
 import org.apache.hudi.table.format.mor.MergeOnReadTableState;
-import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.HoodieSchemaConverter;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
 import org.apache.hudi.utils.TestData;
@@ -253,13 +253,13 @@ public class TestStreamReadOperator {
     } catch (Exception e) {
       throw new HoodieException("Get table avro schema error", e);
     }
-    final DataType rowDataType = 
AvroSchemaConverter.convertToDataType(tableSchema.getAvroSchema());
+    final DataType rowDataType = 
HoodieSchemaConverter.convertToDataType(tableSchema);
     final RowType rowType = (RowType) rowDataType.getLogicalType();
     final MergeOnReadTableState hoodieTableState = new MergeOnReadTableState(
         rowType,
         TestConfigurations.ROW_TYPE,
         tableSchema.toString(),
-        
AvroSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE).toString(),
+        
HoodieSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE).toString(),
         Collections.emptyList());
     MergeOnReadInputFormat inputFormat = MergeOnReadInputFormat.builder()
         .config(conf)
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
index cebc24651ec2..1598b4d7ed89 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestSchemaEvolution.java
@@ -40,8 +40,8 @@ import org.apache.hudi.sink.compact.CompactOperator;
 import org.apache.hudi.sink.compact.CompactionCommitEvent;
 import org.apache.hudi.sink.compact.CompactionCommitSink;
 import org.apache.hudi.sink.compact.CompactionPlanSourceFunction;
-import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.HoodieSchemaConverter;
 import org.apache.hudi.utils.FlinkMiniCluster;
 
 import lombok.extern.slf4j.Slf4j;
@@ -313,7 +313,7 @@ public class ITTestSchemaEvolution {
   private void writeTableWithSchema2(TableOptions tableOptions) throws 
ExecutionException, InterruptedException {
     tableOptions.withOption(
         FlinkOptions.SOURCE_AVRO_SCHEMA.key(),
-        AvroSchemaConverter.convertToSchema(ROW_TYPE_EVOLUTION_AFTER, 
"hoodie.t1.t1_record"));
+        HoodieSchemaConverter.convertToSchema(ROW_TYPE_EVOLUTION_AFTER, 
"hoodie.t1.t1_record"));
 
     //language=SQL
     tEnv.executeSql("drop table t1");
@@ -384,7 +384,7 @@ public class ITTestSchemaEvolution {
         KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key(), "partition",
         KeyGeneratorOptions.HIVE_STYLE_PARTITIONING_ENABLE.key(), true,
         FlinkOptions.WRITE_BATCH_SIZE.key(), 0.000001, // each record triggers 
flush
-        FlinkOptions.SOURCE_AVRO_SCHEMA.key(), 
AvroSchemaConverter.convertToSchema(ROW_TYPE_EVOLUTION_BEFORE),
+        FlinkOptions.SOURCE_AVRO_SCHEMA.key(), 
HoodieSchemaConverter.convertToSchema(ROW_TYPE_EVOLUTION_BEFORE),
         FlinkOptions.READ_TASKS.key(), 1,
         FlinkOptions.WRITE_TASKS.key(), 1,
         FlinkOptions.INDEX_BOOTSTRAP_TASKS.key(), 1,
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
index f7ff64d3dfcb..45a9b8ecab0b 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieFileGroupReaderOnFlink.java
@@ -49,7 +49,7 @@ import org.apache.hudi.table.format.FlinkRowDataReaderContext;
 import org.apache.hudi.table.format.InternalSchemaManager;
 import org.apache.hudi.util.AvroToRowDataConverters;
 import org.apache.hudi.util.HoodieSchemaConverter;
-import org.apache.hudi.util.RowDataAvroQueryContexts;
+import org.apache.hudi.util.RowDataQueryContexts;
 import org.apache.hudi.utils.TestData;
 
 import org.apache.flink.configuration.Configuration;
@@ -131,7 +131,7 @@ public class TestHoodieFileGroupReaderOnFlink extends 
TestHoodieFileGroupReaderB
       HoodieSchema recordSchema,
       HoodieReaderContext<RowData> readerContext,
       boolean sortOutput) throws IOException {
-    RowDataSerializer rowDataSerializer = 
RowDataAvroQueryContexts.getRowDataSerializer(recordSchema.toAvroSchema());
+    RowDataSerializer rowDataSerializer = 
RowDataQueryContexts.getRowDataSerializer(recordSchema);
     try (ClosableIterator<RowData> iterator = 
fileGroupReader.getClosableIterator()) {
       while (iterator.hasNext()) {
         RowData rowData = rowDataSerializer.copy(iterator.next());
@@ -148,7 +148,7 @@ public class TestHoodieFileGroupReaderOnFlink extends 
TestHoodieFileGroupReaderB
     HoodieSchema localSchema = getRecordSchema(schemaStr);
     conf.set(FlinkOptions.SOURCE_AVRO_SCHEMA, localSchema.toString());
     AvroToRowDataConverters.AvroToRowDataConverter avroConverter =
-        
RowDataAvroQueryContexts.fromAvroSchema(localSchema.getAvroSchema()).getAvroToRowDataConverter();
+        
RowDataQueryContexts.fromSchema(localSchema).getAvroToRowDataConverter();
     List<RowData> rowDataList = recordList.stream().map(record -> {
       try {
         return (RowData) 
avroConverter.convert(record.toIndexedRecord(localSchema, 
CollectionUtils.emptyProps()).get().getData());
@@ -169,7 +169,7 @@ public class TestHoodieFileGroupReaderOnFlink extends 
TestHoodieFileGroupReaderB
     TestData.assertRowDataEquals(
         Collections.singletonList(actual),
         Collections.singletonList(expected),
-        
RowDataAvroQueryContexts.fromAvroSchema(schema.toAvroSchema()).getRowType());
+        RowDataQueryContexts.fromSchema(schema).getRowType());
   }
 
   @Override
@@ -324,7 +324,7 @@ public class TestHoodieFileGroupReaderOnFlink extends 
TestHoodieFileGroupReaderB
   }
 
   private static HoodieSchema getRecordSchema(String schemaStr) {
-    HoodieSchema recordSchema = new HoodieSchema.Parser().parse(schemaStr);
-    return 
HoodieSchemaConverter.convertToSchema(RowDataAvroQueryContexts.fromAvroSchema(recordSchema.getAvroSchema()).getRowType().getLogicalType());
+    HoodieSchema recordSchema = HoodieSchema.parse(schemaStr);
+    return 
HoodieSchemaConverter.convertToSchema(RowDataQueryContexts.fromSchema(recordSchema).getRowType().getLogicalType());
   }
 }
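
[Illustration] RowDataQueryContexts, renamed from RowDataAvroQueryContexts, is keyed on HoodieSchema directly, which removes the toAvroSchema() unwrap at every call site. Typical accessors, per the usages in this diff:

    RowDataQueryContexts.RowDataQueryContext ctx =
        RowDataQueryContexts.fromSchema(schema);
    RowType rowType = (RowType) ctx.getRowType().getLogicalType();
    RowDataSerializer serializer = RowDataQueryContexts.getRowDataSerializer(schema);
    // Field-level reads, as TestData.getFieldValue(..) further down does:
    Object key = ctx.getFieldQueryContext("_hoodie_record_key")
        .getValAsJava(rowData, true);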
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index a4a69706e4fe..adce67c00f30 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -28,7 +28,7 @@ import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.keygen.ComplexAvroKeyGenerator;
 import org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator;
 import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator;
-import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.HoodieSchemaConverter;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.CatalogUtils;
 import org.apache.hudi.utils.SchemaBuilder;
@@ -364,7 +364,7 @@ public class TestHoodieTableFactory {
     final HoodieTableSink tableSink3 =
         (HoodieTableSink) new 
HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(this.conf, 
schema3, ""));
     final Configuration conf3 = tableSink3.getConf();
-    final String expected = 
AvroSchemaConverter.convertToSchema(schema3.toSourceRowDataType().getLogicalType(),
 AvroSchemaUtils.getAvroRecordQualifiedName("t1")).toString();
+    final String expected = 
HoodieSchemaConverter.convertToSchema(schema3.toSourceRowDataType().getLogicalType(),
 AvroSchemaUtils.getAvroRecordQualifiedName("t1")).toString();
     assertThat(conf3.get(FlinkOptions.SOURCE_AVRO_SCHEMA), is(expected));
   }
 
@@ -565,7 +565,7 @@ public class TestHoodieTableFactory {
     final HoodieTableSink tableSink3 =
         (HoodieTableSink) new 
HoodieTableFactory().createDynamicTableSink(MockContext.getInstance(this.conf, 
schema3, ""));
     final Configuration conf3 = tableSink3.getConf();
-    final String expected = 
AvroSchemaConverter.convertToSchema(schema3.toSinkRowDataType().getLogicalType(),
 AvroSchemaUtils.getAvroRecordQualifiedName("t1")).toString();
+    final String expected = 
HoodieSchemaConverter.convertToSchema(schema3.toSinkRowDataType().getLogicalType(),
 AvroSchemaUtils.getAvroRecordQualifiedName("t1")).toString();
     assertThat(conf3.get(FlinkOptions.SOURCE_AVRO_SCHEMA), is(expected));
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
index fc4694bd3a6f..8431a8e0490b 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/format/TestInputFormat.java
@@ -50,8 +50,8 @@ import org.apache.hudi.table.format.cdc.CdcInputFormat;
 import org.apache.hudi.table.format.cow.CopyOnWriteInputFormat;
 import org.apache.hudi.table.format.mor.MergeOnReadInputFormat;
 import org.apache.hudi.table.format.mor.MergeOnReadInputSplit;
-import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.FlinkWriteClients;
+import org.apache.hudi.util.HoodieSchemaConverter;
 import org.apache.hudi.util.SerializableSchema;
 import org.apache.hudi.util.StreamerUtil;
 import org.apache.hudi.utils.TestConfigurations;
@@ -1207,7 +1207,7 @@ public class TestInputFormat {
   void testReadWithWiderSchema(HoodieTableType tableType) throws Exception {
     Map<String, String> options = new HashMap<>();
     options.put(FlinkOptions.SOURCE_AVRO_SCHEMA.key(),
-        
AvroSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE_WIDER).toString());
+        
HoodieSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE_WIDER).toString());
     beforeEach(tableType, options);
 
     TestData.writeData(TestData.DATA_SET_INSERT, conf);
@@ -1223,7 +1223,7 @@ public class TestInputFormat {
     conf.set(FlinkOptions.TABLE_NAME, "TestHoodieTable");
     conf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
     conf.set(FlinkOptions.PARTITION_PATH_FIELD, "partition");
-    conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA.key(), 
AvroSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE_DECIMAL_ORDERING).toString());
+    conf.setString(FlinkOptions.SOURCE_AVRO_SCHEMA.key(), 
HoodieSchemaConverter.convertToSchema(TestConfigurations.ROW_TYPE_DECIMAL_ORDERING).toString());
     conf.set(FlinkOptions.COMPACTION_ASYNC_ENABLED, false); // by default 
close the async compaction
     StreamerUtil.initTableIfNotExists(conf);
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestAvroSchemaConverter.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestAvroSchemaConverter.java
deleted file mode 100644
index afe75e061566..000000000000
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestAvroSchemaConverter.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.utils;
-
-import org.apache.hudi.avro.model.HoodieMetadataRecord;
-import org.apache.hudi.common.schema.HoodieSchema;
-import org.apache.hudi.metadata.HoodieMetadataPayload;
-import org.apache.hudi.util.AvroSchemaConverter;
-
-import org.apache.flink.table.api.DataTypes;
-import org.apache.flink.table.types.DataType;
-import org.junit.jupiter.api.Test;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-/**
- * Test cases for {@link org.apache.hudi.util.AvroSchemaConverter}.
- */
-public class TestAvroSchemaConverter {
-
-  @Test
-  void testUnionSchemaWithMultipleRecordTypes() {
-    HoodieSchema schema = 
HoodieSchema.fromAvroSchema(HoodieMetadataRecord.SCHEMA$);
-    DataType dataType = 
AvroSchemaConverter.convertToDataType(schema.getAvroSchema());
-    int pos = 
HoodieMetadataRecord.SCHEMA$.getField(HoodieMetadataPayload.SCHEMA_FIELD_ID_COLUMN_STATS).pos();
-    final String expected = "ROW<"
-        + "`fileName` STRING, "
-        + "`columnName` STRING, "
-        + "`minValue` ROW<`wrapper` RAW('java.lang.Object', ?) NOT NULL>, "
-        + "`maxValue` ROW<`wrapper` RAW('java.lang.Object', ?) NOT NULL>, "
-        + "`valueCount` BIGINT, "
-        + "`nullCount` BIGINT, "
-        + "`totalSize` BIGINT, "
-        + "`totalUncompressedSize` BIGINT, "
-        + "`isDeleted` BOOLEAN NOT NULL, "
-        + "`isTightBound` BOOLEAN NOT NULL, "
-        + "`valueType` ROW<`typeOrdinal` INT NOT NULL, `additionalInfo` 
STRING>>";
-    assertThat(dataType.getChildren().get(pos).toString(), is(expected));
-  }
-
-  @Test
-  void testLocalTimestampType() {
-    DataType dataType = DataTypes.ROW(
-        DataTypes.FIELD("f_localtimestamp_millis", 
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3)),
-        DataTypes.FIELD("f_localtimestamp_micros", 
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(6))
-    );
-    // convert to avro schema
-    HoodieSchema schema = 
HoodieSchema.fromAvroSchema(AvroSchemaConverter.convertToSchema(dataType.getLogicalType()));
-    final String expectedSchema = ""
-        + "[ \"null\", {\n"
-        + "  \"type\" : \"record\",\n"
-        + "  \"name\" : \"record\",\n"
-        + "  \"fields\" : [ {\n"
-        + "    \"name\" : \"f_localtimestamp_millis\",\n"
-        + "    \"type\" : [ \"null\", {\n"
-        + "      \"type\" : \"long\",\n"
-        + "      \"logicalType\" : \"local-timestamp-millis\"\n"
-        + "    } ],\n"
-        + "    \"default\" : null\n"
-        + "  }, {\n"
-        + "    \"name\" : \"f_localtimestamp_micros\",\n"
-        + "    \"type\" : [ \"null\", {\n"
-        + "      \"type\" : \"long\",\n"
-        + "      \"logicalType\" : \"local-timestamp-micros\"\n"
-        + "    } ],\n"
-        + "    \"default\" : null\n"
-        + "  } ]\n"
-        + "} ]";
-    assertThat(schema.toString(true), is(expectedSchema));
-    // convert it back
-    DataType convertedDataType = 
AvroSchemaConverter.convertToDataType(schema.getAvroSchema());
-    final String expectedDataType = "ROW<"
-        + "`f_localtimestamp_millis` TIMESTAMP_LTZ(3), "
-        + "`f_localtimestamp_micros` TIMESTAMP_LTZ(6)>";
-    assertThat(convertedDataType.toString(), is(expectedDataType));
-  }
-}
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
index 2430a1404529..46c90afa7a10 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
@@ -21,8 +21,8 @@ package org.apache.hudi.utils;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.streamer.FlinkStreamerConfig;
-import org.apache.hudi.util.AvroSchemaConverter;
 import org.apache.hudi.util.DataTypeUtils;
+import org.apache.hudi.util.HoodieSchemaConverter;
 import org.apache.hudi.utils.factory.CollectSinkTableFactory;
 import org.apache.hudi.utils.factory.ContinuousFileSourceFactory;
 
@@ -374,7 +374,7 @@ public class TestConfigurations {
   public static Configuration getDefaultConf(String tablePath, DataType 
dataType) {
     Configuration conf = new Configuration();
     conf.set(FlinkOptions.PATH, tablePath);
-    conf.set(FlinkOptions.SOURCE_AVRO_SCHEMA, 
AvroSchemaConverter.convertToSchema(dataType.getLogicalType()).toString());
+    conf.set(FlinkOptions.SOURCE_AVRO_SCHEMA, 
HoodieSchemaConverter.convertToSchema(dataType.getLogicalType()).toString());
     conf.set(FlinkOptions.TABLE_NAME, "TestHoodieTable");
     conf.set(FlinkOptions.PARTITION_PATH_FIELD, "partition");
     return conf;
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 9020073f2a37..8dfbd1d724a9 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -37,18 +37,17 @@ import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.sink.utils.BucketStreamWriteFunctionWrapper;
-import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper;
 import org.apache.hudi.sink.utils.BulkInsertFunctionWrapper;
 import org.apache.hudi.sink.utils.ConsistentBucketStreamWriteFunctionWrapper;
 import org.apache.hudi.sink.utils.InsertFunctionWrapper;
+import org.apache.hudi.sink.utils.StreamWriteFunctionWrapper;
 import org.apache.hudi.sink.utils.TestFunctionWrapper;
 import org.apache.hudi.table.HoodieFlinkTable;
 import org.apache.hudi.table.format.FormatUtils;
 import org.apache.hudi.table.format.InternalSchemaManager;
 import org.apache.hudi.util.AvroToRowDataConverters;
-import org.apache.hudi.util.RowDataAvroQueryContexts;
+import org.apache.hudi.util.RowDataQueryContexts;
 
-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
@@ -977,8 +976,7 @@ public class TestData {
 
     HoodieTableMetaClient metaClient = createMetaClient(basePath);
     HoodieFlinkTable<?> table = HoodieFlinkTable.create(config, 
HoodieFlinkEngineContext.DEFAULT, metaClient);
-    Schema schema = new TableSchemaResolver(metaClient).getTableAvroSchema();
-    HoodieSchema hoodieSchema = HoodieSchema.fromAvroSchema(schema);
+    HoodieSchema schema = new TableSchemaResolver(metaClient).getTableSchema();
 
     String latestInstant = 
metaClient.getActiveTimeline().filterCompletedInstants()
         .lastInstant().map(HoodieInstant::requestedTime).orElse(null);
@@ -993,7 +991,7 @@ public class TestData {
       List<String> readBuffer = new ArrayList<>();
       List<FileSlice> fileSlices = 
table.getSliceView().getLatestMergedFileSlicesBeforeOrOn(partitionDir.getName(),
 latestInstant).collect(Collectors.toList());
       for (FileSlice fileSlice : fileSlices) {
-        try (ClosableIterator<RowData> rowIterator = 
getRecordIterator(fileSlice, hoodieSchema, metaClient, config)) {
+        try (ClosableIterator<RowData> rowIterator = 
getRecordIterator(fileSlice, schema, metaClient, config)) {
           while (rowIterator.hasNext()) {
             RowData rowData = rowIterator.next();
             readBuffer.add(filterOutVariables(schema, rowData));
@@ -1048,8 +1046,8 @@ public class TestData {
     return String.join(",", fields);
   }
 
-  private static String filterOutVariables(Schema schema, RowData record) {
-    RowDataAvroQueryContexts.RowDataQueryContext queryContext = 
RowDataAvroQueryContexts.fromAvroSchema(schema);
+  private static String filterOutVariables(HoodieSchema schema, RowData 
record) {
+    RowDataQueryContexts.RowDataQueryContext queryContext = 
RowDataQueryContexts.fromSchema(schema);
     List<String> fields = new ArrayList<>();
     fields.add(getFieldValue(queryContext, record, "_hoodie_record_key"));
     fields.add(getFieldValue(queryContext, record, "_hoodie_partition_path"));
@@ -1061,7 +1059,7 @@ public class TestData {
     return String.join(",", fields);
   }
 
-  private static String 
getFieldValue(RowDataAvroQueryContexts.RowDataQueryContext queryContext, 
RowData rowData, String fieldName) {
+  private static String getFieldValue(RowDataQueryContexts.RowDataQueryContext 
queryContext, RowData rowData, String fieldName) {
     return 
String.valueOf(queryContext.getFieldQueryContext(fieldName).getValAsJava(rowData,
 true));
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRecordKeyToRowDataConverter.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRecordKeyToRowDataConverter.java
index 9fa2bd3aa3b8..25967f2246ef 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRecordKeyToRowDataConverter.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRecordKeyToRowDataConverter.java
@@ -19,9 +19,9 @@
 package org.apache.hudi.utils;
 
 import org.apache.hudi.keygen.KeyGenUtils;
-import org.apache.hudi.util.AvroSchemaConverter;
-import org.apache.hudi.util.RowDataToAvroConverters;
+import org.apache.hudi.util.HoodieSchemaConverter;
 import org.apache.hudi.util.RecordKeyToRowDataConverter;
+import org.apache.hudi.util.RowDataToAvroConverters;
 
 import org.apache.avro.generic.GenericRecord;
 import org.apache.flink.table.api.DataTypes;
@@ -103,7 +103,7 @@ public class TestRecordKeyToRowDataConverter {
     RowDataToAvroConverters.RowDataToAvroConverter converter =
         RowDataToAvroConverters.createConverter(rowType);
     GenericRecord avroRecord =
-        (GenericRecord) 
converter.convert(AvroSchemaConverter.convertToSchema(rowType), rowData);
+        (GenericRecord) 
converter.convert(HoodieSchemaConverter.convertToSchema(rowType), rowData);
     RecordKeyToRowDataConverter keyToRowDataConverter =
         new RecordKeyToRowDataConverter(new int[]{0, 1, 2, 3, 4, 5, 6}, 
rowType);
     final String recordKey = KeyGenUtils.getRecordKey(avroRecord, 
rowType.getFieldNames(), false);
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRowDataToAvroConverters.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRowDataToAvroConverters.java
index 471c0686b60f..185c5830616c 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRowDataToAvroConverters.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestRowDataToAvroConverters.java
@@ -18,7 +18,7 @@
 
 package org.apache.hudi.utils;
 
-import org.apache.hudi.util.AvroSchemaConverter;
+import org.apache.hudi.util.HoodieSchemaConverter;
 import org.apache.hudi.util.RowDataToAvroConverters;
 
 import org.apache.avro.generic.GenericRecord;
@@ -58,7 +58,7 @@ class TestRowDataToAvroConverters {
     RowDataToAvroConverters.RowDataToAvroConverter converter =
             RowDataToAvroConverters.createConverter(rowType, false);
     GenericRecord avroRecord =
-            (GenericRecord) 
converter.convert(AvroSchemaConverter.convertToSchema(rowType), rowData);
+            (GenericRecord) 
converter.convert(HoodieSchemaConverter.convertToSchema(rowType), rowData);
     Assertions.assertEquals(timestampFromLocal, 
formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli((Long) 
avroRecord.get(0)), ZoneId.systemDefault())));
   }
 
@@ -76,7 +76,7 @@ class TestRowDataToAvroConverters {
     RowDataToAvroConverters.RowDataToAvroConverter converter =
             RowDataToAvroConverters.createConverter(rowType);
     GenericRecord avroRecord =
-            (GenericRecord) 
converter.convert(AvroSchemaConverter.convertToSchema(rowType), rowData);
+            (GenericRecord) 
converter.convert(HoodieSchemaConverter.convertToSchema(rowType), rowData);
     Assertions.assertEquals(timestampFromUtc0, 
formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli((Long) 
avroRecord.get(0)), ZoneId.of("UTC"))));
     Assertions.assertEquals("2021-03-30 08:44:29", 
formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli((Long) 
avroRecord.get(0)), ZoneId.of("UTC+1"))));
     Assertions.assertEquals("2021-03-30 15:44:29", 
formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli((Long) 
avroRecord.get(0)), ZoneId.of("Asia/Shanghai"))));
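
[Illustration] These last two tests fix the utcTimezone semantics: per the assertions above, createConverter(rowType, false) interprets the wall-clock TIMESTAMP in the system default zone when producing epoch millis, while the single-argument overload appears to default to UTC. A condensed contrast, assuming those overloads:

    // Two converters over the same row type, differing only in timezone handling.
    HoodieSchema schema = HoodieSchemaConverter.convertToSchema(rowType);
    RowDataToAvroConverters.RowDataToAvroConverter utcConverter =
        RowDataToAvroConverters.createConverter(rowType);        // UTC semantics
    RowDataToAvroConverters.RowDataToAvroConverter localConverter =
        RowDataToAvroConverters.createConverter(rowType, false); // local zone
    long utcMillis = (Long)
        ((GenericRecord) utcConverter.convert(schema, rowData)).get(0);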
