This is an automated email from the ASF dual-hosted git repository. forwardxu pushed a commit to branch release-0.12.1 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 0d71705ec2276f6524ab97e765563f6c902f35d9 Author: XuQianJin-Stars <forwar...@apache.com> AuthorDate: Fri Dec 9 12:39:39 2022 +0800 [MINOR] add integrity check of parquet file for HoodieRowDataParquetWriter. --- .../java/org/apache/hudi/io/HoodieMergeHandle.java | 22 ++++++---------------- .../src/main/java/org/apache/hudi/io/IOUtils.java | 14 ++++++++++++++ .../io/storage/row/HoodieRowDataParquetWriter.java | 2 ++ 3 files changed, 22 insertions(+), 16 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 88db25bac4..c569acdda6 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -18,6 +18,10 @@ package org.apache.hudi.io; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.fs.Path; import org.apache.hudi.client.WriteStatus; import org.apache.hudi.common.engine.TaskContextSupplier; import org.apache.hudi.common.fs.FSUtils; @@ -34,7 +38,6 @@ import org.apache.hudi.common.model.IOType; import org.apache.hudi.common.util.DefaultSizeEstimator; import org.apache.hudi.common.util.HoodieRecordSizeEstimator; import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.ParquetUtils; import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.common.util.collection.ExternalSpillableMap; import org.apache.hudi.config.HoodieWriteConfig; @@ -47,27 +50,19 @@ import org.apache.hudi.io.storage.HoodieFileWriter; import org.apache.hudi.keygen.BaseKeyGenerator; import org.apache.hudi.keygen.KeyGenUtils; import org.apache.hudi.table.HoodieTable; - -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericRecord; -import 
org.apache.avro.generic.IndexedRecord; -import org.apache.hadoop.fs.Path; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; import javax.annotation.concurrent.NotThreadSafe; - import java.io.IOException; import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.List; -import java.util.NoSuchElementException; import java.util.Map; +import java.util.NoSuchElementException; import java.util.Set; -import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET; - @SuppressWarnings("Duplicates") /** * Handle to merge incoming records to those in storage. @@ -450,12 +445,7 @@ public class HoodieMergeHandle<T extends HoodieRecordPayload, I, K, O> extends H return; } - // Fast verify the integrity of the parquet file. - // only check the readable of parquet metadata. - final String extension = FSUtils.getFileExtension(newFilePath.toString()); - if (PARQUET.getFileExtension().equals(extension)) { - new ParquetUtils().readMetadata(hoodieTable.getHadoopConf(), newFilePath); - } + IOUtils.checkParquetFileValid(hoodieTable.getHadoopConf(), newFilePath); long oldNumWrites = 0; try { diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/IOUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/IOUtils.java index 7636384c3a..b231136ece 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/IOUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/IOUtils.java @@ -18,11 +18,16 @@ package org.apache.hudi.io; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; import org.apache.hudi.common.config.HoodieConfig; import org.apache.hudi.common.engine.EngineProperty; import org.apache.hudi.common.engine.TaskContextSupplier; +import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.ParquetUtils; +import static
org.apache.hudi.common.model.HoodieFileFormat.PARQUET; import static org.apache.hudi.config.HoodieMemoryConfig.DEFAULT_MAX_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES; import static org.apache.hudi.config.HoodieMemoryConfig.DEFAULT_MIN_MEMORY_FOR_SPILLABLE_MAP_IN_BYTES; import static org.apache.hudi.config.HoodieMemoryConfig.MAX_MEMORY_FOR_COMPACTION; @@ -72,4 +77,13 @@ public class IOUtils { String fraction = hoodieConfig.getStringOrDefault(MAX_MEMORY_FRACTION_FOR_COMPACTION); return getMaxMemoryAllowedForMerge(context, fraction); } + + public static void checkParquetFileValid(Configuration hadoopConf, Path filePath) { + // Fast verification of the integrity of the parquet file: + // only checks that the parquet metadata is readable. + final String extension = FSUtils.getFileExtension(filePath.toString()); + if (PARQUET.getFileExtension().equals(extension)) { + new ParquetUtils().readMetadata(hadoopConf, filePath); + } + } } diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriter.java index 17b3b6b37c..fd1edaab84 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataParquetWriter.java @@ -20,6 +20,7 @@ package org.apache.hudi.io.storage.row; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.fs.HoodieWrapperFileSystem; +import org.apache.hudi.io.IOUtils; import org.apache.hudi.io.storage.HoodieParquetConfig; import org.apache.flink.table.data.RowData; @@ -74,5 +75,6 @@ public class HoodieRowDataParquetWriter extends ParquetWriter<RowData> @Override public void close() throws IOException { super.close(); + IOUtils.checkParquetFileValid(fs.getConf(), file); } }