This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 73857cd  [SPARK-34524][SQL] Simplify v2 partition commands resolution
73857cd is described below

commit 73857cdd87757d2888bd92f6b7c2fad709701484
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Fri Feb 26 11:44:42 2021 +0000

    [SPARK-34524][SQL] Simplify v2 partition commands resolution
    
    ### What changes were proposed in this pull request?
    
    This PR simplifies the resolution of v2 partition commands:
    1. Add a common trait for v2 partition commands, so that we don't need to match them one by one in the rules (see the sketch after this list).
    2. Make the partition spec an expression, so that it's easier to resolve it via tree node transformation.
    3. Add `TruncatePartition` so that `TruncateTable` doesn't need to be a v2 partition command.
    4. Simplify `CheckAnalysis` to only check whether the table is partitioned. For partitioned tables, the partition spec is always resolved, so we don't need to check it. The `SupportsAtomicPartitionManagement` check is now done at runtime; since Spark executes commands eagerly, a runtime exception is still thrown at analysis time.
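    
    A condensed sketch of the new resolution flow, abridged from the diff below (the rule body is simplified and the `resolvePartitionSpec` helper is elided):
    
    ```scala
    trait V2PartitionCommand extends Command {
      def table: LogicalPlan
      def allowPartialPartitionSpec: Boolean = false
    }
    
    // A single analyzer case now resolves the specs of every partition command.
    object ResolvePartitionSpec extends Rule[LogicalPlan] {
      def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
        case command: V2PartitionCommand if command.childrenResolved && !command.resolved =>
          command.table match {
            case r @ ResolvedTable(_, _, table: SupportsPartitionManagement, _) =>
              command.transformExpressions {
                case partSpec: UnresolvedPartitionSpec =>
                  resolvePartitionSpec(
                    r.name, partSpec, table.partitionSchema(), command.allowPartialPartitionSpec)
              }
            case _ => command
          }
      }
    }
    ```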
    
    ### Why are the changes needed?
    
    code cleanup
    
    ### Does this PR introduce _any_ user-facing change?
    
    no
    
    ### How was this patch tested?
    
    existing tests
    
    Closes #31637 from cloud-fan/simplify.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../sql/catalyst/analysis/CheckAnalysis.scala      | 63 +++++----------
 .../catalyst/analysis/ResolvePartitionSpec.scala   | 92 ++++++++--------------
 .../sql/catalyst/analysis/v2ResolutionPlans.scala  | 16 +++-
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  9 +--
 .../sql/catalyst/plans/logical/v2Commands.scala    | 62 +++++++--------
 .../datasources/v2/DataSourceV2Implicits.scala     | 10 ++-
 .../catalyst/analysis/ResolveSessionCatalog.scala  |  7 +-
 .../datasources/v2/DataSourceV2Strategy.scala      | 14 ++--
 ...TableExec.scala => TruncatePartitionExec.scala} | 34 ++++----
 .../datasources/v2/TruncateTableExec.scala         | 21 +----
 .../command/ShowPartitionsSuiteBase.scala          |  4 +-
 .../command/TruncateTableParserSuite.scala         | 12 +--
 .../command/v2/AlterTableAddPartitionSuite.scala   |  2 +-
 .../command/v2/AlterTableDropPartitionSuite.scala  |  2 +-
 .../execution/command/v2/ShowPartitionsSuite.scala |  3 +-
 .../execution/command/v2/TruncateTableSuite.scala  |  2 +-
 16 files changed, 153 insertions(+), 200 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 59e37e8..389bbb8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.optimizer.BooleanSimplification
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, TypeUtils}
-import org.apache.spark.sql.connector.catalog.{LookupCatalog, SupportsAtomicPartitionManagement, SupportsPartitionManagement, Table, TruncatableTable}
+import org.apache.spark.sql.connector.catalog.{LookupCatalog, SupportsAtomicPartitionManagement, SupportsPartitionManagement, Table}
 import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnPosition, DeleteColumn, RenameColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
 import org.apache.spark.sql.errors.QueryExecutionErrors
 import org.apache.spark.sql.internal.SQLConf
@@ -150,6 +150,23 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
       case AlterTable(_, _, u: UnresolvedV2Relation, _) =>
         failAnalysis(s"Table not found: ${u.originalNameParts.quoted}")
 
+      case command: V2PartitionCommand =>
+        command.table match {
+          case r @ ResolvedTable(_, _, table, _) => table match {
+            case t: SupportsPartitionManagement =>
+              if (t.partitionSchema.isEmpty) {
+                failAnalysis(s"Table ${r.name} is not partitioned.")
+              }
+            case _ =>
+              failAnalysis(s"Table ${r.name} does not support partition 
management.")
+          }
+          case _ =>
+        }
+
+      // `ShowTableExtended` should have been converted to the v1 command if the table is v1.
+      case _: ShowTableExtended =>
+        throw new AnalysisException("SHOW TABLE EXTENDED is not supported for v2 tables.")
+
       case operator: LogicalPlan =>
         // Check argument data types of higher-order functions downwards first.
        // If the arguments of the higher-order functions are resolved but the type check fails,
@@ -565,19 +582,6 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
               // no validation needed for set and remove property
             }
 
-          case AddPartitions(r: ResolvedTable, parts, _) =>
-            checkAlterTablePartition(r.table, parts)
-
-          case DropPartitions(r: ResolvedTable, parts, _, _) =>
-            checkAlterTablePartition(r.table, parts)
-
-          case RenamePartitions(r: ResolvedTable, from, _) =>
-            checkAlterTablePartition(r.table, Seq(from))
-
-          case showPartitions: ShowPartitions => checkShowPartitions(showPartitions)
-
-          case truncateTable: TruncateTable => checkTruncateTable(truncateTable)
-
           case _ => // Falls back to the following checks
         }
 
@@ -1009,35 +1013,4 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
       case _ =>
     }
   }
-
-  // Make sure that the `SHOW PARTITIONS` command is allowed for the table
-  private def checkShowPartitions(showPartitions: ShowPartitions): Unit = showPartitions match {
-    case ShowPartitions(rt: ResolvedTable, _, _)
-        if !rt.table.isInstanceOf[SupportsPartitionManagement] =>
-      failAnalysis("SHOW PARTITIONS cannot run for a table which does not support partitioning")
-    case ShowPartitions(ResolvedTable(_, _, partTable: SupportsPartitionManagement, _), _, _)
-        if partTable.partitionSchema().isEmpty =>
-      failAnalysis(
-        s"SHOW PARTITIONS is not allowed on a table that is not partitioned: ${partTable.name()}")
-    case _ =>
-  }
-
-  private def checkTruncateTable(truncateTable: TruncateTable): Unit = truncateTable match {
-    case TruncateTable(rt: ResolvedTable, None) if !rt.table.isInstanceOf[TruncatableTable] =>
-      failAnalysis(s"The table ${rt.table.name()} does not support truncation")
-    case TruncateTable(rt: ResolvedTable, Some(_))
-        if !rt.table.isInstanceOf[SupportsPartitionManagement] =>
-      failAnalysis("TRUNCATE TABLE cannot run for a table which does not support partitioning")
-    case TruncateTable(
-        ResolvedTable(_, _, _: SupportsPartitionManagement, _),
-        Some(_: UnresolvedPartitionSpec)) =>
-      failAnalysis("Partition spec is not resolved")
-    case TruncateTable(
-        ResolvedTable(_, _, table: SupportsPartitionManagement, _),
-        Some(spec: ResolvedPartitionSpec))
-      if spec.names.length < table.partitionSchema.length &&
-        !table.isInstanceOf[SupportsAtomicPartitionManagement] =>
-      failAnalysis(s"The table ${table.name()} does not support truncation of multiple partitions")
-    case _ =>
-  }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala
index e68c979..79b7b0c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolvePartitionSpec.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.analysis
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
-import org.apache.spark.sql.catalyst.plans.logical.{AddPartitions, DropPartitions, LogicalPlan, RenamePartitions, ShowPartitions, TruncateTable}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, V2PartitionCommand}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.connector.catalog.SupportsPartitionManagement
@@ -33,70 +33,42 @@ import org.apache.spark.sql.util.PartitioningUtils.{normalizePartitionSpec, requ
 object ResolvePartitionSpec extends Rule[LogicalPlan] {
 
   def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    case r @ AddPartitions(
-        ResolvedTable(_, _, table: SupportsPartitionManagement, _), partSpecs, _) =>
-      val partitionSchema = table.partitionSchema()
-      r.copy(parts = resolvePartitionSpecs(
-        table.name,
-        partSpecs,
-        partitionSchema,
-        requireExactMatchedPartitionSpec(table.name, _, partitionSchema.fieldNames)))
-
-    case r @ DropPartitions(
-        ResolvedTable(_, _, table: SupportsPartitionManagement, _), partSpecs, _, _) =>
-      val partitionSchema = table.partitionSchema()
-      r.copy(parts = resolvePartitionSpecs(
-        table.name,
-        partSpecs,
-        partitionSchema,
-        requireExactMatchedPartitionSpec(table.name, _, partitionSchema.fieldNames)))
-
-    case r @ RenamePartitions(
-        ResolvedTable(_, _, table: SupportsPartitionManagement, _), from, to) =>
-      val partitionSchema = table.partitionSchema()
-      val Seq(resolvedFrom, resolvedTo) = resolvePartitionSpecs(
-        table.name,
-        Seq(from, to),
-        partitionSchema,
-        requireExactMatchedPartitionSpec(table.name, _, partitionSchema.fieldNames))
-      r.copy(from = resolvedFrom, to = resolvedTo)
-
-    case r @ ShowPartitions(
-        ResolvedTable(_, _, table: SupportsPartitionManagement, _), partSpecs, _) =>
-      r.copy(pattern = resolvePartitionSpecs(
-        table.name,
-        partSpecs.toSeq,
-        table.partitionSchema()).headOption)
-
-    case r @ TruncateTable(ResolvedTable(_, _, table: SupportsPartitionManagement, _), partSpecs) =>
-      r.copy(partitionSpec = resolvePartitionSpecs(
-        table.name,
-        partSpecs.toSeq,
-        table.partitionSchema()).headOption)
+    case command: V2PartitionCommand if command.childrenResolved && !command.resolved =>
+      command.table match {
+        case r @ ResolvedTable(_, _, table: SupportsPartitionManagement, _) =>
+          command.transformExpressions {
+            case partSpecs: UnresolvedPartitionSpec =>
+              val partitionSchema = table.partitionSchema()
+              resolvePartitionSpec(
+                r.name,
+                partSpecs,
+                partitionSchema,
+                command.allowPartialPartitionSpec)
+          }
+        case _ => command
+      }
   }
 
-  private def resolvePartitionSpecs(
+  private def resolvePartitionSpec(
       tableName: String,
-      partSpecs: Seq[PartitionSpec],
+      partSpec: UnresolvedPartitionSpec,
       partSchema: StructType,
-      checkSpec: TablePartitionSpec => Unit = _ => ()): Seq[ResolvedPartitionSpec] =
-    partSpecs.map {
-      case unresolvedPartSpec: UnresolvedPartitionSpec =>
-        val normalizedSpec = normalizePartitionSpec(
-          unresolvedPartSpec.spec,
-          partSchema,
-          tableName,
-          conf.resolver)
-        checkSpec(normalizedSpec)
-        val partitionNames = normalizedSpec.keySet
-        val requestedFields = partSchema.filter(field => partitionNames.contains(field.name))
-        ResolvedPartitionSpec(
-          requestedFields.map(_.name),
-          convertToPartIdent(normalizedSpec, requestedFields),
-          unresolvedPartSpec.location)
-      case resolvedPartitionSpec: ResolvedPartitionSpec =>
-        resolvedPartitionSpec
+      allowPartitionSpec: Boolean): ResolvedPartitionSpec = {
+    val normalizedSpec = normalizePartitionSpec(
+      partSpec.spec,
+      partSchema,
+      tableName,
+      conf.resolver)
+    if (!allowPartitionSpec) {
+      requireExactMatchedPartitionSpec(tableName, normalizedSpec, partSchema.fieldNames)
     }
+    val partitionNames = normalizedSpec.keySet
+    val requestedFields = partSchema.filter(field => partitionNames.contains(field.name))
+    ResolvedPartitionSpec(
+      requestedFields.map(_.name),
+      convertToPartIdent(normalizedSpec, requestedFields),
+      partSpec.location)
+  }
 
   private[sql] def convertToPartIdent(
       partitionSpec: TablePartitionSpec,
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
index f7e08bd..b50c306 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
@@ -19,10 +19,12 @@ package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.{Attribute, LeafExpression, Unevaluable}
 import org.apache.spark.sql.catalyst.plans.logical.LeafNode
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, Table, TableCatalog}
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+import org.apache.spark.sql.types.DataType
 
 /**
 * Holds the name of a namespace that has yet to be looked up in a catalog. It will be resolved to
@@ -73,11 +75,18 @@ case class UnresolvedTableOrView(
   override def output: Seq[Attribute] = Nil
 }
 
-sealed trait PartitionSpec
+sealed trait PartitionSpec extends LeafExpression with Unevaluable {
+  override def dataType: DataType = throw new IllegalStateException(
+    "PartitionSpec.dataType should not be called.")
+  override def nullable: Boolean = throw new IllegalStateException(
+    "PartitionSpec.nullable should not be called.")
+}
 
 case class UnresolvedPartitionSpec(
     spec: TablePartitionSpec,
-    location: Option[String] = None) extends PartitionSpec
+    location: Option[String] = None) extends PartitionSpec {
+  override lazy val resolved = false
+}
 
 /**
 * Holds the name of a function that has yet to be looked up in a catalog. It will be resolved to
@@ -109,6 +118,7 @@ case class ResolvedTable(
     val qualifier = catalog.name +: identifier.namespace :+ identifier.name
     outputAttributes.map(_.withQualifier(qualifier))
   }
+  def name: String = (catalog.name +: identifier.namespace() :+ identifier.name()).quoted
 }
 
 object ResolvedTable {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 25e6cbe..a43d28b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -3757,11 +3757,10 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with SQLConfHelper with Logg
    * }}}
    */
  override def visitTruncateTable(ctx: TruncateTableContext): LogicalPlan = withOrigin(ctx) {
-    TruncateTable(
-      createUnresolvedTable(ctx.multipartIdentifier, "TRUNCATE TABLE"),
-      Option(ctx.partitionSpec).map { spec =>
-        UnresolvedPartitionSpec(visitNonOptionalPartitionSpec(spec))
-      })
+    val table = createUnresolvedTable(ctx.multipartIdentifier, "TRUNCATE TABLE")
+    Option(ctx.partitionSpec).map { spec =>
+      TruncatePartition(table, UnresolvedPartitionSpec(visitNonOptionalPartitionSpec(spec)))
+    }.getOrElse(TruncateTable(table))
   }
 
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
index ea67c55..847d7ae 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.plans.logical
 
-import org.apache.spark.sql.catalyst.analysis.{NamedRelation, PartitionSpec, ResolvedPartitionSpec, UnresolvedException}
+import org.apache.spark.sql.catalyst.analysis.{NamedRelation, PartitionSpec, UnresolvedException}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, Expression, Unevaluable}
 import org.apache.spark.sql.catalyst.plans.DescribeCommandSchema
@@ -59,6 +59,11 @@ trait V2WriteCommand extends Command {
   def withNewTable(newTable: NamedRelation): V2WriteCommand
 }
 
+trait V2PartitionCommand extends Command {
+  def table: LogicalPlan
+  def allowPartialPartitionSpec: Boolean = false
+}
+
 /**
  * Append data to an existing table.
  */
@@ -677,13 +682,10 @@ case class AnalyzeColumn(
  * }}}
  */
 case class AddPartitions(
-    child: LogicalPlan,
+    table: LogicalPlan,
     parts: Seq[PartitionSpec],
-    ifNotExists: Boolean) extends Command {
-  override lazy val resolved: Boolean =
-    childrenResolved && parts.forall(_.isInstanceOf[ResolvedPartitionSpec])
-
-  override def children: Seq[LogicalPlan] = child :: Nil
+    ifNotExists: Boolean) extends V2PartitionCommand {
+  override def children: Seq[LogicalPlan] = table :: Nil
 }
 
 /**
@@ -699,29 +701,21 @@ case class AddPartitions(
  * }}}
  */
 case class DropPartitions(
-    child: LogicalPlan,
+    table: LogicalPlan,
     parts: Seq[PartitionSpec],
     ifExists: Boolean,
-    purge: Boolean) extends Command {
-  override lazy val resolved: Boolean =
-    childrenResolved && parts.forall(_.isInstanceOf[ResolvedPartitionSpec])
-
-  override def children: Seq[LogicalPlan] = child :: Nil
+    purge: Boolean) extends V2PartitionCommand {
+  override def children: Seq[LogicalPlan] = table :: Nil
 }
 
 /**
  * The logical plan of the ALTER TABLE ... RENAME TO PARTITION command.
  */
 case class RenamePartitions(
-    child: LogicalPlan,
+    table: LogicalPlan,
     from: PartitionSpec,
-    to: PartitionSpec) extends Command {
-  override lazy val resolved: Boolean =
-    childrenResolved &&
-      from.isInstanceOf[ResolvedPartitionSpec] &&
-      to.isInstanceOf[ResolvedPartitionSpec]
-
-  override def children: Seq[LogicalPlan] = child :: Nil
+    to: PartitionSpec) extends V2PartitionCommand {
+  override def children: Seq[LogicalPlan] = table :: Nil
 }
 
 /**
@@ -767,23 +761,29 @@ object ShowColumns {
 /**
  * The logical plan of the TRUNCATE TABLE command.
  */
-case class TruncateTable(
-    child: LogicalPlan,
-    partitionSpec: Option[PartitionSpec]) extends Command {
-  override def children: Seq[LogicalPlan] = child :: Nil
+case class TruncateTable(table: LogicalPlan) extends Command {
+  override def children: Seq[LogicalPlan] = table :: Nil
+}
+
+/**
+ * The logical plan of the TRUNCATE TABLE ... PARTITION command.
+ */
+case class TruncatePartition(
+    table: LogicalPlan,
+    partitionSpec: PartitionSpec) extends V2PartitionCommand {
+  override def children: Seq[LogicalPlan] = table :: Nil
+  override def allowPartialPartitionSpec: Boolean = true
 }
 
 /**
  * The logical plan of the SHOW PARTITIONS command.
  */
 case class ShowPartitions(
-    child: LogicalPlan,
+    table: LogicalPlan,
     pattern: Option[PartitionSpec],
-    override val output: Seq[Attribute] = ShowPartitions.OUTPUT) extends Command {
-  override def children: Seq[LogicalPlan] = child :: Nil
-
-  override lazy val resolved: Boolean =
-    childrenResolved && pattern.forall(_.isInstanceOf[ResolvedPartitionSpec])
+    override val output: Seq[Attribute] = ShowPartitions.OUTPUT) extends V2PartitionCommand {
+  override def children: Seq[LogicalPlan] = table :: Nil
+  override def allowPartialPartitionSpec: Boolean = true
 }
 
 object ShowPartitions {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala
index 4326c73..daa2c04 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala
@@ -22,7 +22,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{PartitionSpec, ResolvedPartitionSpec, UnresolvedPartitionSpec}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsAtomicPartitionManagement, SupportsDelete, SupportsPartitionManagement, SupportsRead, SupportsWrite, Table, TableCapability}
+import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsAtomicPartitionManagement, SupportsDelete, SupportsPartitionManagement, SupportsRead, SupportsWrite, Table, TableCapability, TruncatableTable}
 import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -57,6 +57,14 @@ object DataSourceV2Implicits {
       }
     }
 
+    def asTruncatable: TruncatableTable = {
+      table match {
+        case t: TruncatableTable => t
+        case _ =>
+          throw new AnalysisException(s"Table does not support truncates: 
${table.name}")
+      }
+    }
+
     def asPartitionable: SupportsPartitionManagement = {
       table match {
         case support: SupportsPartitionManagement =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index dde31f6..290833d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -394,10 +394,13 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
         ShowCreateTableCommand(ident.asTableIdentifier)
       }
 
-    case TruncateTable(ResolvedV1TableIdentifier(ident), partitionSpec) =>
+    case TruncateTable(ResolvedV1TableIdentifier(ident)) =>
+      TruncateTableCommand(ident.asTableIdentifier, None)
+
+    case TruncatePartition(ResolvedV1TableIdentifier(ident), partitionSpec) =>
       TruncateTableCommand(
         ident.asTableIdentifier,
-        partitionSpec.toSeq.asUnresolvedPartitionSpecs.map(_.spec).headOption)
+        Seq(partitionSpec).asUnresolvedPartitionSpecs.map(_.spec).headOption)
 
     case s @ ShowPartitions(
         ResolvedV1TableOrViewIdentifier(ident),
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index a5b092a..135de2a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -337,9 +337,6 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
     case ShowTables(ResolvedNamespace(catalog, ns), pattern, output) =>
       ShowTablesExec(output, catalog.asTableCatalog, ns, pattern) :: Nil
 
-    case _: ShowTableExtended =>
-      throw new AnalysisException("SHOW TABLE EXTENDED is not supported for v2 
tables.")
-
     case SetCatalogAndNamespace(catalogManager, catalogName, ns) =>
       SetCatalogAndNamespaceExec(catalogManager, catalogName, ns) :: Nil
 
@@ -394,10 +391,15 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
     case ShowCreateTable(_: ResolvedTable, _) =>
       throw new AnalysisException("SHOW CREATE TABLE is not supported for v2 
tables.")
 
-    case TruncateTable(r: ResolvedTable, parts) =>
+    case TruncateTable(r: ResolvedTable) =>
       TruncateTableExec(
-        r.table,
-        parts.toSeq.asResolvedPartitionSpecs.headOption,
+        r.table.asTruncatable,
+        recacheTable(r)) :: Nil
+
+    case TruncatePartition(r: ResolvedTable, part) =>
+      TruncatePartitionExec(
+        r.table.asPartitionable,
+        Seq(part).asResolvedPartitionSpecs.head,
         recacheTable(r)) :: Nil
 
     case ShowColumns(_: ResolvedTable, _, _) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TruncateTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TruncatePartitionExec.scala
similarity index 56%
copy from sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TruncateTableExec.scala
copy to sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TruncatePartitionExec.scala
index 17f86e2..135005b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TruncateTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TruncatePartitionExec.scala
@@ -20,31 +20,31 @@ package org.apache.spark.sql.execution.datasources.v2
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.ResolvedPartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.connector.catalog.{SupportsAtomicPartitionManagement, SupportsPartitionManagement, Table, TruncatableTable}
+import org.apache.spark.sql.connector.catalog.{SupportsAtomicPartitionManagement, SupportsPartitionManagement}
 
 /**
- * Physical plan node for table truncation.
+ * Physical plan node for table partition truncation.
  */
-case class TruncateTableExec(
-    table: Table,
-    partSpecs: Option[ResolvedPartitionSpec],
+case class TruncatePartitionExec(
+    table: SupportsPartitionManagement,
+    partSpec: ResolvedPartitionSpec,
     refreshCache: () => Unit) extends V2CommandExec {
 
   override def output: Seq[Attribute] = Seq.empty
 
   override protected def run(): Seq[InternalRow] = {
-    val isTableAltered = (table, partSpecs) match {
-      case (truncatableTable: TruncatableTable, None) =>
-        truncatableTable.truncateTable()
-      case (partTable: SupportsPartitionManagement, Some(resolvedPartSpec))
-        if partTable.partitionSchema.length == resolvedPartSpec.names.length =>
-        partTable.truncatePartition(resolvedPartSpec.ident)
-      case (atomicPartTable: SupportsAtomicPartitionManagement, Some(resolvedPartitionSpec)) =>
-        val partitionIdentifiers = atomicPartTable.listPartitionIdentifiers(
-          resolvedPartitionSpec.names.toArray, resolvedPartitionSpec.ident)
-        atomicPartTable.truncatePartitions(partitionIdentifiers)
-      case _ => throw new IllegalArgumentException(
-        s"Truncation of ${table.getClass.getName} is not supported")
+    val isTableAltered = if (table.partitionSchema.length != partSpec.names.length) {
+      table match {
+        case atomicPartTable: SupportsAtomicPartitionManagement =>
+          val partitionIdentifiers = atomicPartTable.listPartitionIdentifiers(
+            partSpec.names.toArray, partSpec.ident)
+          atomicPartTable.truncatePartitions(partitionIdentifiers)
+        case _ =>
+          throw new UnsupportedOperationException(
+            s"The table ${table.name()} does not support truncation of 
multiple partition.")
+      }
+    } else {
+      table.truncatePartition(partSpec.ident)
     }
     if (isTableAltered) refreshCache()
     Seq.empty
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TruncateTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TruncateTableExec.scala
index 17f86e2..69261b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TruncateTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/TruncateTableExec.scala
@@ -18,35 +18,20 @@
 package org.apache.spark.sql.execution.datasources.v2
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.analysis.ResolvedPartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.connector.catalog.{SupportsAtomicPartitionManagement, SupportsPartitionManagement, Table, TruncatableTable}
+import org.apache.spark.sql.connector.catalog.TruncatableTable
 
 /**
  * Physical plan node for table truncation.
  */
 case class TruncateTableExec(
-    table: Table,
-    partSpecs: Option[ResolvedPartitionSpec],
+    table: TruncatableTable,
     refreshCache: () => Unit) extends V2CommandExec {
 
   override def output: Seq[Attribute] = Seq.empty
 
   override protected def run(): Seq[InternalRow] = {
-    val isTableAltered = (table, partSpecs) match {
-      case (truncatableTable: TruncatableTable, None) =>
-        truncatableTable.truncateTable()
-      case (partTable: SupportsPartitionManagement, Some(resolvedPartSpec))
-        if partTable.partitionSchema.length == resolvedPartSpec.names.length =>
-        partTable.truncatePartition(resolvedPartSpec.ident)
-      case (atomicPartTable: SupportsAtomicPartitionManagement, Some(resolvedPartitionSpec)) =>
-        val partitionIdentifiers = atomicPartTable.listPartitionIdentifiers(
-          resolvedPartitionSpec.names.toArray, resolvedPartitionSpec.ident)
-        atomicPartTable.truncatePartitions(partitionIdentifiers)
-      case _ => throw new IllegalArgumentException(
-        s"Truncation of ${table.getClass.getName} is not supported")
-    }
-    if (isTableAltered) refreshCache()
+    if (table.truncateTable()) refreshCache()
     Seq.empty
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/ShowPartitionsSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/ShowPartitionsSuiteBase.scala
index 29edb8f..27d2eb9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/ShowPartitionsSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/ShowPartitionsSuiteBase.scala
@@ -69,7 +69,9 @@ trait ShowPartitionsSuiteBase extends QueryTest with DDLCommandTestUtils {
       val errMsg = intercept[AnalysisException] {
         sql(s"SHOW PARTITIONS $t")
       }.getMessage
-      assert(errMsg.contains("not allowed on a table that is not partitioned"))
+      assert(errMsg.contains("not allowed on a table that is not partitioned") 
||
+        // V2 error message.
+        errMsg.contains(s"Table $t is not partitioned"))
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/TruncateTableParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/TruncateTableParserSuite.scala
index 39531c8..7f4a480 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/TruncateTableParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/TruncateTableParserSuite.scala
@@ -20,30 +20,30 @@ package org.apache.spark.sql.execution.command
 import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, UnresolvedPartitionSpec, UnresolvedTable}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser.parsePlan
 import org.apache.spark.sql.catalyst.parser.ParseException
-import org.apache.spark.sql.catalyst.plans.logical.TruncateTable
+import org.apache.spark.sql.catalyst.plans.logical.{TruncatePartition, TruncateTable}
 import org.apache.spark.sql.test.SharedSparkSession
 
 class TruncateTableParserSuite extends AnalysisTest with SharedSparkSession {
   test("truncate table") {
     comparePlans(
       parsePlan("TRUNCATE TABLE a.b.c"),
-      TruncateTable(UnresolvedTable(Seq("a", "b", "c"), "TRUNCATE TABLE", None), None))
+      TruncateTable(UnresolvedTable(Seq("a", "b", "c"), "TRUNCATE TABLE", None)))
   }
 
   test("truncate a single part partition") {
     comparePlans(
       parsePlan("TRUNCATE TABLE a.b.c PARTITION(ds='2017-06-10')"),
-      TruncateTable(
+      TruncatePartition(
         UnresolvedTable(Seq("a", "b", "c"), "TRUNCATE TABLE", None),
-        Some(UnresolvedPartitionSpec(Map("ds" -> "2017-06-10"), None))))
+        UnresolvedPartitionSpec(Map("ds" -> "2017-06-10"), None)))
   }
 
   test("truncate a multi parts partition") {
     comparePlans(
       parsePlan("TRUNCATE TABLE ns.tbl PARTITION(a = 1, B = 'ABC')"),
-      TruncateTable(
+      TruncatePartition(
         UnresolvedTable(Seq("ns", "tbl"), "TRUNCATE TABLE", None),
-        Some(UnresolvedPartitionSpec(Map("a" -> "1", "B" -> "ABC"), None))))
+        UnresolvedPartitionSpec(Map("a" -> "1", "B" -> "ABC"), None)))
   }
 
   test("empty values in non-optional partition specs") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala
index a33eb0e..fabe399 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableAddPartitionSuite.scala
@@ -33,7 +33,7 @@ class AlterTableAddPartitionSuite
       val errMsg = intercept[AnalysisException] {
         sql(s"ALTER TABLE $t ADD PARTITION (id=1)")
       }.getMessage
-      assert(errMsg.contains(s"Table $t can not alter partitions"))
+      assert(errMsg.contains(s"Table $t does not support partition 
management"))
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableDropPartitionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableDropPartitionSuite.scala
index 3515fa3..b03c8fb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableDropPartitionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/AlterTableDropPartitionSuite.scala
@@ -36,7 +36,7 @@ class AlterTableDropPartitionSuite
       val errMsg = intercept[AnalysisException] {
         sql(s"ALTER TABLE $t DROP PARTITION (id=1)")
       }.getMessage
-      assert(errMsg.contains("can not alter partitions"))
+      assert(errMsg.contains(s"Table $t does not support partition 
management"))
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowPartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowPartitionsSuite.scala
index 42f05ee..8ae8171 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowPartitionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowPartitionsSuite.scala
@@ -33,8 +33,7 @@ class ShowPartitionsSuite extends command.ShowPartitionsSuiteBase with CommandSu
       val errMsg = intercept[AnalysisException] {
         sql(s"SHOW PARTITIONS $table")
       }.getMessage
-      assert(errMsg.contains(
-        "SHOW PARTITIONS cannot run for a table which does not support 
partitioning"))
+      assert(errMsg.contains(s"Table $table does not support partition 
management"))
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/TruncateTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/TruncateTableSuite.scala
index 1e14a08..f125a72 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/TruncateTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/TruncateTableSuite.scala
@@ -36,7 +36,7 @@ class TruncateTableSuite extends command.TruncateTableSuiteBase with CommandSuit
         sql(s"TRUNCATE TABLE $t PARTITION (c0=1)")
       }.getMessage
       assert(errMsg.contains(
-        "TRUNCATE TABLE cannot run for a table which does not support 
partitioning"))
+        "Table non_part_test_catalog.ns.tbl does not support partition 
management"))
     }
   }
 }

