This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e162bb73a HUDI-4687 add show_invalid_parquet procedure (#6480)
1e162bb73a is described below

commit 1e162bb73af9f39024e6e5d098958b9c0d926e6a
Author: shaoxiong.zhan <31836510+microbe...@users.noreply.github.com>
AuthorDate: Wed Aug 24 19:28:26 2022 +0800

    HUDI-4687 add show_invalid_parquet procedure (#6480)
    
    Co-authored-by: zhanshaoxiong <shaoxiong0001@gmail.com>
---
 .../hudi/command/procedures/HoodieProcedures.scala |  1 +
 .../procedures/ShowInvalidParquetProcedure.scala   | 83 ++++++++++++++++++++++
 .../TestShowInvalidParquetProcedure.scala          | 71 ++++++++++++++++++
 3 files changed, 155 insertions(+)
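
Note for readers: the new procedure walks every partition under a table's base
path and returns the paths of .parquet files whose footers cannot be read. A
minimal usage sketch (the table path is a placeholder; the call shape matches
the test added below):

    // Hypothetical invocation from a Spark session against a Hudi table.
    // Returns one row per corrupt file, with a single "path" column.
    spark.sql("call show_invalid_parquet(path => '/tmp/hudi/my_table')").show(false)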

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index b245b54f61..49c88e5cd6 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -82,6 +82,7 @@ object HoodieProcedures {
     mapBuilder.put(RepairOverwriteHoodiePropsProcedure.NAME, RepairOverwriteHoodiePropsProcedure.builder)
     mapBuilder.put(RunCleanProcedure.NAME, RunCleanProcedure.builder)
     mapBuilder.put(ValidateHoodieSyncProcedure.NAME, ValidateHoodieSyncProcedure.builder)
+    mapBuilder.put(ShowInvalidParquetProcedure.NAME, ShowInvalidParquetProcedure.builder)
     mapBuilder.build
   }
 }
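
Registration note: putting the builder into this map is what makes the
procedure resolvable by its NAME at call time. In sketch form (the registry
accessor shown is illustrative, not necessarily Hudi's exact internal API):

    // Hypothetical lookup against the name-to-builder registry built above.
    val supplier = procedures.get(ShowInvalidParquetProcedure.NAME) // Supplier[ProcedureBuilder]
    val procedure = supplier.get().build                            // instance ready to call()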
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala
new file mode 100644
index 0000000000..11d170bbed
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowInvalidParquetProcedure.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.config.SerializableConfiguration
+import org.apache.hudi.common.fs.FSUtils
+import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
+import org.apache.parquet.hadoop.ParquetFileReader
+import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
+
+import java.util.function.Supplier
+
+class ShowInvalidParquetProcedure extends BaseProcedure with ProcedureBuilder {
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "path", DataTypes.StringType, None)
+  )
+
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("path", DataTypes.StringType, nullable = true, Metadata.empty))
+  )
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val srcPath = getArgValueOrDefault(args, PARAMETERS(0)).get.asInstanceOf[String]
+    val partitionPaths: java.util.List[String] = FSUtils.getAllPartitionPaths(new HoodieSparkEngineContext(jsc), srcPath, false, false)
+    val javaRdd: JavaRDD[String] = jsc.parallelize(partitionPaths, partitionPaths.size())
+    val serHadoopConf = new SerializableConfiguration(jsc.hadoopConfiguration())
+    javaRdd.rdd.map(part => {
+      val fs = FSUtils.getFs(new Path(srcPath), serHadoopConf.get())
+      FSUtils.getAllDataFilesInPartition(fs, FSUtils.getPartitionPath(srcPath, part))
+    }).flatMap(_.toList)
+      .filter(status => {
+        val filePath = status.getPath
+        var isInvalid = false
+        if (filePath.toString.endsWith(".parquet")) {
+          try ParquetFileReader.readFooter(serHadoopConf.get(), filePath, SKIP_ROW_GROUPS).getFileMetaData catch {
+            case e: Exception =>
+              isInvalid = e.getMessage.contains("is not a Parquet file")
+          }
+        }
+        isInvalid
+      })
+      .map(status => Row(status.getPath.toString))
+      .collect()
+  }
+
+  override def build = new ShowInvalidParquetProcedure()
+}
+
+object ShowInvalidParquetProcedure {
+  val NAME = "show_invalid_parquet"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get(): ProcedureBuilder = new ShowInvalidParquetProcedure()
+  }
+}
+
+
+
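The per-file check above is cheap because readFooter with SKIP_ROW_GROUPS
parses only the file metadata, never the row groups. A standalone sketch of
the same check outside Spark (the helper name is illustrative; the
ParquetFileReader call mirrors the diff):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path
    import org.apache.parquet.format.converter.ParquetMetadataConverter.SKIP_ROW_GROUPS
    import org.apache.parquet.hadoop.ParquetFileReader

    def isInvalidParquet(conf: Configuration, file: Path): Boolean =
      file.toString.endsWith(".parquet") && {
        try {
          // Footer-only read: succeeds iff the trailing metadata is intact.
          ParquetFileReader.readFooter(conf, file, SKIP_ROW_GROUPS)
          false
        } catch {
          // Same heuristic as the procedure: flag only footer/magic failures.
          case e: Exception =>
            e.getMessage != null && e.getMessage.contains("is not a Parquet file")
        }
      }
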
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowInvalidParquetProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowInvalidParquetProcedure.scala
new file mode 100644
index 0000000000..4d0c9c7b34
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestShowInvalidParquetProcedure.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.common.fs.FSUtils
+
+class TestShowInvalidParquetProcedure extends HoodieSparkProcedureTestBase {
+  test("Test Call show_invalid_parquet Procedure") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val basePath = s"${tmp.getCanonicalPath}/$tableName"
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | partitioned by (ts)
+           | location '$basePath'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      // insert data to table
+      spark.sql(s"insert into $tableName select 1, 'a1', 10, 1000")
+      spark.sql(s"insert into $tableName select 2, 'a2', 20, 1500")
+
+      // Check required fields
+      checkExceptionContain(s"""call show_invalid_parquet(limit => 10)""")(
+        s"Argument: path is required")
+
+      val fs = FSUtils.getFs(basePath, spark.sparkContext.hadoopConfiguration)
+      val invalidPath1 = new Path(basePath, "ts=1000/1.parquet")
+      val out1 = fs.create(invalidPath1)
+      out1.write(1)
+      out1.close()
+
+      val invalidPath2 = new Path(basePath, "ts=1500/2.parquet")
+      val out2 = fs.create(invalidPath2)
+      out2.write(1)
+      out2.close()
+
+      // collect result for table
+      val result = spark.sql(
+        s"""call show_invalid_parquet(path => 
'$basePath')""".stripMargin).collect()
+      assertResult(2) {
+        result.length
+      }
+    }
+  }
+}
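
The fabricated files are invalid because a parquet file must begin and end
with the 4-byte magic PAR1; a one-byte file makes readFooter throw with a
message containing "is not a Parquet file", which is exactly what the
procedure matches on. An extra assertion one could add (not part of the
committed test) to also verify which paths come back:

    // Hypothetical follow-up assertion: both corrupt paths are returned verbatim.
    val returned = result.map(_.getString(0)).toSet
    assert(returned == Set(invalidPath1.toString, invalidPath2.toString))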
