This is an automated email from the ASF dual-hosted git repository.

biyan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new ca4003b784 [HUDI-5057] Fix msck repair hudi table (#6999)
ca4003b784 is described below

commit ca4003b78445d5be8b305b6f5ead90a76bfe361b
Author: Zouxxyy <zouxinyu....@alibaba-inc.com>
AuthorDate: Wed Oct 26 10:28:26 2022 +0800

    [HUDI-5057] Fix msck repair hudi table (#6999)
---
 .../spark/sql/HoodieCatalystPlansUtils.scala       |  10 ++
 .../java/org/apache/hudi/common/fs/FSUtils.java    |   5 +-
 .../java/org/apache/hudi/source/FileIndex.java     |   2 +-
 .../spark/sql/hudi/HoodieSqlCommonUtils.scala      |  16 +-
 .../hudi/command/RepairHoodieTableCommand.scala    | 170 +++++++++++++++++++++
 .../spark/sql/hudi/analysis/HoodieAnalysis.scala   |   8 +
 .../apache/spark/sql/hudi/TestRepairTable.scala    | 163 ++++++++++++++++++++
 .../spark/sql/HoodieSpark2CatalystPlanUtils.scala  |  16 +-
 .../spark/sql/HoodieSpark31CatalystPlanUtils.scala |  16 ++
 .../spark/sql/HoodieSpark32CatalystPlanUtils.scala |  16 ++
 .../spark/sql/HoodieSpark33CatalystPlanUtils.scala |  15 ++
 11 files changed, 431 insertions(+), 6 deletions(-)

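For context, a minimal sketch of the flow this patch targets, assuming a Spark shell with the Hudi bundle and SQL extensions enabled; the table name, schema and location below are hypothetical:

    // Create a partitioned Hudi table and write a couple of partitions into it.
    spark.sql(
      """create table hudi_repair_demo (id int, name string, ts long, dt string)
        |using hudi
        |partitioned by (dt)
        |location '/tmp/hudi_repair_demo'
        |tblproperties (primaryKey = 'id', preCombineField = 'ts')""".stripMargin)

    // With this change, MSCK REPAIR TABLE on a Hudi table is rewritten to
    // RepairHoodieTableCommand (see HoodieAnalysis below) and registers the partitions
    // tracked by Hudi in the catalog instead of falling through to Spark's built-in,
    // file-system-scanning repair command.
    spark.sql("msck repair table hudi_repair_demo")
    spark.sql("show partitions hudi_repair_demo").show(false)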
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
index e7e529b125..efd0eacac7 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieCatalystPlansUtils.scala
@@ -90,4 +90,14 @@ trait HoodieCatalystPlansUtils {
   def createInsertInto(table: LogicalPlan, partition: Map[String, Option[String]],
                        query: LogicalPlan, overwrite: Boolean, ifPartitionNotExists: Boolean): LogicalPlan
 
+  /**
+   * Test if the logical plan is a Repair Table LogicalPlan.
+   */
+  def isRepairTable(plan: LogicalPlan): Boolean
+
+  /**
+   * Get the members of the Repair Table LogicalPlan.
+   */
+  def getRepairTableChildren(plan: LogicalPlan):
+    Option[(TableIdentifier, Boolean, Boolean, String)]
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
index c3d8e9f8be..eb5fbe9179 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java
@@ -322,10 +322,9 @@ public class FSUtils {
   public static Map<String, FileStatus[]> getFilesInPartitions(HoodieEngineContext engineContext,
                                                                HoodieMetadataConfig metadataConfig,
                                                                String basePathStr,
-                                                               String[] partitionPaths,
-                                                               String spillableMapPath) {
+                                                               String[] partitionPaths) {
     try (HoodieTableMetadata tableMetadata = HoodieTableMetadata.create(engineContext, metadataConfig, basePathStr,
-        spillableMapPath, true)) {
+        FileSystemViewStorageConfig.SPILLABLE_DIR.defaultValue(), true)) {
       return tableMetadata.getAllFilesInPartitions(Arrays.asList(partitionPaths));
     } catch (Exception ex) {
       throw new HoodieException("Error get files in partitions: " + String.join(",", partitionPaths), ex);
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
index 92396c1820..a382954fd2 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/FileIndex.java
@@ -138,7 +138,7 @@ public class FileIndex {
     }
     String[] partitions = getOrBuildPartitionPaths().stream().map(p -> fullPartitionPath(path, p)).toArray(String[]::new);
     FileStatus[] allFileStatus = FSUtils.getFilesInPartitions(HoodieFlinkEngineContext.DEFAULT, metadataConfig, path.toString(),
-            partitions, "/tmp/")
+            partitions)
         .values().stream().flatMap(Arrays::stream).toArray(FileStatus[]::new);
     Set<String> candidateFiles = candidateFilesInMetadataTable(allFileStatus);
     if (candidateFiles == null) {
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
index 025a224373..aff65672c5 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/HoodieSqlCommonUtils.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.hudi
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.config.{DFSPropertiesConfiguration, HoodieMetadataConfig}
 import org.apache.hudi.common.fs.FSUtils
@@ -78,6 +78,20 @@ object HoodieSqlCommonUtils extends SparkAdapterSupport {
     FSUtils.getAllPartitionPaths(sparkEngine, metadataConfig, getTableLocation(table, spark)).asScala
   }
 
+  def getFilesInPartitions(spark: SparkSession,
+                           table: CatalogTable,
+                           partitionPaths: Seq[String]): Map[String, Array[FileStatus]] = {
+    val sparkEngine = new HoodieSparkEngineContext(new JavaSparkContext(spark.sparkContext))
+    val metadataConfig = {
+      val properties = new Properties()
+      properties.putAll((spark.sessionState.conf.getAllConfs ++ table.storage.properties ++
+        table.properties).asJava)
+      HoodieMetadataConfig.newBuilder.fromProperties(properties).build()
+    }
+    FSUtils.getFilesInPartitions(sparkEngine, metadataConfig, getTableLocation(table, spark),
+      partitionPaths.toArray).asScala.toMap
+  }
+
   /**
    * This method is used to compatible with the old non-hive-styled partition table.
    * By default we enable the "hoodie.datasource.write.hive_style_partitioning"
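The new helper is what RepairHoodieTableCommand (added below) uses to gather per-partition fast stats; a short sketch of that usage, assuming a SparkSession, a resolved CatalogTable and a list of partition paths are already in scope:

    // Illustrative only: file count and total size per partition, derived from the
    // helper's result rather than from a file-system scan at the metastore side.
    val filesByPartition = HoodieSqlCommonUtils.getFilesInPartitions(spark, catalogTable, partitionPaths)
    val fastStats = filesByPartition.mapValues { statuses =>
      (statuses.length, statuses.map(_.getLen).sum)
    }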
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/RepairHoodieTableCommand.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/RepairHoodieTableCommand.scala
new file mode 100644
index 0000000000..cc25318c6a
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/spark/sql/hudi/command/RepairHoodieTableCommand.scala
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.hudi.common.table.HoodieTableConfig
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.execution.command.PartitionStatistics
+import org.apache.spark.sql.hudi.HoodieSqlCommonUtils
+import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
+import org.apache.spark.util.ThreadUtils
+
+import java.util.concurrent.TimeUnit.MILLISECONDS
+import scala.util.control.NonFatal
+
+/**
+ * Command for repairing a hudi table's partitions.
+ * Use the methods in HoodieSqlCommonUtils to obtain partitions and stats
+ * instead of scanning the file system.
+ */
+case class RepairHoodieTableCommand(tableName: TableIdentifier,
+                                    enableAddPartitions: Boolean,
+                                    enableDropPartitions: Boolean,
+                                    cmd: String = "MSCK REPAIR TABLE") extends HoodieLeafRunnableCommand {
+
+  // These are the statistics that can be collected quickly without requiring a scan of the data
+  // see https://github.com/apache/hive/blob/master/
+  //   common/src/java/org/apache/hadoop/hive/common/StatsSetupConst.java
+  val NUM_FILES = "numFiles"
+  val TOTAL_SIZE = "totalSize"
+  val DDL_TIME = "transient_lastDdlTime"
+
+  override def run(spark: SparkSession): Seq[Row] = {
+    val catalog = spark.sessionState.catalog
+    val table = catalog.getTableMetadata(tableName)
+    val tableIdentWithDB = table.identifier.quotedString
+    if (table.partitionColumnNames.isEmpty) {
+      throw new AnalysisException(
+        s"Operation not allowed: $cmd only works on partitioned tables: $tableIdentWithDB")
+    }
+
+    if (table.storage.locationUri.isEmpty) {
+      throw new AnalysisException(s"Operation not allowed: $cmd only works on table with " +
+        s"location provided: $tableIdentWithDB")
+    }
+
+    val root = new Path(table.location)
+    logInfo(s"Recover all the partitions in $root")
+
+    val hoodieCatalogTable = HoodieCatalogTable(spark, table.identifier)
+    val isHiveStyledPartitioning = hoodieCatalogTable.catalogProperties.
+      getOrElse(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE.key, "true").toBoolean
+
+    val partitionSpecsAndLocs: Seq[(TablePartitionSpec, Path)] = hoodieCatalogTable.
+      getPartitionPaths.map(partitionPath => {
+        var values = partitionPath.split('/')
+        if (isHiveStyledPartitioning) {
+          values = values.map(_.split('=')(1))
+        }
+        (table.partitionColumnNames.zip(values).toMap, new Path(root, partitionPath))
+      })
+
+    val droppedAmount = if (enableDropPartitions) {
+      dropPartitions(catalog, partitionSpecsAndLocs)
+    } else 0
+    val addedAmount = if (enableAddPartitions) {
+      val total = partitionSpecsAndLocs.length
+      val partitionStats = if (spark.sqlContext.conf.gatherFastStats) {
+        HoodieSqlCommonUtils.getFilesInPartitions(spark, table, partitionSpecsAndLocs
+          .map(_._2.toString))
+          .mapValues(statuses => PartitionStatistics(statuses.length, statuses.map(_.getLen).sum))
+      } else {
+        Map.empty[String, PartitionStatistics]
+      }
+      logInfo(s"Finished to gather the fast stats for all $total partitions.")
+      addPartitions(spark, table, partitionSpecsAndLocs, partitionStats)
+      total
+    } else 0
+    // Updates the table to indicate that its partition metadata is stored in the Hive metastore.
+    // This is always the case for Hive format tables, but is not true for Datasource tables created
+    // before Spark 2.1 unless they are converted via `msck repair table`.
+    spark.sessionState.catalog.alterTable(table.copy(tracksPartitionsInCatalog = true))
+    try {
+      spark.catalog.refreshTable(tableIdentWithDB)
+    } catch {
+      case NonFatal(e) =>
+        logError(s"Cannot refresh the table '$tableIdentWithDB'. A query of the table " +
+          "might return wrong result if the table was cached. To avoid such issue, you should " +
+          "uncache the table manually via the UNCACHE TABLE command after table recovering will " +
+          "complete fully.", e)
+    }
+    logInfo(s"Recovered all partitions: added ($addedAmount), dropped ($droppedAmount).")
+    Seq.empty[Row]
+  }
+
+  private def addPartitions(spark: SparkSession,
+                            table: CatalogTable,
+                            partitionSpecsAndLocs: Seq[(TablePartitionSpec, Path)],
+                            partitionStats: Map[String, PartitionStatistics]): Unit = {
+    val total = partitionSpecsAndLocs.length
+    var done = 0L
+    // Hive metastore may not have enough memory to handle millions of partitions in single RPC,
+    // we should split them into smaller batches. Since Hive client is not thread safe, we cannot
+    // do this in parallel.
+    val batchSize = spark.sparkContext.conf.getInt("spark.sql.addPartitionInBatch.size", 100)
+    partitionSpecsAndLocs.iterator.grouped(batchSize).foreach { batch =>
+      val now = MILLISECONDS.toSeconds(System.currentTimeMillis())
+      val parts = batch.map { case (spec, location) =>
+        val params = partitionStats.get(location.toString).map {
+          case PartitionStatistics(numFiles, totalSize) =>
+            // These two fast stats prevent the Hive metastore from listing the files again.
+            Map(NUM_FILES -> numFiles.toString,
+              TOTAL_SIZE -> totalSize.toString,
+              // Workaround for a bug in HiveMetastore that tries to mutate read-only parameters.
+              // see metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
+              DDL_TIME -> now.toString)
+        }.getOrElse(Map.empty)
+        // inherit table storage format (possibly except for location)
+        CatalogTablePartition(
+          spec,
+          table.storage.copy(locationUri = Some(location.toUri)),
+          params)
+      }
+      spark.sessionState.catalog.createPartitions(tableName, parts, ignoreIfExists = true)
+      done += parts.length
+      logDebug(s"Recovered ${parts.length} partitions ($done/$total so far)")
+    }
+  }
+
+  // Drops the partitions that do not exist in partitionSpecsAndLocs
+  private def dropPartitions(catalog: SessionCatalog,
+                             partitionSpecsAndLocs: Seq[(TablePartitionSpec, Path)]): Int = {
+    val dropPartSpecs = ThreadUtils.parmap(
+      catalog.listPartitions(tableName),
+      "RepairTableCommand: non-existing partitions",
+      maxThreads = 8) { partition =>
+      partition.storage.locationUri.flatMap { uri =>
+        if (partitionSpecsAndLocs.map(_._2).contains(new Path(uri))) None else Some(partition.spec)
+      }
+    }.flatten
+    catalog.dropPartitions(
+      tableName,
+      dropPartSpecs,
+      ignoreIfNotExists = true,
+      purge = false,
+      // Since we have already checked that partition directories do not exist, we can avoid
+      // additional calls to the file system at the catalog side by setting this flag.
+      retainData = true)
+    dropPartSpecs.length
+  }
+}
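The enableAddPartitions/enableDropPartitions flags correspond to the repair variants Spark parses; a sketch of the statements that reach this command once the analysis rule below rewrites them (the table name is hypothetical, and the explicit variants are covered by the new TestRepairTable suite further down):

    // Default MSCK REPAIR adds newly discovered partitions only.
    spark.sql("msck repair table hudi_repair_demo")
    // Spark 3.2+ also parses explicit variants that map onto the two flags:
    spark.sql("msck repair table hudi_repair_demo add partitions")   // add = true,  drop = false
    spark.sql("msck repair table hudi_repair_demo drop partitions")  // add = false, drop = true
    spark.sql("msck repair table hudi_repair_demo sync partitions")  // add = true,  drop = true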
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
index c5688965d7..5962ac867c 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala
@@ -610,6 +610,14 @@ case class HoodiePostAnalysisRule(sparkSession: SparkSession) extends Rule[Logic
       case TruncateTableCommand(tableName, partitionSpec)
         if sparkAdapter.isHoodieTable(tableName, sparkSession) =>
         TruncateHoodieTableCommand(tableName, partitionSpec)
+      // Rewrite RepairTableCommand to RepairHoodieTableCommand
+      case r if sparkAdapter.getCatalystPlanUtils.isRepairTable(r) =>
+        val (tableName, enableAddPartitions, enableDropPartitions, cmd) = sparkAdapter.getCatalystPlanUtils.getRepairTableChildren(r).get
+        if (sparkAdapter.isHoodieTable(tableName, sparkSession)) {
+          RepairHoodieTableCommand(tableName, enableAddPartitions, enableDropPartitions, cmd)
+        } else {
+          r
+        }
       case _ => plan
     }
   }
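One way to see the rewrite take effect, sketched under the assumption that the Hudi session extension (which installs HoodiePostAnalysisRule) is active and that post-hoc resolution rules run as part of analysis:

    import org.apache.spark.sql.hudi.command.RepairHoodieTableCommand

    // Parse and analyze an MSCK statement against a Hudi table (name is hypothetical);
    // with the new case above, the analyzed plan should be RepairHoodieTableCommand
    // rather than Spark's built-in repair command.
    val parsed = spark.sessionState.sqlParser.parsePlan("msck repair table hudi_repair_demo")
    val analyzed = spark.sessionState.analyzer.execute(parsed)
    assert(analyzed.isInstanceOf[RepairHoodieTableCommand])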
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestRepairTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestRepairTable.scala
new file mode 100644
index 0000000000..498121e1ab
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestRepairTable.scala
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi
+
+import org.apache.hudi.DataSourceWriteOptions.{PARTITIONPATH_FIELD, PRECOMBINE_FIELD, RECORDKEY_FIELD}
+import org.apache.hudi.HoodieSparkUtils
+import org.apache.hudi.common.table.HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE
+import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
+
+import org.apache.spark.sql.SaveMode
+
+class TestRepairTable extends HoodieSparkSqlTestBase {
+
+  test("Test msck repair non-partitioned table") {
+    Seq("true", "false").foreach { hiveStylePartitionEnable =>
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        val basePath = s"${tmp.getCanonicalPath}/$tableName"
+        spark.sql(
+          s"""
+             | create table $tableName (
+             |  id int,
+             |  name string,
+             |  ts long,
+             |  dt string,
+             |  hh string
+             | ) using hudi
+             | location '$basePath'
+             | tblproperties (
+             |  primaryKey = 'id',
+             |  preCombineField = 'ts',
+             |  hoodie.datasource.write.hive_style_partitioning = '$hiveStylePartitionEnable'
+             | )
+        """.stripMargin)
+
+        checkExceptionContain(s"msck repair table $tableName")(
+          s"Operation not allowed")
+      }
+    }
+  }
+
+  test("Test msck repair partitioned table") {
+    Seq("true", "false").foreach { hiveStylePartitionEnable =>
+      withTempDir { tmp =>
+        val tableName = generateTableName
+        val basePath = s"${tmp.getCanonicalPath}/$tableName"
+        spark.sql(
+          s"""
+             | create table $tableName (
+             |  id int,
+             |  name string,
+             |  ts long,
+             |  dt string,
+             |  hh string
+             | ) using hudi
+             | partitioned by (dt, hh)
+             | location '$basePath'
+             | tblproperties (
+             |  primaryKey = 'id',
+             |  preCombineField = 'ts',
+             |  hoodie.datasource.write.hive_style_partitioning = '$hiveStylePartitionEnable'
+             | )
+        """.stripMargin)
+        val table = spark.sessionState.sqlParser.parseTableIdentifier(tableName)
+
+        import spark.implicits._
+        val df = Seq((1, "a1", 1000, "2022-10-06", "11"), (2, "a2", 1001, "2022-10-06", "12"))
+          .toDF("id", "name", "ts", "dt", "hh")
+        df.write.format("hudi")
+          .option(RECORDKEY_FIELD.key, "id")
+          .option(PRECOMBINE_FIELD.key, "ts")
+          .option(PARTITIONPATH_FIELD.key, "dt, hh")
+          .option(HIVE_STYLE_PARTITIONING_ENABLE.key, hiveStylePartitionEnable)
+          .mode(SaveMode.Append)
+          .save(basePath)
+
+        assertResult(Seq())(spark.sessionState.catalog.listPartitionNames(table))
+        spark.sql(s"msck repair table $tableName")
+        assertResult(Seq("dt=2022-10-06/hh=11", "dt=2022-10-06/hh=12"))(
+          spark.sessionState.catalog.listPartitionNames(table))
+      }
+    }
+  }
+
+  test("Test msck repair partitioned table [add/drop/sync] partitions") {
+    if (HoodieSparkUtils.gteqSpark3_2) {
+      Seq("true", "false").foreach { hiveStylePartitionEnable =>
+        withTempDir { tmp =>
+          val tableName = generateTableName
+          val basePath = s"${tmp.getCanonicalPath}/$tableName"
+          spark.sql(
+            s"""
+               | create table $tableName (
+               |  id int,
+               |  name string,
+               |  ts long,
+               |  dt string
+               | ) using hudi
+               | partitioned by (dt)
+               | location '$basePath'
+               | tblproperties (
+               |  primaryKey = 'id',
+               |  preCombineField = 'ts',
+               |  hoodie.datasource.write.hive_style_partitioning = '$hiveStylePartitionEnable'
+               | )
+        """.stripMargin)
+          val table = spark.sessionState.sqlParser.parseTableIdentifier(tableName)
+
+          // test msck repair table add partitions
+          import spark.implicits._
+          val df1 = Seq((1, "a1", 1000, "2022-10-06")).toDF("id", "name", "ts", "dt")
+          df1.write.format("hudi")
+            .option(TBL_NAME.key(), tableName)
+            .option(RECORDKEY_FIELD.key, "id")
+            .option(PRECOMBINE_FIELD.key, "ts")
+            .option(PARTITIONPATH_FIELD.key, "dt")
+            .option(HIVE_STYLE_PARTITIONING_ENABLE.key, hiveStylePartitionEnable)
+            .mode(SaveMode.Append)
+            .save(basePath)
+
+          assertResult(Seq())(spark.sessionState.catalog.listPartitionNames(table))
+          spark.sql(s"msck repair table $tableName add partitions")
+          assertResult(Seq("dt=2022-10-06"))(spark.sessionState.catalog.listPartitionNames(table))
+
+          // test msck repair table drop partitions
+          val df2 = Seq((2, "a2", 1001, "2022-10-07")).toDF("id", "name", "ts", "dt")
+          df2.write.format("hudi")
+            .option(TBL_NAME.key(), tableName)
+            .option(RECORDKEY_FIELD.key, "id")
+            .option(PRECOMBINE_FIELD.key, "ts")
+            .option(PARTITIONPATH_FIELD.key, "dt")
+            .option(HIVE_STYLE_PARTITIONING_ENABLE.key, hiveStylePartitionEnable)
+            .mode(SaveMode.Overwrite)
+            .save(basePath)
+
+          assertResult(Seq("dt=2022-10-06"))(spark.sessionState.catalog.listPartitionNames(table))
+          spark.sql(s"msck repair table $tableName drop partitions")
+          assertResult(Seq())(spark.sessionState.catalog.listPartitionNames(table))
+
+          // test msck repair table sync partitions
+          spark.sql(s"msck repair table $tableName sync partitions")
+          assertResult(Seq("dt=2022-10-07"))(spark.sessionState.catalog.listPartitionNames(table))
+        }
+      }
+    }
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
index 4156198153..2672e2c4cb 100644
--- a/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
+++ b/hudi-spark-datasource/hudi-spark2/src/main/scala/org/apache/spark/sql/HoodieSpark2CatalystPlanUtils.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.analysis.{SimpleAnalyzer, UnresolvedRelatio
 import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Like}
 import org.apache.spark.sql.catalyst.plans.JoinType
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Join, LogicalPlan}
-import org.apache.spark.sql.execution.command.ExplainCommand
+import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, ExplainCommand}
 import org.apache.spark.sql.internal.SQLConf
 
 object HoodieSpark2CatalystPlanUtils extends HoodieCatalystPlansUtils {
@@ -74,4 +74,18 @@ object HoodieSpark2CatalystPlanUtils extends HoodieCatalystPlansUtils {
   override def getRelationTimeTravel(plan: LogicalPlan): Option[(LogicalPlan, Option[Expression], Option[String])] = {
     throw new IllegalStateException(s"Should not call getRelationTimeTravel for spark2")
   }
+
+  override def isRepairTable(plan: LogicalPlan): Boolean = {
+    plan.isInstanceOf[AlterTableRecoverPartitionsCommand]
+  }
+
+  override def getRepairTableChildren(plan: LogicalPlan): Option[(TableIdentifier, Boolean, Boolean, String)] = {
+    plan match {
+      // In Spark >= 3.2.x, AlterTableRecoverPartitionsCommand was renamed to RepairTableCommand and gained two new
+      // parameters, enableAddPartitions and enableDropPartitions. Setting them to true and false restores
+      // AlterTableRecoverPartitionsCommand's behavior.
+      case c: AlterTableRecoverPartitionsCommand =>
+        Some((c.tableName, true, false, c.cmd))
+    }
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
index 57864004df..a4016f18cc 100644
--- a/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3.1.x/src/main/scala/org/apache/spark/sql/HoodieSpark31CatalystPlanUtils.scala
@@ -18,8 +18,10 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, ProjectionOverSchema}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand
 import org.apache.spark.sql.types.StructType
 
 object HoodieSpark31CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
@@ -31,4 +33,18 @@ object HoodieSpark31CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
   }
 
   override def projectOverSchema(schema: StructType, output: AttributeSet): ProjectionOverSchema = ProjectionOverSchema(schema)
+
+  override def isRepairTable(plan: LogicalPlan): Boolean = {
+    plan.isInstanceOf[AlterTableRecoverPartitionsCommand]
+  }
+
+  override def getRepairTableChildren(plan: LogicalPlan): Option[(TableIdentifier, Boolean, Boolean, String)] = {
+    plan match {
+      // In Spark >= 3.2.x, AlterTableRecoverPartitionsCommand was renamed to RepairTableCommand and gained two new
+      // parameters, enableAddPartitions and enableDropPartitions. Setting them to true and false restores
+      // AlterTableRecoverPartitionsCommand's behavior.
+      case c: AlterTableRecoverPartitionsCommand =>
+        Some((c.tableName, true, false, c.cmd))
+    }
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
index 19025ce0d5..0548fd47a4 100644
--- a/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3.2.x/src/main/scala/org/apache/spark/sql/HoodieSpark32CatalystPlanUtils.scala
@@ -20,8 +20,11 @@ package org.apache.spark.sql
 
 import org.apache.hudi.HoodieSparkUtils
 import org.apache.hudi.common.util.ValidationUtils.checkArgument
+
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, ProjectionOverSchema}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, TimeTravelRelation}
+import org.apache.spark.sql.execution.command.RepairTableCommand
 import org.apache.spark.sql.types.StructType
 
 object HoodieSpark32CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
@@ -54,4 +57,17 @@ object HoodieSpark32CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
 
     p.asInstanceOf[ProjectionOverSchema]
   }
+
+  override def isRepairTable(plan: LogicalPlan): Boolean = {
+    plan.isInstanceOf[RepairTableCommand]
+  }
+
+  override def getRepairTableChildren(plan: LogicalPlan): Option[(TableIdentifier, Boolean, Boolean, String)] = {
+    plan match {
+      case rtc: RepairTableCommand =>
+        Some((rtc.tableName, rtc.enableAddPartitions, rtc.enableDropPartitions, rtc.cmd))
+      case _ =>
+        None
+    }
+  }
 }
diff --git a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
index 4d4921bc03..c364254488 100644
--- a/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
+++ b/hudi-spark-datasource/hudi-spark3.3.x/src/main/scala/org/apache/spark/sql/HoodieSpark33CatalystPlanUtils.scala
@@ -18,8 +18,10 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, ProjectionOverSchema}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, TimeTravelRelation}
+import org.apache.spark.sql.execution.command.RepairTableCommand
 import org.apache.spark.sql.types.StructType
 
 object HoodieSpark33CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
@@ -39,4 +41,17 @@ object HoodieSpark33CatalystPlanUtils extends HoodieSpark3CatalystPlanUtils {
 
   override def projectOverSchema(schema: StructType, output: AttributeSet): ProjectionOverSchema =
     ProjectionOverSchema(schema, output)
+
+  override def isRepairTable(plan: LogicalPlan): Boolean = {
+    plan.isInstanceOf[RepairTableCommand]
+  }
+
+  override def getRepairTableChildren(plan: LogicalPlan): Option[(TableIdentifier, Boolean, Boolean, String)] = {
+    plan match {
+      case rtc: RepairTableCommand =>
+        Some((rtc.tableName, rtc.enableAddPartitions, rtc.enableDropPartitions, rtc.cmd))
+      case _ =>
+        None
+    }
+  }
 }
