Repository: spark
Updated Branches:
  refs/heads/branch-2.1 318483421 -> 0e51bb085
[SPARK-18949][SQL][BACKPORT-2.1] Add recoverPartitions API to Catalog

### What changes were proposed in this pull request?

This PR backports https://github.com/apache/spark/pull/16356 to the Spark 2.1.1 branch.

----

Currently, we only have a SQL interface for recovering all the partitions in the directory of a table and updating the catalog: `MSCK REPAIR TABLE` or `ALTER TABLE table RECOVER PARTITIONS`. (Actually, `MSCK` is very hard for me to remember, and I have no clue what it means.) After the new "Scalable Partition Handling" work, table repair becomes much more important for making the data in a newly created data source partitioned table visible. Thus, this PR adds it to the Catalog interface. After this PR, users can repair a table with:

```Scala
spark.catalog.recoverPartitions("testTable")
```

### How was this patch tested?

Modified the existing test cases.

Author: gatorsmile <gatorsm...@gmail.com>

Closes #16372 from gatorsmile/repairTable2.1.1.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0e51bb08
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0e51bb08
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0e51bb08

Branch: refs/heads/branch-2.1
Commit: 0e51bb085446a482c22eaef93aea513610f41f48
Parents: 3184834
Author: gatorsmile <gatorsm...@gmail.com>
Authored: Wed Dec 21 13:55:40 2016 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Wed Dec 21 13:55:40 2016 -0800

----------------------------------------------------------------------
 project/MimaExcludes.scala                            |  5 ++++-
 python/pyspark/sql/catalog.py                         |  5 +++++
 .../scala/org/apache/spark/sql/catalog/Catalog.scala  |  7 +++++++
 .../org/apache/spark/sql/internal/CatalogImpl.scala   | 14 ++++++++++++++
 .../hive/PartitionProviderCompatibilitySuite.scala    |  6 +++---
 5 files changed, 33 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0e51bb08/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 978a328..6d1b4d2 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -110,7 +110,10 @@ object MimaExcludes {
       ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.startOffset"),
       ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.endOffset"),
       ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.this"),
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryException.query")
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryException.query"),
+
+      // [SPARK-18949] [SQL] Add repairTable API to Catalog
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.recoverPartitions")
     )
 }
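A note on the exclusion above: `recoverPartitions` is added as a new abstract method on `org.apache.spark.sql.catalog.Catalog`, so any third-party subclass compiled against 2.1.0 no longer implements the full interface. That is exactly what MiMa's `ReversedMissingMethodProblem` flags, and why the filter is needed for an intentionally added API. A minimal sketch of what implementors now have to add (the class name `MyCatalog` is hypothetical):

```Scala
import org.apache.spark.sql.catalog.Catalog

// Compiled against Spark 2.1.0 this member did not exist; against 2.1.1,
// external Catalog subclasses must provide it (or stay abstract).
abstract class MyCatalog extends Catalog {
  // New in 2.1.1: illustrative stub, not a real implementation.
  override def recoverPartitions(tableName: String): Unit = {
    throw new UnsupportedOperationException("not implemented in this sketch")
  }
}
```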
http://git-wip-us.apache.org/repos/asf/spark/blob/0e51bb08/python/pyspark/sql/catalog.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py
index a36d02e..30c7a3f 100644
--- a/python/pyspark/sql/catalog.py
+++ b/python/pyspark/sql/catalog.py
@@ -258,6 +258,11 @@ class Catalog(object):
         """Invalidate and refresh all the cached metadata of the given table."""
         self._jcatalog.refreshTable(tableName)
 
+    @since('2.1.1')
+    def recoverPartitions(self, tableName):
+        """Recover all the partitions of the given table and update the catalog."""
+        self._jcatalog.recoverPartitions(tableName)
+
     def _reset(self):
         """(Internal use only) Drop all existing databases (except "default"), tables,
         partitions and functions, and set the current database to "default".


http://git-wip-us.apache.org/repos/asf/spark/blob/0e51bb08/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
index aecdda1..6b061f8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -301,6 +301,13 @@ abstract class Catalog {
   def dropGlobalTempView(viewName: String): Boolean
 
   /**
+   * Recover all the partitions in the directory of a table and update the catalog.
+   *
+   * @since 2.1.1
+   */
+  def recoverPartitions(tableName: String): Unit
+
+  /**
    * Returns true if the table is currently cached in-memory.
    *
    * @since 2.0.0


http://git-wip-us.apache.org/repos/asf/spark/blob/0e51bb08/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 6d98462..41ed9d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, FunctionIdenti
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource}
 import org.apache.spark.sql.types.StructType
 
@@ -394,6 +395,19 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
   }
 
   /**
+   * Recover all the partitions in the directory of a table and update the catalog.
+   *
+   * @param tableName the name of the table to be repaired.
+   * @group ddl_ops
+   * @since 2.1.1
+   */
+  override def recoverPartitions(tableName: String): Unit = {
+    val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
+    sparkSession.sessionState.executePlan(
+      AlterTableRecoverPartitionsCommand(tableIdent)).toRdd
+  }
+
+  /**
    * Returns true if the table is currently cached in-memory.
    *
    * @group cachemgmt
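As the `CatalogImpl` hunk above shows, the new method parses the table name and executes an `AlterTableRecoverPartitionsCommand`, the same command that backs the SQL statements mentioned in the description. A short sketch of the equivalence, assuming a `spark-shell` session (where `spark` is predefined) and an existing partitioned table named `testTable`:

```Scala
// All three calls run the same AlterTableRecoverPartitionsCommand
// under the hood; the table name is illustrative.
spark.catalog.recoverPartitions("testTable")
spark.sql("ALTER TABLE testTable RECOVER PARTITIONS")
spark.sql("MSCK REPAIR TABLE testTable")
```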
http://git-wip-us.apache.org/repos/asf/spark/blob/0e51bb08/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
index c2ac032..3f84cbd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
@@ -70,7 +70,7 @@ class PartitionProviderCompatibilitySuite
       }
       withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
         verifyIsLegacyTable("test")
-        spark.sql("msck repair table test")
+        spark.catalog.recoverPartitions("test")
         spark.sql("show partitions test").count()  // check we are a new table
 
         // sanity check table performance
@@ -90,7 +90,7 @@ class PartitionProviderCompatibilitySuite
         setupPartitionedDatasourceTable("test", dir)
         spark.sql("show partitions test").count()  // check we are a new table
         assert(spark.sql("select * from test").count() == 0)  // needs repair
-        spark.sql("msck repair table test")
+        spark.catalog.recoverPartitions("test")
         assert(spark.sql("select * from test").count() == 5)
       }
     }
@@ -160,7 +160,7 @@ class PartitionProviderCompatibilitySuite
     withTable("test") {
       withTempDir { dir =>
         setupPartitionedDatasourceTable("test", dir)
-        sql("msck repair table test")
+        spark.catalog.recoverPartitions("test")
         spark.sql(
           """insert overwrite table test
             |partition (partCol=1)
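For reference, here is a self-contained sketch of the workflow the updated suite exercises: create a data source partitioned table over files that already exist on disk, observe that the data is invisible until repair, and recover it through the new API. The object name, path, and table name are illustrative, and the empty initial count assumes `spark.sql.hive.manageFilesourcePartitions` is enabled (the 2.1 default):

```Scala
import org.apache.spark.sql.SparkSession

object RecoverPartitionsDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("RecoverPartitionsDemo")
      .enableHiveSupport() // partition management needs the Hive catalog
      .getOrCreate()

    val dir = "/tmp/recover_partitions_demo" // illustrative location

    // Lay out partitioned parquet files on disk first.
    spark.range(10).selectExpr("id", "id % 5 AS partCol")
      .write.partitionBy("partCol").mode("overwrite").parquet(dir)

    // Create a data source table over the existing files. With scalable
    // partition handling, no partitions are registered in the catalog yet.
    spark.sql(
      s"""CREATE TABLE test (id BIGINT, partCol BIGINT)
         |USING parquet
         |OPTIONS (path '$dir')
         |PARTITIONED BY (partCol)""".stripMargin)

    println(spark.table("test").count()) // 0 -- table needs repair

    // New in 2.1.1: the Catalog API replaces "MSCK REPAIR TABLE test".
    spark.catalog.recoverPartitions("test")

    println(spark.table("test").count()) // 10 -- partitions now visible

    spark.stop()
  }
}
```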