This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 23b283acf3e [HUDI-7726] Restructure TableSchemaResolver to separate Hadoop logic and use BaseFileUtils (#11185)
23b283acf3e is described below

commit 23b283acf3e4c30e26652edf9c710e17e47951c5
Author: Jon Vexler <jbvex...@gmail.com>
AuthorDate: Fri May 10 17:19:23 2024 -0400

    [HUDI-7726] Restructure TableSchemaResolver to separate Hadoop logic and use BaseFileUtils (#11185)
    
    Co-authored-by: Jonathan Vexler <=>
    Co-authored-by: Y Ethan Guo <ethan.guoyi...@gmail.com>
---
 .../hudi/cli/commands/HoodieLogFileCommand.java    |  15 +--
 .../hudi/io/HoodieKeyLocationFetchHandle.java      |   7 +-
 .../hudi/client/TestJavaHoodieBackedMetadata.java  |  12 +-
 .../testutils/HoodieJavaClientTestHarness.java     |  10 +-
 .../functional/TestHoodieBackedMetadata.java       |  12 +-
 .../functional/TestHoodieBackedTableMetadata.java  |   7 +-
 .../hudi/common/model/HoodiePartitionMetadata.java |   2 +-
 .../hudi/common/table/TableSchemaResolver.java     | 122 +++----------------
 .../org/apache/hudi/common/util/BaseFileUtils.java |  11 +-
 .../hudi/table/catalog/TableOptionProperties.java  |   4 +-
 .../common/table/ParquetTableSchemaResolver.java   |  66 +++++++++++
 .../org/apache/hudi/common/util/HFileUtils.java    | 130 +++++++++++++++++++++
 .../hudi/common/table/TestTableSchemaResolver.java |   7 +-
 .../ShowHoodieLogFileMetadataProcedure.scala       |   3 +-
 .../ShowHoodieLogFileRecordsProcedure.scala        |   9 +-
 .../apache/hudi/sync/common/HoodieSyncClient.java  |   6 +-
 .../utilities/HoodieMetadataTableValidator.java    |   8 +-
 17 files changed, 259 insertions(+), 172 deletions(-)
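Before the per-file diffs, the net effect on callers reduced to a minimal sketch (the example class and method names are illustrative, not part of the commit): TableSchemaResolver.readSchemaFromLogFile now hands back an Avro Schema instead of a Parquet MessageType, so the AvroSchemaConverter round-trip disappears from call sites.

    import org.apache.avro.Schema;
    import org.apache.hudi.common.table.TableSchemaResolver;
    import org.apache.hudi.storage.HoodieStorage;
    import org.apache.hudi.storage.StoragePath;

    import java.io.IOException;

    class LogSchemaExample {
      // Returns the Avro writer schema of a log file, or null if the file has no data block.
      // Previously callers received a Parquet MessageType and converted it via
      // new AvroSchemaConverter().convert(...); that conversion is no longer needed.
      static Schema writerSchema(HoodieStorage storage, StoragePath logFilePath) throws IOException {
        return TableSchemaResolver.readSchemaFromLogFile(storage, logFilePath);
      }
    }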

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index 367dc2302ee..d3c30143072 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -49,8 +49,6 @@ import org.apache.hudi.storage.StoragePath;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
-import org.apache.parquet.avro.AvroSchemaConverter;
-import org.apache.parquet.schema.MessageType;
 import org.springframework.shell.standard.ShellComponent;
 import org.springframework.shell.standard.ShellMethod;
 import org.springframework.shell.standard.ShellOption;
@@ -109,9 +107,7 @@ public class HoodieLogFileCommand {
       } else {
         fileName = path.getName();
       }
-      MessageType schema = TableSchemaResolver.readSchemaFromLogFile(storage, path);
-      Schema writerSchema = schema != null
-          ? new AvroSchemaConverter().convert(Objects.requireNonNull(schema)) : null;
+      Schema writerSchema = TableSchemaResolver.readSchemaFromLogFile(storage, path);
       try (Reader reader = HoodieLogFormat.newReader(storage, new HoodieLogFile(path), writerSchema)) {
 
         // read the avro blocks
@@ -213,14 +209,13 @@ public class HoodieLogFileCommand {
     checkArgument(logFilePaths.size() > 0, "There is no log file");
 
     // TODO : readerSchema can change across blocks/log files, fix this inside Scanner
-    AvroSchemaConverter converter = new AvroSchemaConverter();
     Schema readerSchema = null;
     // get schema from last log file
     for (int i = logFilePaths.size() - 1; i >= 0; i--) {
-      MessageType schema = TableSchemaResolver.readSchemaFromLogFile(
+      Schema schema = TableSchemaResolver.readSchemaFromLogFile(
           storage, new StoragePath(logFilePaths.get(i)));
       if (schema != null) {
-        readerSchema = converter.convert(schema);
+        readerSchema = schema;
         break;
       }
     }
@@ -257,10 +252,8 @@ public class HoodieLogFileCommand {
       }
     } else {
       for (String logFile : logFilePaths) {
-        MessageType schema = TableSchemaResolver.readSchemaFromLogFile(
+        Schema writerSchema = TableSchemaResolver.readSchemaFromLogFile(
             client.getStorage(), new StoragePath(logFile));
-        Schema writerSchema = schema != null
-            ? new AvroSchemaConverter().convert(Objects.requireNonNull(schema)) : null;
         try (HoodieLogFormat.Reader reader =
                  HoodieLogFormat.newReader(storage, new HoodieLogFile(new StoragePath(logFile)), writerSchema)) {
           // read the avro blocks
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
index 30e2437485e..f05a0af3449 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
@@ -27,7 +27,6 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.keygen.BaseKeyGenerator;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
 
 import java.util.List;
@@ -51,11 +50,11 @@ public class HoodieKeyLocationFetchHandle<T, I, K, O> extends HoodieReadHandle<T
   }
 
   private List<Pair<HoodieKey, Long>> fetchRecordKeysWithPositions(HoodieBaseFile baseFile) {
-    BaseFileUtils baseFileUtils = BaseFileUtils.getInstance(baseFile.getPath());
+    BaseFileUtils baseFileUtils = BaseFileUtils.getInstance(baseFile.getStoragePath());
     if (keyGeneratorOpt.isPresent()) {
-      return baseFileUtils.fetchRecordKeysWithPositions(hoodieTable.getStorageConf(), new StoragePath(baseFile.getPath()), keyGeneratorOpt);
+      return baseFileUtils.fetchRecordKeysWithPositions(hoodieTable.getStorageConf(), baseFile.getStoragePath(), keyGeneratorOpt);
     } else {
-      return baseFileUtils.fetchRecordKeysWithPositions(hoodieTable.getStorageConf(), new StoragePath(baseFile.getPath()));
+      return baseFileUtils.fetchRecordKeysWithPositions(hoodieTable.getStorageConf(), baseFile.getStoragePath());
     }
   }
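For context, the calling pattern this hunk moves to looks roughly like the sketch below; the helper class and method names are made up for illustration, but the BaseFileUtils and HoodieBaseFile calls are the ones used above.

    import org.apache.hudi.common.model.HoodieBaseFile;
    import org.apache.hudi.common.model.HoodieKey;
    import org.apache.hudi.common.util.BaseFileUtils;
    import org.apache.hudi.common.util.collection.Pair;
    import org.apache.hudi.storage.StorageConfiguration;

    import java.util.List;

    class FetchKeysExample {
      // The base file now exposes its StoragePath directly, so there is no string
      // round-trip through new StoragePath(baseFile.getPath()) any more.
      static List<Pair<HoodieKey, Long>> keysWithPositions(StorageConfiguration<?> conf, HoodieBaseFile baseFile) {
        return BaseFileUtils.getInstance(baseFile.getStoragePath())
            .fetchRecordKeysWithPositions(conf, baseFile.getStoragePath());
      }
    }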
 
diff --git 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
index 017998f0484..3c049dc9c20 100644
--- 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
+++ 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
@@ -110,8 +110,6 @@ import org.apache.hudi.testutils.TestHoodieMetadataBase;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
-import org.apache.parquet.avro.AvroSchemaConverter;
-import org.apache.parquet.schema.MessageType;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
@@ -892,14 +890,13 @@ public class TestJavaHoodieBackedMetadata extends 
TestHoodieMetadataBase {
   private void verifyMetadataRawRecords(HoodieTable table, List<HoodieLogFile> 
logFiles, boolean enableMetaFields) throws IOException {
     for (HoodieLogFile logFile : logFiles) {
       List<StoragePathInfo> pathInfoList = 
storage.listDirectEntries(logFile.getPath());
-      MessageType writerSchemaMsg = 
TableSchemaResolver.readSchemaFromLogFile(storage,
+      Schema writerSchema = TableSchemaResolver.readSchemaFromLogFile(storage,
           logFile.getPath());
-      if (writerSchemaMsg == null) {
+      if (writerSchema == null) {
         // not a data block
         continue;
       }
 
-      Schema writerSchema = new AvroSchemaConverter().convert(writerSchemaMsg);
       try (HoodieLogFormat.Reader logFileReader = 
HoodieLogFormat.newReader(storage,
           new HoodieLogFile(pathInfoList.get(0).getPath()), writerSchema)) {
         while (logFileReader.hasNext()) {
@@ -2857,14 +2854,13 @@ public class TestJavaHoodieBackedMetadata extends 
TestHoodieMetadataBase {
   private void verifyMetadataColumnStatsRecords(List<HoodieLogFile> logFiles) 
throws IOException {
     for (HoodieLogFile logFile : logFiles) {
       List<StoragePathInfo> pathInfoList = 
storage.listDirectEntries(logFile.getPath());
-      MessageType writerSchemaMsg = 
TableSchemaResolver.readSchemaFromLogFile(storage,
+      Schema writerSchema = TableSchemaResolver.readSchemaFromLogFile(storage,
           logFile.getPath());
-      if (writerSchemaMsg == null) {
+      if (writerSchema == null) {
         // not a data block
         continue;
       }
 
-      Schema writerSchema = new AvroSchemaConverter().convert(writerSchemaMsg);
       try (HoodieLogFormat.Reader logFileReader = 
HoodieLogFormat.newReader(storage,
           new HoodieLogFile(pathInfoList.get(0).getPath()), writerSchema)) {
         while (logFileReader.hasNext()) {
diff --git 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
index b969598a661..05cbd7af8e8 100644
--- 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
+++ 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
@@ -912,8 +912,8 @@ public abstract class HoodieJavaClientTestHarness extends 
HoodieWriterClientTest
     try {
       HashMap<String, String> paths =
           getLatestFileIDsToFullPath(basePath, commitTimeline, 
Arrays.asList(commitInstant));
-      return paths.values().stream().flatMap(path ->
-              
BaseFileUtils.getInstance(path).readAvroRecords(context.getStorageConf(), new 
StoragePath(path)).stream())
+      return paths.values().stream().map(StoragePath::new).flatMap(path ->
+              
BaseFileUtils.getInstance(path).readAvroRecords(context.getStorageConf(), 
path).stream())
           .filter(record -> {
             if (filterByCommitTime) {
               Object commitTime = 
record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
@@ -942,8 +942,8 @@ public abstract class HoodieJavaClientTestHarness extends 
HoodieWriterClientTest
     try {
       List<HoodieBaseFile> latestFiles = getLatestBaseFiles(basePath, storage, 
paths);
       return latestFiles.stream().mapToLong(baseFile ->
-              BaseFileUtils.getInstance(baseFile.getPath())
-                  .readAvroRecords(context.getStorageConf(), new 
StoragePath(baseFile.getPath())).size())
+              BaseFileUtils.getInstance(baseFile.getStoragePath())
+                  .readAvroRecords(context.getStorageConf(), 
baseFile.getStoragePath()).size())
           .sum();
     } catch (Exception e) {
       throw new HoodieException("Error reading hoodie table as a dataframe", 
e);
@@ -980,7 +980,7 @@ public abstract class HoodieJavaClientTestHarness extends 
HoodieWriterClientTest
       HashMap<String, String> fileIdToFullPath = 
getLatestFileIDsToFullPath(basePath, commitTimeline, commitsToReturn);
       String[] paths = fileIdToFullPath.values().toArray(new 
String[fileIdToFullPath.size()]);
       if (paths[0].endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
-        return Arrays.stream(paths).flatMap(path -> 
BaseFileUtils.getInstance(path).readAvroRecords(context.getStorageConf(), new 
StoragePath(path)).stream())
+        return Arrays.stream(paths).map(StoragePath::new).flatMap(path -> 
BaseFileUtils.getInstance(path).readAvroRecords(context.getStorageConf(), 
path).stream())
             .filter(record -> {
               if (lastCommitTimeOpt.isPresent()) {
                 Object commitTime = 
record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 52938c98547..b655cbc2ab5 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -117,8 +117,6 @@ import org.apache.hudi.testutils.MetadataMergeWriteStatus;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
-import org.apache.parquet.avro.AvroSchemaConverter;
-import org.apache.parquet.schema.MessageType;
 import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Disabled;
@@ -1359,14 +1357,13 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
   private void verifyMetadataRawRecords(HoodieTable table, List<HoodieLogFile> 
logFiles, boolean enableMetaFields) throws IOException {
     for (HoodieLogFile logFile : logFiles) {
       List<StoragePathInfo> pathInfoList = 
storage.listDirectEntries(logFile.getPath());
-      MessageType writerSchemaMsg =
+      Schema writerSchema  =
           TableSchemaResolver.readSchemaFromLogFile(storage, 
logFile.getPath());
-      if (writerSchemaMsg == null) {
+      if (writerSchema == null) {
         // not a data block
         continue;
       }
 
-      Schema writerSchema = new AvroSchemaConverter().convert(writerSchemaMsg);
       try (HoodieLogFormat.Reader logFileReader = 
HoodieLogFormat.newReader(storage,
           new HoodieLogFile(pathInfoList.get(0).getPath()), writerSchema)) {
         while (logFileReader.hasNext()) {
@@ -3724,14 +3721,13 @@ public class TestHoodieBackedMetadata extends 
TestHoodieMetadataBase {
   private void verifyMetadataColumnStatsRecords(List<HoodieLogFile> logFiles) 
throws IOException {
     for (HoodieLogFile logFile : logFiles) {
       List<StoragePathInfo> pathInfoList = 
storage.listDirectEntries(logFile.getPath());
-      MessageType writerSchemaMsg =
+      Schema writerSchema =
           TableSchemaResolver.readSchemaFromLogFile(storage, 
logFile.getPath());
-      if (writerSchemaMsg == null) {
+      if (writerSchema == null) {
         // not a data block
         continue;
       }
 
-      Schema writerSchema = new AvroSchemaConverter().convert(writerSchemaMsg);
       try (HoodieLogFormat.Reader logFileReader = 
HoodieLogFormat.newReader(storage,
           new HoodieLogFile(pathInfoList.get(0).getPath()), writerSchema)) {
         while (logFileReader.hasNext()) {
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
index 9bcfe2ca733..9e8521d669b 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
@@ -59,8 +59,6 @@ import org.apache.hudi.table.HoodieTable;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
-import org.apache.parquet.avro.AvroSchemaConverter;
-import org.apache.parquet.schema.MessageType;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -453,14 +451,13 @@ public class TestHoodieBackedTableMetadata extends 
TestHoodieMetadataBase {
   private void verifyMetadataRawRecords(HoodieTable table, List<HoodieLogFile> 
logFiles) throws IOException {
     for (HoodieLogFile logFile : logFiles) {
       List<StoragePathInfo> pathInfoList = 
storage.listDirectEntries(logFile.getPath());
-      MessageType writerSchemaMsg =
+      Schema writerSchema  =
           TableSchemaResolver.readSchemaFromLogFile(storage, 
logFile.getPath());
-      if (writerSchemaMsg == null) {
+      if (writerSchema == null) {
         // not a data block
         continue;
       }
 
-      Schema writerSchema = new AvroSchemaConverter().convert(writerSchemaMsg);
       try (HoodieLogFormat.Reader logFileReader = 
HoodieLogFormat.newReader(storage,
           new HoodieLogFile(pathInfoList.get(0).getPath()), writerSchema)) {
         while (logFileReader.hasNext()) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
index f334ceaf6bb..e8edc8b9142 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
@@ -185,7 +185,7 @@ public class HoodiePartitionMetadata {
   private boolean readBaseFormatMetaFile() {
     for (StoragePath metafilePath : baseFormatMetaFilePaths(partitionPath)) {
       try {
-        BaseFileUtils reader = BaseFileUtils.getInstance(metafilePath.toString());
+        BaseFileUtils reader = BaseFileUtils.getInstance(metafilePath);
         // Data file format
         Map<String, String> metadata = reader.readFooter(
             storage.getConf(), true, metafilePath, PARTITION_DEPTH_KEY, COMMIT_TIME_KEY);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index f3b2bc69dc5..b284fa4f881 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -20,8 +20,8 @@ package org.apache.hudi.common.table;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.HoodieSchemaNotFoundException;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -32,7 +32,7 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.BaseFileUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
@@ -43,8 +43,6 @@ import org.apache.hudi.internal.schema.HoodieSchemaException;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
 import org.apache.hudi.internal.schema.utils.SerDeHelper;
-import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.util.Lazy;
@@ -52,13 +50,6 @@ import org.apache.hudi.util.Lazy;
 import org.apache.avro.JsonProperties;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.parquet.avro.AvroSchemaConverter;
-import org.apache.parquet.format.converter.ParquetMetadataConverter;
-import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.hadoop.metadata.ParquetMetadata;
-import org.apache.parquet.schema.MessageType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -84,7 +75,7 @@ public class TableSchemaResolver {
 
   private static final Logger LOG = LoggerFactory.getLogger(TableSchemaResolver.class);
 
-  private final HoodieTableMetaClient metaClient;
+  protected final HoodieTableMetaClient metaClient;
 
   /**
    * Signals whether suite of the meta-fields should have additional field designating
@@ -121,7 +112,7 @@ public class TableSchemaResolver {
   }
 
   private Option<Schema> getTableAvroSchemaFromDataFileInternal() {
-    return getTableParquetSchemaFromDataFile().map(this::convertParquetSchemaToAvro);
+    return getTableParquetSchemaFromDataFile();
   }
 
   /**
@@ -168,24 +159,6 @@ public class TableSchemaResolver {
     return getTableAvroSchemaInternal(includeMetadataFields, Option.of(instant)).orElseThrow(schemaNotFoundError());
   }
 
-  /**
-   * Gets full schema (user + metadata) for a hoodie table in Parquet format.
-   *
-   * @return Parquet schema for the table
-   */
-  public MessageType getTableParquetSchema() throws Exception {
-    return convertAvroSchemaToParquet(getTableAvroSchema(true));
-  }
-
-  /**
-   * Gets users data schema for a hoodie table in Parquet format.
-   *
-   * @return Parquet schema for the table
-   */
-  public MessageType getTableParquetSchema(boolean includeMetadataField) throws Exception {
-    return convertAvroSchemaToParquet(getTableAvroSchema(includeMetadataField));
-  }
-
   /**
    * Gets users data schema for a hoodie table in Avro format.
    *
@@ -273,7 +246,7 @@ public class TableSchemaResolver {
   /**
    * Fetches the schema for a table from any the table's data files
    */
-  private Option<MessageType> getTableParquetSchemaFromDataFile() {
+  private Option<Schema> getTableParquetSchemaFromDataFile() {
     Option<Pair<HoodieInstant, HoodieCommitMetadata>> instantAndCommitMetadata = getLatestCommitMetadataWithValidData();
     try {
       switch (metaClient.getTableType()) {
@@ -300,21 +273,6 @@ public class TableSchemaResolver {
     }
   }
 
-  public static MessageType convertAvroSchemaToParquet(Schema schema, Configuration hadoopConf) {
-    AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(hadoopConf);
-    return avroSchemaConverter.convert(schema);
-  }
-
-  private Schema convertParquetSchemaToAvro(MessageType parquetSchema) {
-    AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(metaClient.getStorageConf().unwrapAs(Configuration.class));
-    return avroSchemaConverter.convert(parquetSchema);
-  }
-
-  private MessageType convertAvroSchemaToParquet(Schema schema) {
-    AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(metaClient.getStorageConf().unwrapAs(Configuration.class));
-    return avroSchemaConverter.convert(schema);
-  }
-
   /**
    * Returns table's latest Avro {@link Schema} iff table is non-empty (ie there's at least
    * a single commit)
@@ -330,43 +288,12 @@ public class TableSchemaResolver {
     return Option.empty();
   }
 
-  private MessageType readSchemaFromParquetBaseFile(Path parquetFilePath) throws IOException {
-    LOG.info("Reading schema from {}", parquetFilePath);
-
-    ParquetMetadata fileFooter =
-        ParquetFileReader.readFooter(
-            metaClient.getRawHoodieStorage().unwrapConfAs(Configuration.class),
-            parquetFilePath, ParquetMetadataConverter.NO_FILTER);
-    return fileFooter.getFileMetaData().getSchema();
-  }
-
-  private MessageType readSchemaFromHFileBaseFile(Path hFilePath) throws IOException {
-    LOG.info("Reading schema from {}", hFilePath);
-
-    try (HoodieFileReader fileReader =
-             HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
-                 .getFileReader(
-                     ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER,
-                     metaClient.getRawHoodieStorage().getConf(),
-                     new StoragePath(hFilePath.toUri()))) {
-      return convertAvroSchemaToParquet(fileReader.getSchema());
-    }
-  }
-
-  private MessageType readSchemaFromORCBaseFile(StoragePath orcFilePath) throws IOException {
-    LOG.info("Reading schema from {}", orcFilePath);
-    HoodieFileReader orcReader = HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
-        .getFileReader(metaClient.getTableConfig(), metaClient.getRawHoodieStorage().getConf(), orcFilePath,
-            HoodieFileFormat.ORC, Option.empty());
-    return convertAvroSchemaToParquet(orcReader.getSchema());
-  }
-
   /**
    * Read schema from a data file from the last compaction commit done.
    *
    * @deprecated please use {@link #getTableAvroSchema(HoodieInstant, boolean)} instead
    */
-  public MessageType readSchemaFromLastCompaction(Option<HoodieInstant> lastCompactionCommitOpt) throws Exception {
+  public Schema readSchemaFromLastCompaction(Option<HoodieInstant> lastCompactionCommitOpt) throws Exception {
     HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
 
     HoodieInstant lastCompactionCommit = lastCompactionCommitOpt.orElseThrow(() -> new Exception(
@@ -378,10 +305,11 @@ public class TableSchemaResolver {
     String filePath = compactionMetadata.getFileIdAndFullPaths(metaClient.getBasePathV2()).values().stream().findAny()
         .orElseThrow(() -> new IllegalArgumentException("Could not find any data file written for compaction "
             + lastCompactionCommit + ", could not get schema for table " + metaClient.getBasePath()));
-    return readSchemaFromBaseFile(filePath);
+    StoragePath path = new StoragePath(filePath);
+    return BaseFileUtils.getInstance(path).readAvroSchema(metaClient.getStorageConf(), path);
   }
 
-  private MessageType readSchemaFromLogFile(StoragePath path) throws IOException {
+  private Schema readSchemaFromLogFile(StoragePath path) throws IOException {
     return readSchemaFromLogFile(metaClient.getRawHoodieStorage(), path);
   }
 
@@ -390,7 +318,7 @@ public class TableSchemaResolver {
    *
    * @return
    */
-  public static MessageType readSchemaFromLogFile(HoodieStorage storage, StoragePath path) throws IOException {
+  public static Schema readSchemaFromLogFile(HoodieStorage storage, StoragePath path) throws IOException {
     // We only need to read the schema from the log block header,
     // so we read the block lazily to avoid reading block content
     // containing the records
@@ -402,7 +330,7 @@ public class TableSchemaResolver {
           lastBlock = (HoodieDataBlock) block;
         }
       }
-      return lastBlock != null ? new AvroSchemaConverter().convert(lastBlock.getSchema()) : null;
+      return lastBlock != null ? lastBlock.getSchema() : null;
     }
   }
 
@@ -537,30 +465,18 @@ public class TableSchemaResolver {
         });
   }
 
-  private MessageType fetchSchemaFromFiles(Iterator<String> filePaths) throws IOException {
-    MessageType type = null;
-    while (filePaths.hasNext() && type == null) {
-      String filePath = filePaths.next();
-      if (filePath.contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())) {
+  private Schema fetchSchemaFromFiles(Iterator<String> filePaths) throws IOException {
+    Schema schema = null;
+    while (filePaths.hasNext() && schema == null) {
+      StoragePath filePath = new StoragePath(filePaths.next());
+      if (FSUtils.isLogFile(filePath)) {
         // this is a log file
-        type = readSchemaFromLogFile(new StoragePath(filePath));
+        schema = readSchemaFromLogFile(filePath);
       } else {
-        type = readSchemaFromBaseFile(filePath);
+        schema = BaseFileUtils.getInstance(filePath).readAvroSchema(metaClient.getStorageConf(), filePath);
       }
     }
-    return type;
-  }
-
-  private MessageType readSchemaFromBaseFile(String filePath) throws IOException {
-    if (filePath.contains(HoodieFileFormat.PARQUET.getFileExtension())) {
-      return readSchemaFromParquetBaseFile(new Path(filePath));
-    } else if (filePath.contains(HoodieFileFormat.HFILE.getFileExtension())) {
-      return readSchemaFromHFileBaseFile(new Path(filePath));
-    } else if (filePath.contains(HoodieFileFormat.ORC.getFileExtension())) {
-      return readSchemaFromORCBaseFile(new StoragePath(filePath));
-    } else {
-      throw new IllegalArgumentException("Unknown base file format :" + filePath);
-    }
+    return schema;
   }
 
   public static Schema appendPartitionColumns(Schema dataSchema, Option<String[]> partitionFields) {
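The fetchSchemaFromFiles rewrite above is the heart of the Hadoop separation: file-format detection now happens on StoragePath, and base-file schema reads go through BaseFileUtils instead of Parquet/HFile/ORC-specific readers. A standalone sketch of the same dispatch (the helper class and method names are illustrative):

    import org.apache.avro.Schema;
    import org.apache.hudi.common.fs.FSUtils;
    import org.apache.hudi.common.table.TableSchemaResolver;
    import org.apache.hudi.common.util.BaseFileUtils;
    import org.apache.hudi.storage.HoodieStorage;
    import org.apache.hudi.storage.StorageConfiguration;
    import org.apache.hudi.storage.StoragePath;

    import java.io.IOException;

    class FileSchemaExample {
      // Log files are resolved from the log-block header; base files are dispatched
      // to the format-specific BaseFileUtils implementation by file extension.
      static Schema readSchema(HoodieStorage storage, StorageConfiguration<?> conf, StoragePath filePath)
          throws IOException {
        if (FSUtils.isLogFile(filePath)) {
          return TableSchemaResolver.readSchemaFromLogFile(storage, filePath);
        }
        return BaseFileUtils.getInstance(filePath).readAvroSchema(conf, filePath);
      }
    }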
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
index b36957609fb..a4c3e0edf87 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
@@ -53,12 +53,15 @@ import java.util.stream.Collectors;
 public abstract class BaseFileUtils {
   public static final String PARQUET_UTILS = "org.apache.hudi.common.util.ParquetUtils";
   public static final String ORC_UTILS = "org.apache.hudi.common.util.OrcUtils";
+  public static final String HFILE_UTILS = "org.apache.hudi.common.util.HFileUtils";
 
-  public static BaseFileUtils getInstance(String path) {
-    if (path.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+  public static BaseFileUtils getInstance(StoragePath path) {
+    if (path.getFileExtension().equals(HoodieFileFormat.PARQUET.getFileExtension())) {
       return ReflectionUtils.loadClass(PARQUET_UTILS);
-    } else if (path.endsWith(HoodieFileFormat.ORC.getFileExtension())) {
+    } else if (path.getFileExtension().equals(HoodieFileFormat.ORC.getFileExtension())) {
       return ReflectionUtils.loadClass(ORC_UTILS);
+    } else if (path.getFileExtension().equals(HoodieFileFormat.HFILE.getFileExtension())) {
+      return ReflectionUtils.loadClass(HFILE_UTILS);
     }
     throw new UnsupportedOperationException("The format for file " + path + " is not supported yet.");
   }
@@ -68,6 +71,8 @@ public abstract class BaseFileUtils {
       return ReflectionUtils.loadClass(PARQUET_UTILS);
     } else if (HoodieFileFormat.ORC.equals(fileFormat)) {
       return ReflectionUtils.loadClass(ORC_UTILS);
+    } else if (HoodieFileFormat.HFILE.equals(fileFormat)) {
+      return ReflectionUtils.loadClass(HFILE_UTILS);
     }
     throw new UnsupportedOperationException(fileFormat.name() + " format not supported yet.");
   }
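A quick usage sketch of the new StoragePath overload; the paths and extensions below are assumptions chosen to match the usual Hudi file suffixes, not taken from the commit.

    import org.apache.hudi.common.util.BaseFileUtils;
    import org.apache.hudi.storage.StoragePath;

    class BaseFileUtilsExample {
      static void dispatchByExtension() {
        // getInstance keys off StoragePath.getFileExtension(); HFile now joins
        // Parquet and ORC as a supported base file format.
        BaseFileUtils parquetUtils = BaseFileUtils.getInstance(new StoragePath("/tmp/a.parquet"));
        BaseFileUtils hfileUtils = BaseFileUtils.getInstance(new StoragePath("/tmp/b.hfile"));
      }
    }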
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
index f451bfce64e..2a6ee0154d0 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
@@ -20,7 +20,7 @@ package org.apache.hudi.table.catalog;
 
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.ParquetTableSchemaResolver;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieValidationException;
@@ -198,7 +198,7 @@ public class TableOptionProperties {
       boolean withOperationField) {
     RowType rowType = supplementMetaFields((RowType) 
catalogTable.getSchema().toPhysicalRowDataType().getLogicalType(), 
withOperationField);
     Schema schema = AvroSchemaConverter.convertToSchema(rowType);
-    MessageType messageType = 
TableSchemaResolver.convertAvroSchemaToParquet(schema, hadoopConf);
+    MessageType messageType = 
ParquetTableSchemaResolver.convertAvroSchemaToParquet(schema, hadoopConf);
     String sparkVersion = 
catalogTable.getOptions().getOrDefault(SPARK_VERSION, DEFAULT_SPARK_VERSION);
     Map<String, String> sparkTableProperties = 
SparkDataSourceTableUtils.getSparkTableProperties(
         partitionKeys,
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/table/ParquetTableSchemaResolver.java
 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/table/ParquetTableSchemaResolver.java
new file mode 100644
index 00000000000..0b70677f862
--- /dev/null
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/table/ParquetTableSchemaResolver.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.schema.MessageType;
+
+public class ParquetTableSchemaResolver extends TableSchemaResolver {
+
+  public ParquetTableSchemaResolver(HoodieTableMetaClient metaClient) {
+    super(metaClient);
+  }
+
+  public static MessageType convertAvroSchemaToParquet(Schema schema, Configuration hadoopConf) {
+    AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(hadoopConf);
+    return avroSchemaConverter.convert(schema);
+  }
+
+  private Schema convertParquetSchemaToAvro(MessageType parquetSchema) {
+    AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(metaClient.getStorageConf().unwrapAs(Configuration.class));
+    return avroSchemaConverter.convert(parquetSchema);
+  }
+
+  private MessageType convertAvroSchemaToParquet(Schema schema) {
+    AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(metaClient.getStorageConf().unwrapAs(Configuration.class));
+    return avroSchemaConverter.convert(schema);
+  }
+
+  /**
+   * Gets full schema (user + metadata) for a hoodie table in Parquet format.
+   *
+   * @return Parquet schema for the table
+   */
+  public MessageType getTableParquetSchema() throws Exception {
+    return convertAvroSchemaToParquet(getTableAvroSchema(true));
+  }
+
+  /**
+   * Gets users data schema for a hoodie table in Parquet format.
+   *
+   * @return Parquet schema for the table
+   */
+  public MessageType getTableParquetSchema(boolean includeMetadataField) throws Exception {
+    return convertAvroSchemaToParquet(getTableAvroSchema(includeMetadataField));
+  }
+
+}
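Callers that still need a Parquet MessageType switch to this Hadoop-module subclass; a minimal usage sketch (the example class name is illustrative, the resolver calls mirror the HoodieSyncClient change later in this diff):

    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.ParquetTableSchemaResolver;

    import org.apache.parquet.schema.MessageType;

    class SyncSchemaExample {
      // Parquet-specific schema access now lives in the hudi-hadoop-common subclass,
      // keeping the base TableSchemaResolver free of Parquet/Hadoop imports.
      static MessageType storageSchema(HoodieTableMetaClient metaClient) throws Exception {
        return new ParquetTableSchemaResolver(metaClient).getTableParquetSchema();
      }
    }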
diff --git 
a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
new file mode 100644
index 00000000000..48f7e41e047
--- /dev/null
+++ 
b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Utility functions for HFile files.
+ */
+public class HFileUtils extends BaseFileUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HFileUtils.class);
+
+  @Override
+  public List<GenericRecord> readAvroRecords(StorageConfiguration<?> 
configuration, StoragePath filePath) {
+    throw new UnsupportedOperationException("HFileUtils does not support 
readAvroRecords");
+  }
+
+  @Override
+  public List<GenericRecord> readAvroRecords(StorageConfiguration<?> 
configuration, StoragePath filePath, Schema schema) {
+    throw new UnsupportedOperationException("HFileUtils does not support 
readAvroRecords");
+  }
+
+  @Override
+  public Map<String, String> readFooter(StorageConfiguration<?> configuration, 
boolean required, StoragePath filePath, String... footerNames) {
+    throw new UnsupportedOperationException("HFileUtils does not support 
readFooter");
+  }
+
+  @Override
+  public long getRowCount(StorageConfiguration<?> configuration, StoragePath 
filePath) {
+    throw new UnsupportedOperationException("HFileUtils does not support 
getRowCount");
+  }
+
+  @Override
+  public Set<Pair<String, Long>> filterRowKeys(StorageConfiguration<?> 
configuration, StoragePath filePath, Set<String> filter) {
+    throw new UnsupportedOperationException("HFileUtils does not support 
filterRowKeys");
+  }
+
+  @Override
+  public List<Pair<HoodieKey, Long>> 
fetchRecordKeysWithPositions(StorageConfiguration<?> configuration, StoragePath 
filePath) {
+    throw new UnsupportedOperationException("HFileUtils does not support 
fetchRecordKeysWithPositions");
+  }
+
+  @Override
+  public ClosableIterator<HoodieKey> 
getHoodieKeyIterator(StorageConfiguration<?> configuration, StoragePath 
filePath, Option<BaseKeyGenerator> keyGeneratorOpt) {
+    throw new UnsupportedOperationException("HFileUtils does not support 
getHoodieKeyIterator");
+  }
+
+  @Override
+  public ClosableIterator<HoodieKey> 
getHoodieKeyIterator(StorageConfiguration<?> configuration, StoragePath 
filePath) {
+    throw new UnsupportedOperationException("HFileUtils does not support 
getHoodieKeyIterator");
+  }
+
+  @Override
+  public List<Pair<HoodieKey, Long>> 
fetchRecordKeysWithPositions(StorageConfiguration<?> configuration, StoragePath 
filePath, Option<BaseKeyGenerator> keyGeneratorOpt) {
+    throw new UnsupportedOperationException("HFileUtils does not support 
fetchRecordKeysWithPositions");
+  }
+
+  @Override
+  public Schema readAvroSchema(StorageConfiguration<?> configuration, 
StoragePath filePath) {
+    LOG.info("Reading schema from {}", filePath);
+
+    try (HoodieFileReader fileReader =
+             
HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
+                 .getFileReader(
+                     ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER,
+                     configuration,
+                     filePath)) {
+      return fileReader.getSchema();
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to read schema from HFile", e);
+    }
+  }
+
+  @Override
+  public List<HoodieColumnRangeMetadata<Comparable>> 
readColumnStatsFromMetadata(StorageConfiguration<?> storageConf, StoragePath 
filePath, List<String> columnList) {
+    throw new UnsupportedOperationException(
+        "Reading column statistics from metadata is not supported for HFile 
format yet");
+  }
+
+  @Override
+  public HoodieFileFormat getFormat() {
+    return HoodieFileFormat.HFILE;
+  }
+
+  @Override
+  public void writeMetaFile(HoodieStorage storage, StoragePath filePath, 
Properties props) throws IOException {
+    throw new UnsupportedOperationException("HFileUtils does not support 
writeMetaFile");
+  }
+}
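Roughly how the new utility is reached through the BaseFileUtils factory (example class and variable names are not from the commit): only schema reads and getFormat are implemented for HFile so far; the remaining operations throw UnsupportedOperationException, as shown above.

    import org.apache.avro.Schema;
    import org.apache.hudi.common.util.BaseFileUtils;
    import org.apache.hudi.storage.StorageConfiguration;
    import org.apache.hudi.storage.StoragePath;

    class HFileSchemaExample {
      // For an HFile base file, BaseFileUtils.getInstance resolves to HFileUtils,
      // whose readAvroSchema goes through the AVRO HoodieFileReader.
      static Schema schemaOf(StorageConfiguration<?> conf, StoragePath hfilePath) {
        return BaseFileUtils.getInstance(hfilePath).readAvroSchema(conf, hfilePath);
      }
    }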
diff --git 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
index efb88412a21..69c4d35a847 100644
--- 
a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
+++ 
b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
@@ -34,7 +34,6 @@ import org.apache.hudi.storage.StoragePath;
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
-import org.apache.parquet.avro.AvroSchemaConverter;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
@@ -96,10 +95,8 @@ public class TestTableSchemaResolver {
     StoragePath partitionPath = new StoragePath(testDir, "partition1");
     Schema expectedSchema = getSimpleSchema();
     StoragePath logFilePath = writeLogFile(partitionPath, expectedSchema);
-    assertEquals(
-        new AvroSchemaConverter().convert(expectedSchema),
-        
TableSchemaResolver.readSchemaFromLogFile(HoodieStorageUtils.getStorage(
-            logFilePath, HoodieTestUtils.getDefaultStorageConfWithDefaults()), 
logFilePath));
+    assertEquals(expectedSchema, 
TableSchemaResolver.readSchemaFromLogFile(HoodieStorageUtils.getStorage(
+        logFilePath, HoodieTestUtils.getDefaultStorageConfWithDefaults()), 
logFilePath));
   }
 
   private String initTestDir(String folderName) throws IOException {
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
index 36f4ad4b1bc..05ea6ae4548 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
@@ -67,8 +67,7 @@ class ShowHoodieLogFileMetadataProcedure extends 
BaseProcedure with ProcedureBui
     logFilePaths.foreach {
       logFilePath => {
         val statuses = storage.listDirectEntries(new StoragePath(logFilePath))
-        val schema = new AvroSchemaConverter()
-          
.convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(storage,
 new StoragePath(logFilePath))))
+        val schema = TableSchemaResolver.readSchemaFromLogFile(storage, new 
StoragePath(logFilePath))
         val reader = HoodieLogFormat.newReader(storage, new 
HoodieLogFile(statuses.get(0).getPath), schema)
 
         // read the avro blocks
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
index 970c9352dc5..22d0e423155 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.hudi.command.procedures
 
+import org.apache.avro.generic.IndexedRecord
 import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMemoryConfig, 
HoodieReaderConfig}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
@@ -26,9 +27,6 @@ import org.apache.hudi.common.table.log.block.HoodieDataBlock
 import org.apache.hudi.common.table.log.{HoodieLogFormat, 
HoodieMergedLogRecordScanner}
 import org.apache.hudi.common.util.{FileIOUtils, ValidationUtils}
 import org.apache.hudi.storage.StoragePath
-
-import org.apache.avro.generic.IndexedRecord
-import org.apache.parquet.avro.AvroSchemaConverter
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, 
StructType}
 
@@ -60,10 +58,9 @@ class ShowHoodieLogFileRecordsProcedure extends 
BaseProcedure with ProcedureBuil
     val logFilePaths = FSUtils.getGlobStatusExcludingMetaFolder(storage, new 
StoragePath(logFilePathPattern)).iterator().asScala
       .map(_.getPath.toString).toList
     ValidationUtils.checkArgument(logFilePaths.nonEmpty, "There is no log 
file")
-    val converter = new AvroSchemaConverter()
     val allRecords: java.util.List[IndexedRecord] = new 
java.util.ArrayList[IndexedRecord]
     if (merge) {
-      val schema = 
converter.convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(storage,
 new StoragePath(logFilePaths.last))))
+      val schema = 
Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(storage, new 
StoragePath(logFilePaths.last)))
       val scanner = HoodieMergedLogRecordScanner.newBuilder
         .withStorage(storage)
         .withBasePath(basePath)
@@ -86,7 +83,7 @@ class ShowHoodieLogFileRecordsProcedure extends BaseProcedure 
with ProcedureBuil
     } else {
       logFilePaths.toStream.takeWhile(_ => allRecords.size() < limit).foreach {
         logFilePath => {
-          val schema = 
converter.convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(storage,
 new StoragePath(logFilePath))))
+          val schema = 
Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(storage, new 
StoragePath(logFilePath)))
           val reader = HoodieLogFormat.newReader(storage, new 
HoodieLogFile(logFilePath), schema)
           while (reader.hasNext) {
             val block = reader.next()
diff --git 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
index aef8c3aea76..2eb13f5a787 100644
--- 
a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
+++ 
b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
@@ -21,8 +21,8 @@ package org.apache.hudi.sync.common;
 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.ParquetTableSchemaResolver;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineUtils;
 import org.apache.hudi.common.util.Option;
@@ -99,7 +99,7 @@ public abstract class HoodieSyncClient implements 
HoodieMetaSyncOperations, Auto
   @Override
   public MessageType getStorageSchema() {
     try {
-      return new TableSchemaResolver(metaClient).getTableParquetSchema();
+      return new 
ParquetTableSchemaResolver(metaClient).getTableParquetSchema();
     } catch (Exception e) {
       throw new HoodieSyncException("Failed to read schema from storage.", e);
     }
@@ -108,7 +108,7 @@ public abstract class HoodieSyncClient implements 
HoodieMetaSyncOperations, Auto
   @Override
   public MessageType getStorageSchema(boolean includeMetadataField) {
     try {
-      return new 
TableSchemaResolver(metaClient).getTableParquetSchema(includeMetadataField);
+      return new 
ParquetTableSchemaResolver(metaClient).getTableParquetSchema(includeMetadataField);
     } catch (Exception e) {
       throw new HoodieSyncException("Failed to read schema from storage.", e);
     }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index a9e2d895c13..3ddb681779f 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -73,8 +73,6 @@ import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import org.apache.avro.Schema;
 import org.apache.hadoop.fs.Path;
-import org.apache.parquet.avro.AvroSchemaConverter;
-import org.apache.parquet.schema.MessageType;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -1168,20 +1166,18 @@ public class HoodieMetadataTableValidator implements 
Serializable {
 
     String basePath = metaClient.getBasePathV2().toString();
     HoodieTimeline commitsTimeline = metaClient.getCommitsTimeline();
-    AvroSchemaConverter converter = new AvroSchemaConverter();
     HoodieTimeline completedInstantsTimeline = 
commitsTimeline.filterCompletedInstants();
     HoodieTimeline inflightInstantsTimeline = 
commitsTimeline.filterInflights();
 
     for (String logFilePathStr : logFilePathSet) {
       HoodieLogFormat.Reader reader = null;
       try {
-        MessageType messageType =
+        Schema readerSchema =
             TableSchemaResolver.readSchemaFromLogFile(storage, new 
StoragePath(logFilePathStr));
-        if (messageType == null) {
+        if (readerSchema == null) {
           LOG.warn("Cannot read schema from log file {}. Skip the check as 
it's likely being written by an inflight instant.", logFilePathStr);
           continue;
         }
-        Schema readerSchema = converter.convert(messageType);
         reader =
             HoodieLogFormat.newReader(storage, new 
HoodieLogFile(logFilePathStr), readerSchema, false);
         // read the avro blocks
