Repository: spark
Updated Branches:
  refs/heads/master b2dd8ec6b -> 24c0c9412


[SPARK-18949][SQL] Add recoverPartitions API to Catalog

### What changes were proposed in this pull request?

Currently, we only have a SQL interface for recovering all the partitions in
the directory of a table and updating the catalog: `MSCK REPAIR TABLE` or
`ALTER TABLE table RECOVER PARTITIONS`. (Frankly, `MSCK` is hard to remember,
and it is not obvious what it stands for.)

After the new "Scalable Partition Handling" work, table repair becomes much
more important for making the data in a newly created data source partitioned
table visible.

Thus, this PR adds the operation to the Catalog interface. After this PR, users
can repair a table with:
```Scala
spark.catalog.recoverPartitions("testTable")
```
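
For context, a minimal end-to-end sketch (not part of the patch) of the
scenario this API addresses, assuming a `SparkSession` named `spark` and a
writable path `/tmp/testTable`:
```Scala
// Write partitioned data straight to the filesystem.
spark.range(10)
  .selectExpr("id", "id % 2 AS part")
  .write.partitionBy("part").parquet("/tmp/testTable")

// Create a data source table over that location. With scalable partition
// handling, its partitions are not yet registered in the catalog.
spark.sql(
  "CREATE TABLE testTable (id BIGINT, part BIGINT) USING parquet " +
  "OPTIONS (path '/tmp/testTable') PARTITIONED BY (part)")
spark.table("testTable").count()               // 0 rows: partitions unknown

// Discover the partition directories and register them in the catalog.
spark.catalog.recoverPartitions("testTable")
spark.table("testTable").count()               // 10 rows: data now visible
```
The same call is also exposed to PySpark (see the `catalog.py` change below).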

### How was this patch tested?
Modified the existing test cases.

Author: gatorsmile <gatorsm...@gmail.com>

Closes #16356 from gatorsmile/repairTable.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/24c0c941
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/24c0c941
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/24c0c941

Branch: refs/heads/master
Commit: 24c0c94128770be9034fb69518713d7f6aa1e041
Parents: b2dd8ec
Author: gatorsmile <gatorsm...@gmail.com>
Authored: Tue Dec 20 23:40:02 2016 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Tue Dec 20 23:40:02 2016 -0800

----------------------------------------------------------------------
 project/MimaExcludes.scala                            |  4 +++-
 python/pyspark/sql/catalog.py                         |  5 +++++
 .../scala/org/apache/spark/sql/catalog/Catalog.scala  |  7 +++++++
 .../org/apache/spark/sql/internal/CatalogImpl.scala   | 14 ++++++++++++++
 .../hive/PartitionProviderCompatibilitySuite.scala    |  6 +++---
 5 files changed, 32 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/24c0c941/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index b215d88..20f5c27 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -37,7 +37,9 @@ object MimaExcludes {
   // Exclude rules for 2.2.x
   lazy val v22excludes = v21excludes ++ Seq(
     // [SPARK-18663][SQL] Simplify CountMinSketch aggregate implementation
-    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.util.sketch.CountMinSketch.toByteArray")
+    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.util.sketch.CountMinSketch.toByteArray"),
+    // [SPARK-18949] [SQL] Add repairTable API to Catalog
+    ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.recoverPartitions")
   )
 
   // Exclude rules for 2.1.x

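The new exclusion is needed because adding an abstract method to the public
`Catalog` class is a binary-incompatible change for any external implementation
compiled against an earlier release, which is what MiMa's
`ReversedMissingMethodProblem` flags. An illustrative sketch with hypothetical
`CatalogV1`/`CatalogV2` names (not Spark code):
```Scala
// Shape of the public class before the patch:
abstract class CatalogV1 { def listTables(): Seq[String] }
// Shape after the patch, with the new abstract member:
abstract class CatalogV2 {
  def listTables(): Seq[String]
  def recoverPartitions(tableName: String): Unit  // newly added
}
// A subclass compiled against CatalogV1 no longer implements every member
// of CatalogV2, so MiMa reports ReversedMissingMethodProblem; the rule
// above records that this break is intentional.
```
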
http://git-wip-us.apache.org/repos/asf/spark/blob/24c0c941/python/pyspark/sql/catalog.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py
index a36d02e..30c7a3f 100644
--- a/python/pyspark/sql/catalog.py
+++ b/python/pyspark/sql/catalog.py
@@ -258,6 +258,11 @@ class Catalog(object):
         """Invalidate and refresh all the cached metadata of the given table."""
         self._jcatalog.refreshTable(tableName)
 
+    @since('2.1.1')
+    @since('2.1.1')
+    def recoverPartitions(self, tableName):
+        """Recover all the partitions of the given table and update the catalog."""
+        self._jcatalog.recoverPartitions(tableName)
+
     def _reset(self):
         """(Internal use only) Drop all existing databases (except "default"), tables,
         partitions and functions, and set the current database to "default".

http://git-wip-us.apache.org/repos/asf/spark/blob/24c0c941/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
index aecdda1..6b061f8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -301,6 +301,13 @@ abstract class Catalog {
   def dropGlobalTempView(viewName: String): Boolean
 
   /**
+   * Recover all the partitions in the directory of a table and update the catalog.
+   *
+   * @since 2.1.1
+   */
+  def recoverPartitions(tableName: String): Unit
+
+  /**
    * Returns true if the table is currently cached in-memory.
    *
    * @since 2.0.0

http://git-wip-us.apache.org/repos/asf/spark/blob/24c0c941/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 6d98462..41ed9d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, FunctionIdenti
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource}
 import org.apache.spark.sql.types.StructType
 
@@ -394,6 +395,19 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
   }
 
   /**
+   * Recover all the partitions in the directory of a table and update the catalog.
+   *
+   * @param tableName the name of the table to be repaired.
+   * @group ddl_ops
+   * @since 2.1.1
+   */
+  override def recoverPartitions(tableName: String): Unit = {
+    val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
+    sparkSession.sessionState.executePlan(
+      AlterTableRecoverPartitionsCommand(tableIdent)).toRdd
+  }
+
+  /**
    * Returns true if the table is currently cached in-memory.
    *
    * @group cachemgmt

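As the implementation shows, `recoverPartitions` parses the table name and
executes the same `AlterTableRecoverPartitionsCommand` that backs the SQL
statements, so the forms below are equivalent (a sketch, assuming a partitioned
table `testTable` exists):
```Scala
spark.catalog.recoverPartitions("testTable")           // new Catalog API
spark.sql("ALTER TABLE testTable RECOVER PARTITIONS")  // existing SQL form
spark.sql("MSCK REPAIR TABLE testTable")               // existing Hive-style form
```
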
http://git-wip-us.apache.org/repos/asf/spark/blob/24c0c941/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
index c2ac032..3f84cbd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
@@ -70,7 +70,7 @@ class PartitionProviderCompatibilitySuite
         }
         withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
           verifyIsLegacyTable("test")
-          spark.sql("msck repair table test")
+          spark.catalog.recoverPartitions("test")
           spark.sql("show partitions test").count()  // check we are a new table
 
           // sanity check table performance
@@ -90,7 +90,7 @@ class PartitionProviderCompatibilitySuite
           setupPartitionedDatasourceTable("test", dir)
           spark.sql("show partitions test").count()  // check we are a new table
           assert(spark.sql("select * from test").count() == 0)  // needs repair
-          spark.sql("msck repair table test")
+          spark.catalog.recoverPartitions("test")
           assert(spark.sql("select * from test").count() == 5)
         }
       }
@@ -160,7 +160,7 @@ class PartitionProviderCompatibilitySuite
       withTable("test") {
         withTempDir { dir =>
           setupPartitionedDatasourceTable("test", dir)
-          sql("msck repair table test")
+          spark.catalog.recoverPartitions("test")
           spark.sql(
             """insert overwrite table test
               |partition (partCol=1)

