This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 23b283acf3e  [HUDI-7726] Restructure TableSchemaResolver to separate Hadoop logic and use BaseFileUtils (#11185)
23b283acf3e is described below

commit 23b283acf3e4c30e26652edf9c710e17e47951c5
Author: Jon Vexler <jbvex...@gmail.com>
AuthorDate: Fri May 10 17:19:23 2024 -0400

    [HUDI-7726] Restructure TableSchemaResolver to separate Hadoop logic and use BaseFileUtils (#11185)

    Co-authored-by: Jonathan Vexler <=>
    Co-authored-by: Y Ethan Guo <ethan.guoyi...@gmail.com>
---
 .../hudi/cli/commands/HoodieLogFileCommand.java    |  15 +--
 .../hudi/io/HoodieKeyLocationFetchHandle.java      |   7 +-
 .../hudi/client/TestJavaHoodieBackedMetadata.java  |  12 +-
 .../testutils/HoodieJavaClientTestHarness.java     |  10 +-
 .../functional/TestHoodieBackedMetadata.java       |  12 +-
 .../functional/TestHoodieBackedTableMetadata.java  |   7 +-
 .../hudi/common/model/HoodiePartitionMetadata.java |   2 +-
 .../hudi/common/table/TableSchemaResolver.java     | 122 +++----------------
 .../org/apache/hudi/common/util/BaseFileUtils.java |  11 +-
 .../hudi/table/catalog/TableOptionProperties.java  |   4 +-
 .../common/table/ParquetTableSchemaResolver.java   |  66 +++++++++++
 .../org/apache/hudi/common/util/HFileUtils.java    | 130 +++++++++++++++++++++
 .../hudi/common/table/TestTableSchemaResolver.java |   7 +-
 .../ShowHoodieLogFileMetadataProcedure.scala       |   3 +-
 .../ShowHoodieLogFileRecordsProcedure.scala        |   9 +-
 .../apache/hudi/sync/common/HoodieSyncClient.java  |   6 +-
 .../utilities/HoodieMetadataTableValidator.java    |   8 +-
 17 files changed, 259 insertions(+), 172 deletions(-)
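At the API level, the visible change for callers is that TableSchemaResolver.readSchemaFromLogFile now returns an Avro Schema directly instead of a Parquet MessageType, so the AvroSchemaConverter round-trip disappears from every call site. A minimal sketch of the new call shape (illustrative only; the class name is hypothetical and it assumes a HoodieStorage handle and a valid log file path):

    import org.apache.avro.Schema;
    import org.apache.hudi.common.table.TableSchemaResolver;
    import org.apache.hudi.storage.HoodieStorage;
    import org.apache.hudi.storage.StoragePath;

    import java.io.IOException;

    class LogSchemaSketch {
      // Returns the writer schema of a log file, or null if it contains no data block.
      static Schema writerSchemaOf(HoodieStorage storage, StoragePath logFilePath) throws IOException {
        return TableSchemaResolver.readSchemaFromLogFile(storage, logFilePath);
      }
    }
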
diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
index 367dc2302ee..d3c30143072 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/HoodieLogFileCommand.java
@@ -49,8 +49,6 @@ import org.apache.hudi.storage.StoragePath;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
-import org.apache.parquet.avro.AvroSchemaConverter;
-import org.apache.parquet.schema.MessageType;
 import org.springframework.shell.standard.ShellComponent;
 import org.springframework.shell.standard.ShellMethod;
 import org.springframework.shell.standard.ShellOption;
@@ -109,9 +107,7 @@ public class HoodieLogFileCommand {
     } else {
       fileName = path.getName();
     }
-    MessageType schema = TableSchemaResolver.readSchemaFromLogFile(storage, path);
-    Schema writerSchema = schema != null
-        ? new AvroSchemaConverter().convert(Objects.requireNonNull(schema)) : null;
+    Schema writerSchema = TableSchemaResolver.readSchemaFromLogFile(storage, path);

     try (Reader reader = HoodieLogFormat.newReader(storage, new HoodieLogFile(path), writerSchema)) {
       // read the avro blocks
@@ -213,14 +209,13 @@ public class HoodieLogFileCommand {
     checkArgument(logFilePaths.size() > 0, "There is no log file");

     // TODO : readerSchema can change across blocks/log files, fix this inside Scanner
-    AvroSchemaConverter converter = new AvroSchemaConverter();
     Schema readerSchema = null;
     // get schema from last log file
     for (int i = logFilePaths.size() - 1; i >= 0; i--) {
-      MessageType schema = TableSchemaResolver.readSchemaFromLogFile(
+      Schema schema = TableSchemaResolver.readSchemaFromLogFile(
           storage, new StoragePath(logFilePaths.get(i)));
       if (schema != null) {
-        readerSchema = converter.convert(schema);
+        readerSchema = schema;
         break;
       }
     }
@@ -257,10 +252,8 @@ public class HoodieLogFileCommand {
       }
     } else {
       for (String logFile : logFilePaths) {
-        MessageType schema = TableSchemaResolver.readSchemaFromLogFile(
+        Schema writerSchema = TableSchemaResolver.readSchemaFromLogFile(
             client.getStorage(), new StoragePath(logFile));
-        Schema writerSchema = schema != null
-            ? new AvroSchemaConverter().convert(Objects.requireNonNull(schema)) : null;
         try (HoodieLogFormat.Reader reader =
                  HoodieLogFormat.newReader(storage, new HoodieLogFile(new StoragePath(logFile)), writerSchema)) {
           // read the avro blocks
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
index 30e2437485e..f05a0af3449 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieKeyLocationFetchHandle.java
@@ -27,7 +27,6 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.keygen.BaseKeyGenerator;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;

 import java.util.List;
@@ -51,11 +50,11 @@ public class HoodieKeyLocationFetchHandle<T, I, K, O> extends HoodieReadHandle<T
   }

   private List<Pair<HoodieKey, Long>> fetchRecordKeysWithPositions(HoodieBaseFile baseFile) {
-    BaseFileUtils baseFileUtils = BaseFileUtils.getInstance(baseFile.getPath());
+    BaseFileUtils baseFileUtils = BaseFileUtils.getInstance(baseFile.getStoragePath());
     if (keyGeneratorOpt.isPresent()) {
-      return baseFileUtils.fetchRecordKeysWithPositions(hoodieTable.getStorageConf(), new StoragePath(baseFile.getPath()), keyGeneratorOpt);
+      return baseFileUtils.fetchRecordKeysWithPositions(hoodieTable.getStorageConf(), baseFile.getStoragePath(), keyGeneratorOpt);
     } else {
-      return baseFileUtils.fetchRecordKeysWithPositions(hoodieTable.getStorageConf(), new StoragePath(baseFile.getPath()));
+      return baseFileUtils.fetchRecordKeysWithPositions(hoodieTable.getStorageConf(), baseFile.getStoragePath());
     }
   }
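Call sites that already hold a HoodieBaseFile no longer wrap raw string paths: HoodieBaseFile.getStoragePath() feeds BaseFileUtils.getInstance(StoragePath) directly, which dispatches on the file extension. A sketch of the resulting caller shape under stated assumptions (a StorageConfiguration in scope; the class name is illustrative):

    import org.apache.hudi.common.model.HoodieBaseFile;
    import org.apache.hudi.common.model.HoodieKey;
    import org.apache.hudi.common.util.BaseFileUtils;
    import org.apache.hudi.common.util.collection.Pair;
    import org.apache.hudi.storage.StorageConfiguration;

    import java.util.List;

    class FetchKeysSketch {
      // Looks up the format-specific utils from the file extension, then reads keys.
      static List<Pair<HoodieKey, Long>> recordKeys(StorageConfiguration<?> conf, HoodieBaseFile baseFile) {
        return BaseFileUtils.getInstance(baseFile.getStoragePath())
            .fetchRecordKeysWithPositions(conf, baseFile.getStoragePath());
      }
    }
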
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
index 017998f0484..3c049dc9c20 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
@@ -110,8 +110,6 @@ import org.apache.hudi.testutils.TestHoodieMetadataBase;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
-import org.apache.parquet.avro.AvroSchemaConverter;
-import org.apache.parquet.schema.MessageType;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
@@ -892,14 +890,13 @@ public class TestJavaHoodieBackedMetadata extends TestHoodieMetadataBase {
   private void verifyMetadataRawRecords(HoodieTable table, List<HoodieLogFile> logFiles, boolean enableMetaFields) throws IOException {
     for (HoodieLogFile logFile : logFiles) {
       List<StoragePathInfo> pathInfoList = storage.listDirectEntries(logFile.getPath());
-      MessageType writerSchemaMsg = TableSchemaResolver.readSchemaFromLogFile(storage,
+      Schema writerSchema = TableSchemaResolver.readSchemaFromLogFile(storage,
           logFile.getPath());
-      if (writerSchemaMsg == null) {
+      if (writerSchema == null) {
         // not a data block
         continue;
       }

-      Schema writerSchema = new AvroSchemaConverter().convert(writerSchemaMsg);
       try (HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(storage,
           new HoodieLogFile(pathInfoList.get(0).getPath()), writerSchema)) {
         while (logFileReader.hasNext()) {
@@ -2857,14 +2854,13 @@ public class TestJavaHoodieBackedMetadata extends TestHoodieMetadataBase {
   private void verifyMetadataColumnStatsRecords(List<HoodieLogFile> logFiles) throws IOException {
     for (HoodieLogFile logFile : logFiles) {
       List<StoragePathInfo> pathInfoList = storage.listDirectEntries(logFile.getPath());
-      MessageType writerSchemaMsg = TableSchemaResolver.readSchemaFromLogFile(storage,
+      Schema writerSchema = TableSchemaResolver.readSchemaFromLogFile(storage,
           logFile.getPath());
-      if (writerSchemaMsg == null) {
+      if (writerSchema == null) {
         // not a data block
         continue;
       }

-      Schema writerSchema = new AvroSchemaConverter().convert(writerSchemaMsg);
       try (HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(storage,
           new HoodieLogFile(pathInfoList.get(0).getPath()), writerSchema)) {
         while (logFileReader.hasNext()) {
diff --git a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
index b969598a661..05cbd7af8e8 100644
--- a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
+++ b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/HoodieJavaClientTestHarness.java
@@ -912,8 +912,8 @@ public abstract class HoodieJavaClientTestHarness extends HoodieWriterClientTest
     try {
       HashMap<String, String> paths = getLatestFileIDsToFullPath(basePath, commitTimeline, Arrays.asList(commitInstant));
-      return paths.values().stream().flatMap(path ->
-          BaseFileUtils.getInstance(path).readAvroRecords(context.getStorageConf(), new StoragePath(path)).stream())
+      return paths.values().stream().map(StoragePath::new).flatMap(path ->
+          BaseFileUtils.getInstance(path).readAvroRecords(context.getStorageConf(), path).stream())
           .filter(record -> {
             if (filterByCommitTime) {
               Object commitTime = record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
@@ -942,8 +942,8 @@ public abstract class HoodieJavaClientTestHarness extends HoodieWriterClientTest
     try {
       List<HoodieBaseFile> latestFiles = getLatestBaseFiles(basePath, storage, paths);
       return latestFiles.stream().mapToLong(baseFile ->
-          BaseFileUtils.getInstance(baseFile.getPath())
-              .readAvroRecords(context.getStorageConf(), new StoragePath(baseFile.getPath())).size())
+          BaseFileUtils.getInstance(baseFile.getStoragePath())
+              .readAvroRecords(context.getStorageConf(), baseFile.getStoragePath()).size())
           .sum();
     } catch (Exception e) {
       throw new HoodieException("Error reading hoodie table as a dataframe", e);
@@ -980,7 +980,7 @@ public abstract class HoodieJavaClientTestHarness extends HoodieWriterClientTest
     HashMap<String, String> fileIdToFullPath = getLatestFileIDsToFullPath(basePath, commitTimeline, commitsToReturn);
     String[] paths = fileIdToFullPath.values().toArray(new String[fileIdToFullPath.size()]);
     if (paths[0].endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
-      return Arrays.stream(paths).flatMap(path -> BaseFileUtils.getInstance(path).readAvroRecords(context.getStorageConf(), new StoragePath(path)).stream())
+      return Arrays.stream(paths).map(StoragePath::new).flatMap(path -> BaseFileUtils.getInstance(path).readAvroRecords(context.getStorageConf(), path).stream())
           .filter(record -> {
             if (lastCommitTimeOpt.isPresent()) {
               Object commitTime = record.get(HoodieRecord.COMMIT_TIME_METADATA_FIELD);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 52938c98547..b655cbc2ab5 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -117,8 +117,6 @@ import org.apache.hudi.testutils.MetadataMergeWriteStatus;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
-import org.apache.parquet.avro.AvroSchemaConverter;
-import org.apache.parquet.schema.MessageType;
 import org.apache.spark.api.java.JavaRDD;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Disabled;
@@ -1359,14 +1357,13 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
   private void verifyMetadataRawRecords(HoodieTable table, List<HoodieLogFile> logFiles, boolean enableMetaFields) throws IOException {
     for (HoodieLogFile logFile : logFiles) {
       List<StoragePathInfo> pathInfoList = storage.listDirectEntries(logFile.getPath());
-      MessageType writerSchemaMsg =
+      Schema writerSchema =
           TableSchemaResolver.readSchemaFromLogFile(storage, logFile.getPath());
-      if (writerSchemaMsg == null) {
+      if (writerSchema == null) {
         // not a data block
         continue;
       }

-      Schema writerSchema = new AvroSchemaConverter().convert(writerSchemaMsg);
       try (HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(storage,
           new HoodieLogFile(pathInfoList.get(0).getPath()), writerSchema)) {
         while (logFileReader.hasNext()) {
@@ -3724,14 +3721,13 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
   private void verifyMetadataColumnStatsRecords(List<HoodieLogFile> logFiles) throws IOException {
     for (HoodieLogFile logFile : logFiles) {
       List<StoragePathInfo> pathInfoList = storage.listDirectEntries(logFile.getPath());
-      MessageType writerSchemaMsg =
+      Schema writerSchema =
           TableSchemaResolver.readSchemaFromLogFile(storage, logFile.getPath());
-      if (writerSchemaMsg == null) {
+      if (writerSchema == null) {
        // not a data block
         continue;
       }

-      Schema writerSchema = new AvroSchemaConverter().convert(writerSchemaMsg);
       try (HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(storage,
           new HoodieLogFile(pathInfoList.get(0).getPath()), writerSchema)) {
         while (logFileReader.hasNext()) {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
index 9bcfe2ca733..9e8521d669b 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
@@ -59,8 +59,6 @@ import org.apache.hudi.table.HoodieTable;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
-import org.apache.parquet.avro.AvroSchemaConverter;
-import org.apache.parquet.schema.MessageType;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -453,14 +451,13 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase {
   private void verifyMetadataRawRecords(HoodieTable table, List<HoodieLogFile> logFiles) throws IOException {
     for (HoodieLogFile logFile : logFiles) {
       List<StoragePathInfo> pathInfoList = storage.listDirectEntries(logFile.getPath());
-      MessageType writerSchemaMsg =
+      Schema writerSchema =
           TableSchemaResolver.readSchemaFromLogFile(storage, logFile.getPath());
-      if (writerSchemaMsg == null) {
+      if (writerSchema == null) {
         // not a data block
         continue;
       }

-      Schema writerSchema = new AvroSchemaConverter().convert(writerSchemaMsg);
       try (HoodieLogFormat.Reader logFileReader = HoodieLogFormat.newReader(storage,
           new HoodieLogFile(pathInfoList.get(0).getPath()), writerSchema)) {
         while (logFileReader.hasNext()) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
index f334ceaf6bb..e8edc8b9142 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
@@ -185,7 +185,7 @@ public class HoodiePartitionMetadata {
   private boolean readBaseFormatMetaFile() {
     for (StoragePath metafilePath : baseFormatMetaFilePaths(partitionPath)) {
       try {
-        BaseFileUtils reader = BaseFileUtils.getInstance(metafilePath.toString());
+        BaseFileUtils reader = BaseFileUtils.getInstance(metafilePath);
         // Data file format
         Map<String, String> metadata = reader.readFooter(
             storage.getConf(), true, metafilePath, PARTITION_DEPTH_KEY, COMMIT_TIME_KEY);
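HoodiePartitionMetadata now hands the StoragePath straight to BaseFileUtils, so the footer read is format-agnostic. A sketch of the pattern under stated assumptions (the class name and the "commitTime" footer key here are hypothetical; the real caller passes PARTITION_DEPTH_KEY and COMMIT_TIME_KEY):

    import org.apache.hudi.common.util.BaseFileUtils;
    import org.apache.hudi.storage.StorageConfiguration;
    import org.apache.hudi.storage.StoragePath;

    import java.util.Map;

    class PartitionMetaSketch {
      // Reads selected footer entries from a base-format partition metafile.
      static Map<String, String> footer(StorageConfiguration<?> conf, StoragePath metafilePath) {
        return BaseFileUtils.getInstance(metafilePath)
            .readFooter(conf, true, metafilePath, "commitTime"); // hypothetical key name
      }
    }
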
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
index f3b2bc69dc5..b284fa4f881 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/TableSchemaResolver.java
@@ -20,8 +20,8 @@ package org.apache.hudi.common.table;

 import org.apache.hudi.avro.HoodieAvroUtils;
 import org.apache.hudi.common.HoodieSchemaNotFoundException;
+import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.WriteOperationType;
@@ -32,7 +32,7 @@ import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.ConfigUtils;
+import org.apache.hudi.common.util.BaseFileUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
@@ -43,8 +43,6 @@ import org.apache.hudi.internal.schema.HoodieSchemaException;
 import org.apache.hudi.internal.schema.InternalSchema;
 import org.apache.hudi.internal.schema.io.FileBasedInternalSchemaStorageManager;
 import org.apache.hudi.internal.schema.utils.SerDeHelper;
-import org.apache.hudi.io.storage.HoodieFileReader;
-import org.apache.hudi.io.storage.HoodieFileReaderFactory;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.util.Lazy;
@@ -52,13 +50,6 @@ import org.apache.hudi.util.Lazy;
 import org.apache.avro.JsonProperties;
 import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.parquet.avro.AvroSchemaConverter;
-import org.apache.parquet.format.converter.ParquetMetadataConverter;
-import org.apache.parquet.hadoop.ParquetFileReader;
-import org.apache.parquet.hadoop.metadata.ParquetMetadata;
-import org.apache.parquet.schema.MessageType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -84,7 +75,7 @@ public class TableSchemaResolver {

   private static final Logger LOG = LoggerFactory.getLogger(TableSchemaResolver.class);

-  private final HoodieTableMetaClient metaClient;
+  protected final HoodieTableMetaClient metaClient;

   /**
    * Signals whether suite of the meta-fields should have additional field designating
@@ -121,7 +112,7 @@ public class TableSchemaResolver {
   }

   private Option<Schema> getTableAvroSchemaFromDataFileInternal() {
-    return getTableParquetSchemaFromDataFile().map(this::convertParquetSchemaToAvro);
+    return getTableParquetSchemaFromDataFile();
   }

   /**
@@ -168,24 +159,6 @@ public class TableSchemaResolver {
     return getTableAvroSchemaInternal(includeMetadataFields, Option.of(instant)).orElseThrow(schemaNotFoundError());
   }

-  /**
-   * Gets full schema (user + metadata) for a hoodie table in Parquet format.
-   *
-   * @return Parquet schema for the table
-   */
-  public MessageType getTableParquetSchema() throws Exception {
-    return convertAvroSchemaToParquet(getTableAvroSchema(true));
-  }
-
-  /**
-   * Gets users data schema for a hoodie table in Parquet format.
-   *
-   * @return Parquet schema for the table
-   */
-  public MessageType getTableParquetSchema(boolean includeMetadataField) throws Exception {
-    return convertAvroSchemaToParquet(getTableAvroSchema(includeMetadataField));
-  }
-
   /**
    * Gets users data schema for a hoodie table in Avro format.
   *
@@ -273,7 +246,7 @@ public class TableSchemaResolver {
   /**
    * Fetches the schema for a table from any the table's data files
    */
-  private Option<MessageType> getTableParquetSchemaFromDataFile() {
+  private Option<Schema> getTableParquetSchemaFromDataFile() {
     Option<Pair<HoodieInstant, HoodieCommitMetadata>> instantAndCommitMetadata = getLatestCommitMetadataWithValidData();
     try {
       switch (metaClient.getTableType()) {
@@ -300,21 +273,6 @@ public class TableSchemaResolver {
     }
   }

-  public static MessageType convertAvroSchemaToParquet(Schema schema, Configuration hadoopConf) {
-    AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(hadoopConf);
-    return avroSchemaConverter.convert(schema);
-  }
-
-  private Schema convertParquetSchemaToAvro(MessageType parquetSchema) {
-    AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(metaClient.getStorageConf().unwrapAs(Configuration.class));
-    return avroSchemaConverter.convert(parquetSchema);
-  }
-
-  private MessageType convertAvroSchemaToParquet(Schema schema) {
-    AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(metaClient.getStorageConf().unwrapAs(Configuration.class));
-    return avroSchemaConverter.convert(schema);
-  }
-
   /**
    * Returns table's latest Avro {@link Schema} iff table is non-empty (ie there's at least
    * a single commit)
@@ -330,43 +288,12 @@ public class TableSchemaResolver {
     return Option.empty();
   }

-  private MessageType readSchemaFromParquetBaseFile(Path parquetFilePath) throws IOException {
-    LOG.info("Reading schema from {}", parquetFilePath);
-
-    ParquetMetadata fileFooter =
-        ParquetFileReader.readFooter(
-            metaClient.getRawHoodieStorage().unwrapConfAs(Configuration.class),
-            parquetFilePath, ParquetMetadataConverter.NO_FILTER);
-    return fileFooter.getFileMetaData().getSchema();
-  }
-
-  private MessageType readSchemaFromHFileBaseFile(Path hFilePath) throws IOException {
-    LOG.info("Reading schema from {}", hFilePath);
-
-    try (HoodieFileReader fileReader =
-        HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
-            .getFileReader(
-                ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER,
-                metaClient.getRawHoodieStorage().getConf(),
-                new StoragePath(hFilePath.toUri()))) {
-      return convertAvroSchemaToParquet(fileReader.getSchema());
-    }
-  }
-
-  private MessageType readSchemaFromORCBaseFile(StoragePath orcFilePath) throws IOException {
-    LOG.info("Reading schema from {}", orcFilePath);
-    HoodieFileReader orcReader = HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
-        .getFileReader(metaClient.getTableConfig(), metaClient.getRawHoodieStorage().getConf(), orcFilePath,
-            HoodieFileFormat.ORC, Option.empty());
-    return convertAvroSchemaToParquet(orcReader.getSchema());
-  }
-
   /**
    * Read schema from a data file from the last compaction commit done.
   *
   * @deprecated please use {@link #getTableAvroSchema(HoodieInstant, boolean)} instead
   */
-  public MessageType readSchemaFromLastCompaction(Option<HoodieInstant> lastCompactionCommitOpt) throws Exception {
+  public Schema readSchemaFromLastCompaction(Option<HoodieInstant> lastCompactionCommitOpt) throws Exception {
     HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();

     HoodieInstant lastCompactionCommit = lastCompactionCommitOpt.orElseThrow(() -> new Exception(
@@ -378,10 +305,11 @@ public class TableSchemaResolver {
     String filePath = compactionMetadata.getFileIdAndFullPaths(metaClient.getBasePathV2()).values().stream().findAny()
         .orElseThrow(() -> new IllegalArgumentException("Could not find any data file written for compaction "
            + lastCompactionCommit + ", could not get schema for table " + metaClient.getBasePath()));
-    return readSchemaFromBaseFile(filePath);
+    StoragePath path = new StoragePath(filePath);
+    return BaseFileUtils.getInstance(path).readAvroSchema(metaClient.getStorageConf(), path);
   }

-  private MessageType readSchemaFromLogFile(StoragePath path) throws IOException {
+  private Schema readSchemaFromLogFile(StoragePath path) throws IOException {
     return readSchemaFromLogFile(metaClient.getRawHoodieStorage(), path);
   }

@@ -390,7 +318,7 @@ public class TableSchemaResolver {
   *
   * @return
   */
-  public static MessageType readSchemaFromLogFile(HoodieStorage storage, StoragePath path) throws IOException {
+  public static Schema readSchemaFromLogFile(HoodieStorage storage, StoragePath path) throws IOException {
     // We only need to read the schema from the log block header,
     // so we read the block lazily to avoid reading block content
     // containing the records
@@ -402,7 +330,7 @@ public class TableSchemaResolver {
         lastBlock = (HoodieDataBlock) block;
       }
     }
-    return lastBlock != null ? new AvroSchemaConverter().convert(lastBlock.getSchema()) : null;
+    return lastBlock != null ? lastBlock.getSchema() : null;
   }
 }

@@ -537,30 +465,18 @@ public class TableSchemaResolver {
         });
   }

-  private MessageType fetchSchemaFromFiles(Iterator<String> filePaths) throws IOException {
-    MessageType type = null;
-    while (filePaths.hasNext() && type == null) {
-      String filePath = filePaths.next();
-      if (filePath.contains(HoodieFileFormat.HOODIE_LOG.getFileExtension())) {
+  private Schema fetchSchemaFromFiles(Iterator<String> filePaths) throws IOException {
+    Schema schema = null;
+    while (filePaths.hasNext() && schema == null) {
+      StoragePath filePath = new StoragePath(filePaths.next());
+      if (FSUtils.isLogFile(filePath)) {
         // this is a log file
-        type = readSchemaFromLogFile(new StoragePath(filePath));
+        schema = readSchemaFromLogFile(filePath);
       } else {
-        type = readSchemaFromBaseFile(filePath);
+        schema = BaseFileUtils.getInstance(filePath).readAvroSchema(metaClient.getStorageConf(), filePath);
       }
     }
-    return type;
-  }
-
-  private MessageType readSchemaFromBaseFile(String filePath) throws IOException {
-    if (filePath.contains(HoodieFileFormat.PARQUET.getFileExtension())) {
-      return readSchemaFromParquetBaseFile(new Path(filePath));
-    } else if (filePath.contains(HoodieFileFormat.HFILE.getFileExtension())) {
-      return readSchemaFromHFileBaseFile(new Path(filePath));
-    } else if (filePath.contains(HoodieFileFormat.ORC.getFileExtension())) {
-      return readSchemaFromORCBaseFile(new StoragePath(filePath));
-    } else {
-      throw new IllegalArgumentException("Unknown base file format :" + filePath);
-    }
+    return schema;
   }

   public static Schema appendPartitionColumns(Schema dataSchema, Option<String[]> partitionFields) {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
index b36957609fb..a4c3e0edf87 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/BaseFileUtils.java
@@ -53,12 +53,15 @@ import java.util.stream.Collectors;
 public abstract class BaseFileUtils {

   public static final String PARQUET_UTILS = "org.apache.hudi.common.util.ParquetUtils";
   public static final String ORC_UTILS = "org.apache.hudi.common.util.OrcUtils";
+  public static final String HFILE_UTILS = "org.apache.hudi.common.util.HFileUtils";

-  public static BaseFileUtils getInstance(String path) {
-    if (path.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
+  public static BaseFileUtils getInstance(StoragePath path) {
+    if (path.getFileExtension().equals(HoodieFileFormat.PARQUET.getFileExtension())) {
       return ReflectionUtils.loadClass(PARQUET_UTILS);
-    } else if (path.endsWith(HoodieFileFormat.ORC.getFileExtension())) {
+    } else if (path.getFileExtension().equals(HoodieFileFormat.ORC.getFileExtension())) {
       return ReflectionUtils.loadClass(ORC_UTILS);
+    } else if (path.getFileExtension().equals(HoodieFileFormat.HFILE.getFileExtension())) {
+      return ReflectionUtils.loadClass(HFILE_UTILS);
     }
     throw new UnsupportedOperationException("The format for file " + path + " is not supported yet.");
   }
@@ -68,6 +71,8 @@ public abstract class BaseFileUtils {
       return ReflectionUtils.loadClass(PARQUET_UTILS);
     } else if (HoodieFileFormat.ORC.equals(fileFormat)) {
       return ReflectionUtils.loadClass(ORC_UTILS);
+    } else if (HoodieFileFormat.HFILE.equals(fileFormat)) {
+      return ReflectionUtils.loadClass(HFILE_UTILS);
     }
     throw new UnsupportedOperationException(fileFormat.name() + " format not supported yet.");
   }
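With the HFILE branch added, a single entry point resolves an Avro schema for any supported base file, which is what the new fetchSchemaFromFiles relies on. A sketch under stated assumptions (illustrative class name; the path must point at a Parquet, ORC, or HFile base file):

    import org.apache.avro.Schema;
    import org.apache.hudi.common.util.BaseFileUtils;
    import org.apache.hudi.storage.StorageConfiguration;
    import org.apache.hudi.storage.StoragePath;

    class BaseFileSchemaSketch {
      // Dispatches on the file extension; unsupported formats throw UnsupportedOperationException.
      static Schema schemaOf(StorageConfiguration<?> conf, StoragePath baseFilePath) {
        return BaseFileUtils.getInstance(baseFilePath).readAvroSchema(conf, baseFilePath);
      }
    }
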
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
index f451bfce64e..2a6ee0154d0 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/TableOptionProperties.java
@@ -20,7 +20,7 @@ package org.apache.hudi.table.catalog;

 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.table.TableSchemaResolver;
+import org.apache.hudi.common.table.ParquetTableSchemaResolver;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieValidationException;
@@ -198,7 +198,7 @@ public class TableOptionProperties {
                                                                 boolean withOperationField) {
     RowType rowType = supplementMetaFields((RowType) catalogTable.getSchema().toPhysicalRowDataType().getLogicalType(), withOperationField);
     Schema schema = AvroSchemaConverter.convertToSchema(rowType);
-    MessageType messageType = TableSchemaResolver.convertAvroSchemaToParquet(schema, hadoopConf);
+    MessageType messageType = ParquetTableSchemaResolver.convertAvroSchemaToParquet(schema, hadoopConf);
     String sparkVersion = catalogTable.getOptions().getOrDefault(SPARK_VERSION, DEFAULT_SPARK_VERSION);
     Map<String, String> sparkTableProperties = SparkDataSourceTableUtils.getSparkTableProperties(
         partitionKeys,
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/table/ParquetTableSchemaResolver.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/table/ParquetTableSchemaResolver.java
new file mode 100644
index 00000000000..0b70677f862
--- /dev/null
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/table/ParquetTableSchemaResolver.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.table;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.schema.MessageType;
+
+public class ParquetTableSchemaResolver extends TableSchemaResolver {
+
+  public ParquetTableSchemaResolver(HoodieTableMetaClient metaClient) {
+    super(metaClient);
+  }
+
+  public static MessageType convertAvroSchemaToParquet(Schema schema, Configuration hadoopConf) {
+    AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(hadoopConf);
+    return avroSchemaConverter.convert(schema);
+  }
+
+  private Schema convertParquetSchemaToAvro(MessageType parquetSchema) {
+    AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(metaClient.getStorageConf().unwrapAs(Configuration.class));
+    return avroSchemaConverter.convert(parquetSchema);
+  }
+
+  private MessageType convertAvroSchemaToParquet(Schema schema) {
+    AvroSchemaConverter avroSchemaConverter = new AvroSchemaConverter(metaClient.getStorageConf().unwrapAs(Configuration.class));
+    return avroSchemaConverter.convert(schema);
+  }
+
+  /**
+   * Gets full schema (user + metadata) for a hoodie table in Parquet format.
+   *
+   * @return Parquet schema for the table
+   */
+  public MessageType getTableParquetSchema() throws Exception {
+    return convertAvroSchemaToParquet(getTableAvroSchema(true));
+  }
+
+  /**
+   * Gets users data schema for a hoodie table in Parquet format.
+   *
+   * @return Parquet schema for the table
+   */
+  public MessageType getTableParquetSchema(boolean includeMetadataField) throws Exception {
+    return convertAvroSchemaToParquet(getTableAvroSchema(includeMetadataField));
+  }
+
+}
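Parquet-specific schema access now lives in this hadoop-common subclass, keeping the parquet-mr and Hadoop dependencies out of TableSchemaResolver itself. A sketch of how a caller obtains the Parquet MessageType, assuming an initialized HoodieTableMetaClient (class name illustrative):

    import org.apache.hudi.common.table.HoodieTableMetaClient;
    import org.apache.hudi.common.table.ParquetTableSchemaResolver;

    import org.apache.parquet.schema.MessageType;

    class ParquetSchemaSketch {
      // Full table schema (user + metadata fields) as a Parquet MessageType.
      static MessageType tableSchema(HoodieTableMetaClient metaClient) throws Exception {
        return new ParquetTableSchemaResolver(metaClient).getTableParquetSchema();
      }
    }
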
diff --git a/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
new file mode 100644
index 00000000000..48f7e41e047
--- /dev/null
+++ b/hudi-hadoop-common/src/main/java/org/apache/hudi/common/util/HFileUtils.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieFileReader;
+import org.apache.hudi.io.storage.HoodieFileReaderFactory;
+import org.apache.hudi.keygen.BaseKeyGenerator;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StorageConfiguration;
+import org.apache.hudi.storage.StoragePath;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+/**
+ * Utility functions for HFile files.
+ */
+public class HFileUtils extends BaseFileUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HFileUtils.class);
+
+  @Override
+  public List<GenericRecord> readAvroRecords(StorageConfiguration<?> configuration, StoragePath filePath) {
+    throw new UnsupportedOperationException("HFileUtils does not support readAvroRecords");
+  }
+
+  @Override
+  public List<GenericRecord> readAvroRecords(StorageConfiguration<?> configuration, StoragePath filePath, Schema schema) {
+    throw new UnsupportedOperationException("HFileUtils does not support readAvroRecords");
+  }
+
+  @Override
+  public Map<String, String> readFooter(StorageConfiguration<?> configuration, boolean required, StoragePath filePath, String... footerNames) {
+    throw new UnsupportedOperationException("HFileUtils does not support readFooter");
+  }
+
+  @Override
+  public long getRowCount(StorageConfiguration<?> configuration, StoragePath filePath) {
+    throw new UnsupportedOperationException("HFileUtils does not support getRowCount");
+  }
+
+  @Override
+  public Set<Pair<String, Long>> filterRowKeys(StorageConfiguration<?> configuration, StoragePath filePath, Set<String> filter) {
+    throw new UnsupportedOperationException("HFileUtils does not support filterRowKeys");
+  }
+
+  @Override
+  public List<Pair<HoodieKey, Long>> fetchRecordKeysWithPositions(StorageConfiguration<?> configuration, StoragePath filePath) {
+    throw new UnsupportedOperationException("HFileUtils does not support fetchRecordKeysWithPositions");
+  }
+
+  @Override
+  public ClosableIterator<HoodieKey> getHoodieKeyIterator(StorageConfiguration<?> configuration, StoragePath filePath, Option<BaseKeyGenerator> keyGeneratorOpt) {
+    throw new UnsupportedOperationException("HFileUtils does not support getHoodieKeyIterator");
+  }
+
+  @Override
+  public ClosableIterator<HoodieKey> getHoodieKeyIterator(StorageConfiguration<?> configuration, StoragePath filePath) {
+    throw new UnsupportedOperationException("HFileUtils does not support getHoodieKeyIterator");
+  }
+
+  @Override
+  public List<Pair<HoodieKey, Long>> fetchRecordKeysWithPositions(StorageConfiguration<?> configuration, StoragePath filePath, Option<BaseKeyGenerator> keyGeneratorOpt) {
+    throw new UnsupportedOperationException("HFileUtils does not support fetchRecordKeysWithPositions");
+  }
+
+  @Override
+  public Schema readAvroSchema(StorageConfiguration<?> configuration, StoragePath filePath) {
+    LOG.info("Reading schema from {}", filePath);
+
+    try (HoodieFileReader fileReader =
+        HoodieFileReaderFactory.getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
+            .getFileReader(
+                ConfigUtils.DEFAULT_HUDI_CONFIG_FOR_READER,
+                configuration,
+                filePath)) {
+      return fileReader.getSchema();
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to read schema from HFile", e);
+    }
+  }
+
+  @Override
+  public List<HoodieColumnRangeMetadata<Comparable>> readColumnStatsFromMetadata(StorageConfiguration<?> storageConf, StoragePath filePath, List<String> columnList) {
+    throw new UnsupportedOperationException(
+        "Reading column statistics from metadata is not supported for HFile format yet");
+  }
+
+  @Override
+  public HoodieFileFormat getFormat() {
+    return HoodieFileFormat.HFILE;
+  }
+
+  @Override
+  public void writeMetaFile(HoodieStorage storage, StoragePath filePath, Properties props) throws IOException {
+    throw new UnsupportedOperationException("HFileUtils does not support writeMetaFile");
+  }
+}
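Most of HFileUtils is intentionally unimplemented; readAvroSchema (plus getFormat) is the functional surface, which is all that schema resolution needs. A sketch of reading an HFile schema via the format-based factory overload (the exact signature of that overload is inferred from the BaseFileUtils hunk above; class name illustrative):

    import org.apache.avro.Schema;
    import org.apache.hudi.common.model.HoodieFileFormat;
    import org.apache.hudi.common.util.BaseFileUtils;
    import org.apache.hudi.storage.StorageConfiguration;
    import org.apache.hudi.storage.StoragePath;

    class HFileSchemaSketch {
      static Schema hfileSchema(StorageConfiguration<?> conf, StoragePath hfilePath) {
        // Other read paths (readAvroRecords, filterRowKeys, ...) still throw
        // UnsupportedOperationException for the HFile format.
        return BaseFileUtils.getInstance(HoodieFileFormat.HFILE).readAvroSchema(conf, hfilePath);
      }
    }
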
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
index efb88412a21..69c4d35a847 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/common/table/TestTableSchemaResolver.java
@@ -34,7 +34,6 @@ import org.apache.hudi.storage.StoragePath;

 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
-import org.apache.parquet.avro.AvroSchemaConverter;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;

@@ -96,10 +95,8 @@ public class TestTableSchemaResolver {
     StoragePath partitionPath = new StoragePath(testDir, "partition1");
     Schema expectedSchema = getSimpleSchema();
     StoragePath logFilePath = writeLogFile(partitionPath, expectedSchema);
-    assertEquals(
-        new AvroSchemaConverter().convert(expectedSchema),
-        TableSchemaResolver.readSchemaFromLogFile(HoodieStorageUtils.getStorage(
-            logFilePath, HoodieTestUtils.getDefaultStorageConfWithDefaults()), logFilePath));
+    assertEquals(expectedSchema, TableSchemaResolver.readSchemaFromLogFile(HoodieStorageUtils.getStorage(
+        logFilePath, HoodieTestUtils.getDefaultStorageConfWithDefaults()), logFilePath));
   }

   private String initTestDir(String folderName) throws IOException {
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
index 36f4ad4b1bc..05ea6ae4548 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileMetadataProcedure.scala
@@ -67,8 +67,7 @@ class ShowHoodieLogFileMetadataProcedure extends BaseProcedure with ProcedureBui
     logFilePaths.foreach {
       logFilePath => {
         val statuses = storage.listDirectEntries(new StoragePath(logFilePath))
-        val schema = new AvroSchemaConverter()
-          .convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(storage, new StoragePath(logFilePath))))
+        val schema = TableSchemaResolver.readSchemaFromLogFile(storage, new StoragePath(logFilePath))
         val reader = HoodieLogFormat.newReader(storage, new HoodieLogFile(statuses.get(0).getPath), schema)
         // read the avro blocks
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
index 970c9352dc5..22d0e423155 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowHoodieLogFileRecordsProcedure.scala
@@ -17,6 +17,7 @@

 package org.apache.spark.sql.hudi.command.procedures

+import org.apache.avro.generic.IndexedRecord
 import org.apache.hudi.common.config.{HoodieCommonConfig, HoodieMemoryConfig, HoodieReaderConfig}
 import org.apache.hudi.common.fs.FSUtils
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType
@@ -26,9 +27,6 @@ import org.apache.hudi.common.table.log.block.HoodieDataBlock
 import org.apache.hudi.common.table.log.{HoodieLogFormat, HoodieMergedLogRecordScanner}
 import org.apache.hudi.common.util.{FileIOUtils, ValidationUtils}
 import org.apache.hudi.storage.StoragePath
-
-import org.apache.avro.generic.IndexedRecord
-import org.apache.parquet.avro.AvroSchemaConverter
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}

@@ -60,10 +58,9 @@ class ShowHoodieLogFileRecordsProcedure extends BaseProcedure with ProcedureBuil
     val logFilePaths = FSUtils.getGlobStatusExcludingMetaFolder(storage, new StoragePath(logFilePathPattern)).iterator().asScala
       .map(_.getPath.toString).toList
     ValidationUtils.checkArgument(logFilePaths.nonEmpty, "There is no log file")
-    val converter = new AvroSchemaConverter()
     val allRecords: java.util.List[IndexedRecord] = new java.util.ArrayList[IndexedRecord]
     if (merge) {
-      val schema = converter.convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(storage, new StoragePath(logFilePaths.last))))
+      val schema = Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(storage, new StoragePath(logFilePaths.last)))
       val scanner = HoodieMergedLogRecordScanner.newBuilder
         .withStorage(storage)
         .withBasePath(basePath)
@@ -86,7 +83,7 @@ class ShowHoodieLogFileRecordsProcedure extends BaseProcedure with ProcedureBuil
     } else {
       logFilePaths.toStream.takeWhile(_ => allRecords.size() < limit).foreach {
         logFilePath => {
-          val schema = converter.convert(Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(storage, new StoragePath(logFilePath))))
+          val schema = Objects.requireNonNull(TableSchemaResolver.readSchemaFromLogFile(storage, new StoragePath(logFilePath)))
           val reader = HoodieLogFormat.newReader(storage, new HoodieLogFile(logFilePath), schema)
           while (reader.hasNext) {
             val block = reader.next()
diff --git a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
index aef8c3aea76..2eb13f5a787 100644
--- a/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
+++ b/hudi-sync/hudi-sync-common/src/main/java/org/apache/hudi/sync/common/HoodieSyncClient.java
@@ -21,8 +21,8 @@ package org.apache.hudi.sync.common;

 import org.apache.hudi.common.engine.HoodieLocalEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.ParquetTableSchemaResolver;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.TableSchemaResolver;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineUtils;
 import org.apache.hudi.common.util.Option;
@@ -99,7 +99,7 @@ public abstract class HoodieSyncClient implements HoodieMetaSyncOperations, Auto
   @Override
   public MessageType getStorageSchema() {
     try {
-      return new TableSchemaResolver(metaClient).getTableParquetSchema();
+      return new ParquetTableSchemaResolver(metaClient).getTableParquetSchema();
     } catch (Exception e) {
       throw new HoodieSyncException("Failed to read schema from storage.", e);
     }
@@ -108,7 +108,7 @@ public abstract class HoodieSyncClient implements HoodieMetaSyncOperations, Auto
   @Override
   public MessageType getStorageSchema(boolean includeMetadataField) {
     try {
-      return new TableSchemaResolver(metaClient).getTableParquetSchema(includeMetadataField);
+      return new ParquetTableSchemaResolver(metaClient).getTableParquetSchema(includeMetadataField);
     } catch (Exception e) {
       throw new HoodieSyncException("Failed to read schema from storage.", e);
     }
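readSchemaFromLogFile returns null when a log file holds no data block yet, e.g. while an inflight instant is still writing it; the validator change below treats that as a skip rather than an error. A condensed sketch of the pattern (class name illustrative):

    import org.apache.avro.Schema;
    import org.apache.hudi.common.table.TableSchemaResolver;
    import org.apache.hudi.storage.HoodieStorage;
    import org.apache.hudi.storage.StoragePath;

    import java.io.IOException;

    class LogFileSchemaCheckSketch {
      // True only if the log file already contains a readable data block schema.
      static boolean hasReadableSchema(HoodieStorage storage, String logFilePathStr) throws IOException {
        Schema readerSchema =
            TableSchemaResolver.readSchemaFromLogFile(storage, new StoragePath(logFilePathStr));
        return readerSchema != null;
      }
    }
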
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index a9e2d895c13..3ddb681779f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -73,8 +73,6 @@ import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import org.apache.avro.Schema;
 import org.apache.hadoop.fs.Path;
-import org.apache.parquet.avro.AvroSchemaConverter;
-import org.apache.parquet.schema.MessageType;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -1168,20 +1166,18 @@ public class HoodieMetadataTableValidator implements Serializable {
     String basePath = metaClient.getBasePathV2().toString();
     HoodieTimeline commitsTimeline = metaClient.getCommitsTimeline();
-    AvroSchemaConverter converter = new AvroSchemaConverter();
     HoodieTimeline completedInstantsTimeline = commitsTimeline.filterCompletedInstants();
     HoodieTimeline inflightInstantsTimeline = commitsTimeline.filterInflights();

     for (String logFilePathStr : logFilePathSet) {
       HoodieLogFormat.Reader reader = null;
       try {
-        MessageType messageType =
+        Schema readerSchema =
             TableSchemaResolver.readSchemaFromLogFile(storage, new StoragePath(logFilePathStr));
-        if (messageType == null) {
+        if (readerSchema == null) {
           LOG.warn("Cannot read schema from log file {}. Skip the check as it's likely being written by an inflight instant.",
               logFilePathStr);
           continue;
         }
-        Schema readerSchema = converter.convert(messageType);
         reader = HoodieLogFormat.newReader(storage, new HoodieLogFile(logFilePathStr), readerSchema, false);
         // read the avro blocks