Repository: spark
Updated Branches:
  refs/heads/branch-2.1 a8fbcdbf2 -> 9c78d355c


[SPARK-18173][SQL] data source tables should support truncating partition

## What changes were proposed in this pull request?

Previously, `TRUNCATE TABLE ... PARTITION` would always truncate the whole table
for data source tables. This PR fixes that and also improves `InMemoryCatalog`
so that the command works with it.
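
For illustration, here is a minimal sketch of the behavior after this change. It assumes a running `SparkSession` named `spark`; the table name `partTable` and its columns simply mirror the new test cases below and are otherwise arbitrary:

```scala
import spark.implicits._

// A partitioned data source table, written the same way as in the new tests.
val data = (1 to 10).map(i => (i % 3, i % 5, i)).toDF("width", "length", "height")
data.write.partitionBy("width", "length").saveAsTable("partTable")

// Previously this statement truncated the whole table for data source tables;
// now only the partitions matching the (possibly partial) spec are truncated.
spark.sql("TRUNCATE TABLE partTable PARTITION (width=1)")

assert(spark.table("partTable").filter($"width" === 1).count() == 0)
assert(spark.table("partTable").count() > 0) // other partitions are untouched
```
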
## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenc...@databricks.com>

Closes #15688 from cloud-fan/truncate.

(cherry picked from commit 46b2e499935386e28899d860110a6ab16c107c0c)
Signed-off-by: Reynold Xin <r...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9c78d355
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9c78d355
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9c78d355

Branch: refs/heads/branch-2.1
Commit: 9c78d355c541c2abfb4945e5d67bf0d2ba4b4d16
Parents: a8fbcdb
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Sun Nov 6 18:57:13 2016 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Sun Nov 6 18:57:25 2016 -0800

----------------------------------------------------------------------
 .../sql/catalyst/catalog/InMemoryCatalog.scala  | 23 +++++--
 .../catalyst/catalog/ExternalCatalogSuite.scala | 11 ++++
 .../spark/sql/execution/command/tables.scala    | 16 +++--
 .../spark/sql/execution/command/DDLSuite.scala  | 49 ++++++++++++---
 .../spark/sql/hive/execution/HiveDDLSuite.scala | 64 ++++++++++++++++++++
 5 files changed, 146 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9c78d355/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index bc39688..20db81e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -487,11 +487,26 @@ class InMemoryCatalog(
       table: String,
      partialSpec: Option[TablePartitionSpec] = None): Seq[CatalogTablePartition] = synchronized {
     requireTableExists(db, table)
-    if (partialSpec.nonEmpty) {
-      throw new UnsupportedOperationException(
-        "listPartition with partial partition spec is not implemented")
+
+    partialSpec match {
+      case None => catalog(db).tables(table).partitions.values.toSeq
+      case Some(partial) =>
+        catalog(db).tables(table).partitions.toSeq.collect {
+          case (spec, partition) if isPartialPartitionSpec(partial, spec) => partition
+        }
+    }
+  }
+
+  /**
+   * Returns true if `spec1` is a partial partition spec w.r.t. `spec2`, e.g. PARTITION (a=1) is a
+   * partial partition spec w.r.t. PARTITION (a=1,b=2).
+   */
+  private def isPartialPartitionSpec(
+      spec1: TablePartitionSpec,
+      spec2: TablePartitionSpec): Boolean = {
+    spec1.forall {
+      case (partitionColumn, value) => spec2(partitionColumn) == value
     }
-    catalog(db).tables(table).partitions.values.toSeq
   }
 
   override def listPartitionsByFilter(

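As a standalone aside (not part of the patch, and the object name is arbitrary), the matching rule used by the new `isPartialPartitionSpec` helper can be illustrated with plain Scala maps:

```scala
object PartialSpecExample extends App {
  type TablePartitionSpec = Map[String, String]

  // Same rule as the helper added above: spec1 is a partial spec w.r.t. spec2 when
  // every column/value pair in spec1 also appears in spec2. Map.get is used here so
  // this standalone example does not throw on columns absent from spec2.
  def isPartialPartitionSpec(spec1: TablePartitionSpec, spec2: TablePartitionSpec): Boolean =
    spec1.forall { case (partitionColumn, value) => spec2.get(partitionColumn).contains(value) }

  // PARTITION (a=1) is a partial spec of PARTITION (a=1, b=2)...
  assert(isPartialPartitionSpec(Map("a" -> "1"), Map("a" -> "1", "b" -> "2")))
  // ...while PARTITION (a=2) is not.
  assert(!isPartialPartitionSpec(Map("a" -> "2"), Map("a" -> "1", "b" -> "2")))
}
```
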
http://git-wip-us.apache.org/repos/asf/spark/blob/9c78d355/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 66f92d1..34bdfc8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -320,6 +320,17 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     catalog.createPartitions("db2", "tbl2", Seq(part1), ignoreIfExists = true)
   }
 
+  test("list partitions with partial partition spec") {
+    val catalog = newBasicCatalog()
+    val parts = catalog.listPartitions("db2", "tbl2", Some(Map("a" -> "1")))
+    assert(parts.length == 1)
+    assert(parts.head.spec == part1.spec)
+
+    // if no partition is matched for the given partition spec, an empty list should be returned.
+    assert(catalog.listPartitions("db2", "tbl2", Some(Map("a" -> "unknown", "b" -> "1"))).isEmpty)
+    assert(catalog.listPartitions("db2", "tbl2", Some(Map("a" -> "unknown"))).isEmpty)
+  }
+
   test("drop partitions") {
     val catalog = newBasicCatalog()
     assert(catalogPartitionsEqual(catalog, "db2", "tbl2", Seq(part1, part2)))

http://git-wip-us.apache.org/repos/asf/spark/blob/9c78d355/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 00c646b..3cfa639 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -343,13 +343,19 @@ case class TruncateTableCommand(
      DDLUtils.verifyPartitionProviderIsHive(spark, table, "TRUNCATE TABLE ... PARTITION")
     }
     val locations =
-      // TODO: The `InMemoryCatalog` doesn't support listPartition with partial partition spec.
-      if (spark.conf.get(CATALOG_IMPLEMENTATION) == "in-memory") {
-        Seq(table.storage.locationUri)
-      } else if (table.partitionColumnNames.isEmpty) {
+      if (table.partitionColumnNames.isEmpty) {
         Seq(table.storage.locationUri)
       } else {
-        catalog.listPartitions(table.identifier, partitionSpec).map(_.storage.locationUri)
+        // Here we diverge from Hive when the given partition spec contains all partition columns
+        // but no partition is matched: Hive will throw an exception and we just do nothing.
+        val normalizedSpec = partitionSpec.map { spec =>
+          PartitioningUtils.normalizePartitionSpec(
+            spec,
+            table.partitionColumnNames,
+            table.identifier.quotedString,
+            spark.sessionState.conf.resolver)
+        }
+        catalog.listPartitions(table.identifier, normalizedSpec).map(_.storage.locationUri)
       }
     val hadoopConf = spark.sessionState.newHadoopConf()
     locations.foreach { location =>

http://git-wip-us.apache.org/repos/asf/spark/blob/9c78d355/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 52b09c5..864af8d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1628,29 +1628,62 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
 
   test("truncate table - datasource table") {
     import testImplicits._
-    val data = (1 to 10).map { i => (i, i) }.toDF("width", "length")
 
+    val data = (1 to 10).map { i => (i, i) }.toDF("width", "length")
     // Test both a Hive compatible and incompatible code path.
     Seq("json", "parquet").foreach { format =>
       withTable("rectangles") {
         data.write.format(format).saveAsTable("rectangles")
         assume(spark.table("rectangles").collect().nonEmpty,
           "bad test; table was empty to begin with")
+
         sql("TRUNCATE TABLE rectangles")
         assert(spark.table("rectangles").collect().isEmpty)
+
+        // not supported since the table is not partitioned
+        assertUnsupported("TRUNCATE TABLE rectangles PARTITION (width=1)")
       }
     }
+  }
 
-    withTable("rectangles", "rectangles2") {
-      data.write.saveAsTable("rectangles")
-      data.write.partitionBy("length").saveAsTable("rectangles2")
+  test("truncate partitioned table - datasource table") {
+    import testImplicits._
 
-      // not supported since the table is not partitioned
-      assertUnsupported("TRUNCATE TABLE rectangles PARTITION (width=1)")
+    val data = (1 to 10).map { i => (i % 3, i % 5, i) }.toDF("width", "length", "height")
 
+    withTable("partTable") {
+      data.write.partitionBy("width", "length").saveAsTable("partTable")
       // supported since partitions are stored in the metastore
-      sql("TRUNCATE TABLE rectangles2 PARTITION (width=1)")
-      assert(spark.table("rectangles2").collect().isEmpty)
+      sql("TRUNCATE TABLE partTable PARTITION (width=1, length=1)")
+      assert(spark.table("partTable").filter($"width" === 1).collect().nonEmpty)
+      assert(spark.table("partTable").filter($"width" === 1 && $"length" === 1).collect().isEmpty)
+    }
+
+    withTable("partTable") {
+      data.write.partitionBy("width", "length").saveAsTable("partTable")
+      // support partial partition spec
+      sql("TRUNCATE TABLE partTable PARTITION (width=1)")
+      assert(spark.table("partTable").collect().nonEmpty)
+      assert(spark.table("partTable").filter($"width" === 1).collect().isEmpty)
+    }
+
+    withTable("partTable") {
+      data.write.partitionBy("width", "length").saveAsTable("partTable")
+      // do nothing if no partition is matched for the given partial partition spec
+      sql("TRUNCATE TABLE partTable PARTITION (width=100)")
+      assert(spark.table("partTable").count() == data.count())
+
+      // do nothing if no partition is matched for the given non-partial partition spec
+      // TODO: This behaviour is different from Hive, we should decide whether we need to follow
+      // Hive's behaviour or stick with our existing behaviour later.
+      sql("TRUNCATE TABLE partTable PARTITION (width=100, length=100)")
+      assert(spark.table("partTable").count() == data.count())
+
+      // throw exception if the column in partition spec is not a partition column.
+      val e = intercept[AnalysisException] {
+        sql("TRUNCATE TABLE partTable PARTITION (unknown=1)")
+      }
+      assert(e.message.contains("unknown is not a valid partition column"))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9c78d355/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 682d7d4..4150e64 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -1098,4 +1098,68 @@ class HiveDDLSuite
       }
     }
   }
+
+  test("truncate table - datasource table") {
+    import testImplicits._
+
+    val data = (1 to 10).map { i => (i, i) }.toDF("width", "length")
+    // Test both a Hive compatible and incompatible code path.
+    Seq("json", "parquet").foreach { format =>
+      withTable("rectangles") {
+        data.write.format(format).saveAsTable("rectangles")
+        assume(spark.table("rectangles").collect().nonEmpty,
+          "bad test; table was empty to begin with")
+
+        sql("TRUNCATE TABLE rectangles")
+        assert(spark.table("rectangles").collect().isEmpty)
+
+        // not supported since the table is not partitioned
+        val e = intercept[AnalysisException] {
+          sql("TRUNCATE TABLE rectangles PARTITION (width=1)")
+        }
+        assert(e.message.contains("Operation not allowed"))
+      }
+    }
+  }
+
+  test("truncate partitioned table - datasource table") {
+    import testImplicits._
+
+    val data = (1 to 10).map { i => (i % 3, i % 5, i) }.toDF("width", "length", "height")
+
+    withTable("partTable") {
+      data.write.partitionBy("width", "length").saveAsTable("partTable")
+      // supported since partitions are stored in the metastore
+      sql("TRUNCATE TABLE partTable PARTITION (width=1, length=1)")
+      assert(spark.table("partTable").filter($"width" === 1).collect().nonEmpty)
+      assert(spark.table("partTable").filter($"width" === 1 && $"length" === 1).collect().isEmpty)
+    }
+
+    withTable("partTable") {
+      data.write.partitionBy("width", "length").saveAsTable("partTable")
+      // support partial partition spec
+      sql("TRUNCATE TABLE partTable PARTITION (width=1)")
+      assert(spark.table("partTable").collect().nonEmpty)
+      assert(spark.table("partTable").filter($"width" === 1).collect().isEmpty)
+    }
+
+    withTable("partTable") {
+      data.write.partitionBy("width", "length").saveAsTable("partTable")
+      // do nothing if no partition is matched for the given partial partition spec
+      sql("TRUNCATE TABLE partTable PARTITION (width=100)")
+      assert(spark.table("partTable").count() == data.count())
+
+      // do nothing if no partition is matched for the given non-partial partition spec
+      // TODO: This behaviour is different from Hive, we should decide whether we need to follow
+      // Hive's behaviour or stick with our existing behaviour later.
+      sql("TRUNCATE TABLE partTable PARTITION (width=100, length=100)")
+      assert(spark.table("partTable").count() == data.count())
+
+      // throw exception if the column in partition spec is not a partition column.
+      val e = intercept[AnalysisException] {
+        sql("TRUNCATE TABLE partTable PARTITION (unknown=1)")
+      }
+      assert(e.message.contains("unknown is not a valid partition column"))
+    }
+  }
 }

