Repository: spark Updated Branches: refs/heads/master 556a3b7d0 -> 46b2e4999
[SPARK-18173][SQL] data source tables should support truncating partition ## What changes were proposed in this pull request? Previously, `TRUNCATE TABLE ... PARTITION` always truncated the whole table for data source tables. This PR fixes that and improves `InMemoryCatalog` so that the command also works with it. ## How was this patch tested? Existing tests, plus new test cases in `ExternalCatalogSuite`, `DDLSuite` and `HiveDDLSuite`. Author: Wenchen Fan <wenc...@databricks.com> Closes #15688 from cloud-fan/truncate. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/46b2e499 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/46b2e499 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/46b2e499 Branch: refs/heads/master Commit: 46b2e499935386e28899d860110a6ab16c107c0c Parents: 556a3b7 Author: Wenchen Fan <wenc...@databricks.com> Authored: Sun Nov 6 18:57:13 2016 -0800 Committer: Reynold Xin <r...@databricks.com> Committed: Sun Nov 6 18:57:13 2016 -0800 ---------------------------------------------------------------------- .../sql/catalyst/catalog/InMemoryCatalog.scala | 23 +++++-- .../catalyst/catalog/ExternalCatalogSuite.scala | 11 ++++ .../spark/sql/execution/command/tables.scala | 16 +++-- .../spark/sql/execution/command/DDLSuite.scala | 49 ++++++++++++--- .../spark/sql/hive/execution/HiveDDLSuite.scala | 64 ++++++++++++++++++++ 5 files changed, 146 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/46b2e499/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index bc39688..20db81e 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -487,11 +487,26 @@ class InMemoryCatalog( table: String, partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = synchronized { requireTableExists(db, table) - if (partialSpec.nonEmpty) { - throw new UnsupportedOperationException( - "listPartition with partial partition spec is not implemented") + + partialSpec match { + case None => catalog(db).tables(table).partitions.values.toSeq + case Some(partial) => + catalog(db).tables(table).partitions.toSeq.collect { + case (spec, partition) if isPartialPartitionSpec(partial, spec) => partition + } + } + } + + /** + * Returns true if `spec1` is a partial partition spec w.r.t. `spec2`, e.g. PARTITION (a=1) is a + * partial partition spec w.r.t. PARTITION (a=1,b=2). + */ + private def isPartialPartitionSpec( + spec1: TablePartitionSpec, + spec2: TablePartitionSpec): Boolean = { + spec1.forall { + case (partitionColumn, value) => spec2(partitionColumn) == value } - catalog(db).tables(table).partitions.values.toSeq } override def listPartitionsByFilter( http://git-wip-us.apache.org/repos/asf/spark/blob/46b2e499/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala index 66f92d1..34bdfc8 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala @@ -320,6 +320,17 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac 
catalog.createPartitions("db2", "tbl2", Seq(part1), ignoreIfExists = true) } + test("list partitions with partial partition spec") { + val catalog = newBasicCatalog() + val parts = catalog.listPartitions("db2", "tbl2", Some(Map("a" -> "1"))) + assert(parts.length == 1) + assert(parts.head.spec == part1.spec) + + // if no partition is matched for the given partition spec, an empty list should be returned. + assert(catalog.listPartitions("db2", "tbl2", Some(Map("a" -> "unknown", "b" -> "1"))).isEmpty) + assert(catalog.listPartitions("db2", "tbl2", Some(Map("a" -> "unknown"))).isEmpty) + } + test("drop partitions") { val catalog = newBasicCatalog() assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part1, part2))) http://git-wip-us.apache.org/repos/asf/spark/blob/46b2e499/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala index 00c646b..3cfa639 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala @@ -343,13 +343,19 @@ case class TruncateTableCommand( DDLUtils.verifyPartitionProviderIsHive(spark, table, "TRUNCATE TABLE ... PARTITION") } val locations = - // TODO: The `InMemoryCatalog` doesn't support listPartition with partial partition spec. 
- if (spark.conf.get(CATALOG_IMPLEMENTATION) == "in-memory") { - Seq(table.storage.locationUri) - } else if (table.partitionColumnNames.isEmpty) { + if (table.partitionColumnNames.isEmpty) { Seq(table.storage.locationUri) } else { - catalog.listPartitions(table.identifier, partitionSpec).map(_.storage.locationUri) + // Here we diverge from Hive when the given partition spec contains all partition columns + // but no partition is matched: Hive will throw an exception and we just do nothing. + val normalizedSpec = partitionSpec.map { spec => + PartitioningUtils.normalizePartitionSpec( + spec, + table.partitionColumnNames, + table.identifier.quotedString, + spark.sessionState.conf.resolver) + } + catalog.listPartitions(table.identifier, normalizedSpec).map(_.storage.locationUri) } val hadoopConf = spark.sessionState.newHadoopConf() locations.foreach { location => http://git-wip-us.apache.org/repos/asf/spark/blob/46b2e499/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala index 52b09c5..864af8d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala @@ -1628,29 +1628,62 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach { test("truncate table - datasource table") { import testImplicits._ - val data = (1 to 10).map { i => (i, i) }.toDF("width", "length") + val data = (1 to 10).map { i => (i, i) }.toDF("width", "length") // Test both a Hive compatible and incompatible code path. 
Seq("json", "parquet").foreach { format => withTable("rectangles") { data.write.format(format).saveAsTable("rectangles") assume(spark.table("rectangles").collect().nonEmpty, "bad test; table was empty to begin with") + sql("TRUNCATE TABLE rectangles") assert(spark.table("rectangles").collect().isEmpty) + + // not supported since the table is not partitioned + assertUnsupported("TRUNCATE TABLE rectangles PARTITION (width=1)") } } + } - withTable("rectangles", "rectangles2") { - data.write.saveAsTable("rectangles") - data.write.partitionBy("length").saveAsTable("rectangles2") + test("truncate partitioned table - datasource table") { + import testImplicits._ - // not supported since the table is not partitioned - assertUnsupported("TRUNCATE TABLE rectangles PARTITION (width=1)") + val data = (1 to 10).map { i => (i % 3, i % 5, i) }.toDF("width", "length", "height") + withTable("partTable") { + data.write.partitionBy("width", "length").saveAsTable("partTable") // supported since partitions are stored in the metastore - sql("TRUNCATE TABLE rectangles2 PARTITION (width=1)") - assert(spark.table("rectangles2").collect().isEmpty) + sql("TRUNCATE TABLE partTable PARTITION (width=1, length=1)") + assert(spark.table("partTable").filter($"width" === 1).collect().nonEmpty) + assert(spark.table("partTable").filter($"width" === 1 && $"length" === 1).collect().isEmpty) + } + + withTable("partTable") { + data.write.partitionBy("width", "length").saveAsTable("partTable") + // support partial partition spec + sql("TRUNCATE TABLE partTable PARTITION (width=1)") + assert(spark.table("partTable").collect().nonEmpty) + assert(spark.table("partTable").filter($"width" === 1).collect().isEmpty) + } + + withTable("partTable") { + data.write.partitionBy("width", "length").saveAsTable("partTable") + // do nothing if no partition is matched for the given partial partition spec + sql("TRUNCATE TABLE partTable PARTITION (width=100)") + assert(spark.table("partTable").count() == data.count()) + + 
// do nothing if no partition is matched for the given non-partial partition spec + // TODO: This behaviour is different from Hive, we should decide whether we need to follow + // Hive's behaviour or stick with our existing behaviour later. + sql("TRUNCATE TABLE partTable PARTITION (width=100, length=100)") + assert(spark.table("partTable").count() == data.count()) + + // throw exception if the column in partition spec is not a partition column. + val e = intercept[AnalysisException] { + sql("TRUNCATE TABLE partTable PARTITION (unknown=1)") + } + assert(e.message.contains("unknown is not a valid partition column")) } } http://git-wip-us.apache.org/repos/asf/spark/blob/46b2e499/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala index 682d7d4..4150e64 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala @@ -1098,4 +1098,68 @@ class HiveDDLSuite } } } + + test("truncate table - datasource table") { + import testImplicits._ + + val data = (1 to 10).map { i => (i, i) }.toDF("width", "length") + // Test both a Hive compatible and incompatible code path. 
+ Seq("json", "parquet").foreach { format => + withTable("rectangles") { + data.write.format(format).saveAsTable("rectangles") + assume(spark.table("rectangles").collect().nonEmpty, + "bad test; table was empty to begin with") + + sql("TRUNCATE TABLE rectangles") + assert(spark.table("rectangles").collect().isEmpty) + + // not supported since the table is not partitioned + val e = intercept[AnalysisException] { + sql("TRUNCATE TABLE rectangles PARTITION (width=1)") + } + assert(e.message.contains("Operation not allowed")) + } + } + } + + test("truncate partitioned table - datasource table") { + import testImplicits._ + + val data = (1 to 10).map { i => (i % 3, i % 5, i) }.toDF("width", "length", "height") + + withTable("partTable") { + data.write.partitionBy("width", "length").saveAsTable("partTable") + // supported since partitions are stored in the metastore + sql("TRUNCATE TABLE partTable PARTITION (width=1, length=1)") + assert(spark.table("partTable").filter($"width" === 1).collect().nonEmpty) + assert(spark.table("partTable").filter($"width" === 1 && $"length" === 1).collect().isEmpty) + } + + withTable("partTable") { + data.write.partitionBy("width", "length").saveAsTable("partTable") + // support partial partition spec + sql("TRUNCATE TABLE partTable PARTITION (width=1)") + assert(spark.table("partTable").collect().nonEmpty) + assert(spark.table("partTable").filter($"width" === 1).collect().isEmpty) + } + + withTable("partTable") { + data.write.partitionBy("width", "length").saveAsTable("partTable") + // do nothing if no partition is matched for the given partial partition spec + sql("TRUNCATE TABLE partTable PARTITION (width=100)") + assert(spark.table("partTable").count() == data.count()) + + // do nothing if no partition is matched for the given non-partial partition spec + // TODO: This behaviour is different from Hive, we should decide whether we need to follow + // Hive's behaviour or stick with our existing behaviour later. 
+ sql("TRUNCATE TABLE partTable PARTITION (width=100, length=100)") + assert(spark.table("partTable").count() == data.count()) + + // throw exception if the column in partition spec is not a partition column. + val e = intercept[AnalysisException] { + sql("TRUNCATE TABLE partTable PARTITION (unknown=1)") + } + assert(e.message.contains("unknown is not a valid partition column")) + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org