This is an automated email from the ASF dual-hosted git repository. forwardxu pushed a commit to branch release-0.12.1 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 895f260983f3f6875c862b1d8e62c04aed09ff17 Author: shaoxiong.zhan <shaoxiong0...@gmail.com> AuthorDate: Thu Sep 22 20:18:37 2022 +0800 opt procedure backup_invalid_parquet (cherry picked from commit 422f1e53) 903daba5 opt procedure backup_invalid_parquet --- .../procedures/BackupInvalidParquetProcedure.scala | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BackupInvalidParquetProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BackupInvalidParquetProcedure.scala index fbbb1247fa..5c1234b7a2 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BackupInvalidParquetProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BackupInvalidParquetProcedure.scala @@ -21,15 +21,18 @@ import org.apache.hadoop.fs.Path import org.apache.hudi.client.common.HoodieSparkEngineContext import org.apache.hudi.common.config.SerializableConfiguration import org.apache.hudi.common.fs.FSUtils +import org.apache.hudi.common.model.HoodieFileFormat +import org.apache.hudi.common.util.BaseFileUtils import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS import org.apache.parquet.hadoop.ParquetFileReader import org.apache.spark.api.java.JavaRDD +import org.apache.spark.internal.Logging import org.apache.spark.sql.Row import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType} import java.util.function.Supplier -class BackupInvalidParquetProcedure extends BaseProcedure with ProcedureBuilder { +class BackupInvalidParquetProcedure extends BaseProcedure with ProcedureBuilder with Logging { private val PARAMETERS = Array[ProcedureParameter]( ProcedureParameter.required(0, "path", DataTypes.StringType, None) ) @@ -62,9 +65,15 @@ class BackupInvalidParquetProcedure extends BaseProcedure with ProcedureBuilder val filePath = status.getPath var isInvalid = false if (filePath.toString.endsWith(".parquet")) { - try ParquetFileReader.readFooter(serHadoopConf.get(), filePath, SKIP_ROW_GROUPS).getFileMetaData catch { + try { + // check footer + ParquetFileReader.readFooter(serHadoopConf.get(), filePath, SKIP_ROW_GROUPS).getFileMetaData + + // check row group + BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).readAvroRecords(serHadoopConf.get(), filePath) + } catch { case e: Exception => - isInvalid = e.getMessage.contains("is not a Parquet file") + isInvalid = true filePath.getFileSystem(serHadoopConf.get()).rename(filePath, new Path(backupPath, filePath.getName)) } }