This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 895f260983f3f6875c862b1d8e62c04aed09ff17
Author: shaoxiong.zhan <shaoxiong0...@gmail.com>
AuthorDate: Thu Sep 22 20:18:37 2022 +0800

    opt procedure backup_invalid_parquet
    
    
    (cherry picked from commit 422f1e53)
    
    903daba5 opt procedure backup_invalid_parquet
---
 .../procedures/BackupInvalidParquetProcedure.scala        | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BackupInvalidParquetProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BackupInvalidParquetProcedure.scala
index fbbb1247fa..5c1234b7a2 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BackupInvalidParquetProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/BackupInvalidParquetProcedure.scala
@@ -21,15 +21,18 @@ import org.apache.hadoop.fs.Path
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.config.SerializableConfiguration
 import org.apache.hudi.common.fs.FSUtils
+import org.apache.hudi.common.model.HoodieFileFormat
+import org.apache.hudi.common.util.BaseFileUtils
 import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
 import org.apache.parquet.hadoop.ParquetFileReader
 import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
 
 import java.util.function.Supplier
 
-class BackupInvalidParquetProcedure extends BaseProcedure with ProcedureBuilder {
+class BackupInvalidParquetProcedure extends BaseProcedure with ProcedureBuilder with Logging {
   private val PARAMETERS = Array[ProcedureParameter](
     ProcedureParameter.required(0, "path", DataTypes.StringType, None)
   )
@@ -62,9 +65,15 @@ class BackupInvalidParquetProcedure extends BaseProcedure with ProcedureBuilder
         val filePath = status.getPath
         var isInvalid = false
         if (filePath.toString.endsWith(".parquet")) {
-          try ParquetFileReader.readFooter(serHadoopConf.get(), filePath, SKIP_ROW_GROUPS).getFileMetaData catch {
+          try {
+            // check footer
+            ParquetFileReader.readFooter(serHadoopConf.get(), filePath, SKIP_ROW_GROUPS).getFileMetaData
+
+            // check row group
+            BaseFileUtils.getInstance(HoodieFileFormat.PARQUET).readAvroRecords(serHadoopConf.get(), filePath)
+          } catch {
             case e: Exception =>
-              isInvalid = e.getMessage.contains("is not a Parquet file")
+              isInvalid = true
               filePath.getFileSystem(serHadoopConf.get()).rename(filePath, new Path(backupPath, filePath.getName))
           }
         }

Reply via email to