This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new d50a3ce4a42 [SPARK-43203][SQL][3.4] Move all Drop Table case to DataSource V2
d50a3ce4a42 is described below

commit d50a3ce4a42ce95d3a7eb4c1a4db4a77a070da13
Author: Jia Fan <fanjiaemi...@qq.com>
AuthorDate: Fri Sep 8 08:22:17 2023 -0700

    [SPARK-43203][SQL][3.4] Move all Drop Table case to DataSource V2
    
    ### What changes were proposed in this pull request?
    Cherry-pick #41348 and #42056. This is a bug fix that should be included in 3.4.2.
    
    ### Why are the changes needed?
    Fix DROP table behavior in session catalog
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Tested by:
    - V2 table catalog tests: `org.apache.spark.sql.execution.command.v2.DropTableSuite`
    - V1 table catalog tests: `org.apache.spark.sql.execution.command.v1.DropTableSuiteBase`
    
    Closes #41765 from Hisoka-X/move_drop_table_v2_to_3.4.2.
    
    Lead-authored-by: Jia Fan <fanjiaemi...@qq.com>
    Co-authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../catalyst/analysis/ResolveSessionCatalog.scala  |  2 +-
 .../apache/spark/sql/execution/CacheManager.scala  |  6 ++++
 .../datasources/v2/V2SessionCatalog.scala          | 42 +++++++++++++++++-----
 .../execution/command/PlanResolutionSuite.scala    | 17 ++++-----
 .../hive/execution/command/DropTableSuite.scala    |  2 +-
 5 files changed, 50 insertions(+), 19 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index b2b35b40492..e5a36394a44 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -210,7 +210,7 @@ class ResolveSessionCatalog(val catalogManager: 
CatalogManager)
         c
       }
 
-    case DropTable(ResolvedV1Identifier(ident), ifExists, purge) =>
+    case DropTable(ResolvedV1Identifier(ident), ifExists, purge) if 
conf.useV1Command =>
       DropTableCommand(ident, ifExists, isView = false, purge = purge)
 
     // v1 DROP TABLE supports temp view.
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index d41611439f0..b1153d7a1e8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.ConfigEntry
 import org.apache.spark.sql.{Dataset, SparkSession}
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.expressions.{Attribute, 
SubqueryExpression}
 import org.apache.spark.sql.catalyst.optimizer.EliminateResolvedHint
 import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, 
LogicalPlan, ResolvedHint, SubqueryAlias, View}
@@ -190,6 +191,11 @@ class CacheManager extends Logging with 
AdaptiveSparkPlanHelper {
         isSameName(ident.qualifier :+ ident.name) &&
           isSameName(v1Ident.catalog.toSeq ++ v1Ident.database :+ 
v1Ident.table)
 
+      case SubqueryAlias(ident, HiveTableRelation(catalogTable, _, _, _, _)) =>
+        val v1Ident = catalogTable.identifier
+        isSameName(ident.qualifier :+ ident.name) &&
+          isSameName(v1Ident.catalog.toSeq ++ v1Ident.database :+ 
v1Ident.table)
+
       case _ => false
     }
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index b4789c98df9..d48f771b9db 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -70,7 +70,12 @@ class V2SessionCatalog(catalog: SessionCatalog)
   }
 
   override def loadTable(ident: Identifier): Table = {
-    V1Table(catalog.getTableMetadata(ident.asTableIdentifier))
+    try {
+      V1Table(catalog.getTableMetadata(ident.asTableIdentifier))
+    } catch {
+      case _: NoSuchDatabaseException =>
+        throw QueryCompilationErrors.noSuchTableError(ident)
+    }
   }
 
   override def loadTable(ident: Identifier, timestamp: Long): Table = {
@@ -189,16 +194,35 @@ class V2SessionCatalog(catalog: SessionCatalog)
     loadTable(ident)
   }
 
+  override def purgeTable(ident: Identifier): Boolean = {
+    dropTableInternal(ident, purge = true)
+  }
+
   override def dropTable(ident: Identifier): Boolean = {
+    dropTableInternal(ident)
+  }
+
+  private def dropTableInternal(ident: Identifier, purge: Boolean = false): 
Boolean = {
     try {
-      if (loadTable(ident) != null) {
-        catalog.dropTable(
-          ident.asTableIdentifier,
-          ignoreIfNotExists = true,
-          purge = true /* skip HDFS trash */)
-        true
-      } else {
-        false
+      loadTable(ident) match {
+        case V1Table(v1Table) if v1Table.tableType == CatalogTableType.VIEW =>
+          throw QueryCompilationErrors.wrongCommandForObjectTypeError(
+            operation = "DROP TABLE",
+            requiredType = s"${CatalogTableType.EXTERNAL.name} or" +
+              s" ${CatalogTableType.MANAGED.name}",
+            objectName = v1Table.qualifiedName,
+            foundType = v1Table.tableType.name,
+            alternative = "DROP VIEW"
+          )
+        case null =>
+          false
+        case _ =>
+          catalog.invalidateCachedTable(ident.asTableIdentifier)
+          catalog.dropTable(
+            ident.asTableIdentifier,
+            ignoreIfNotExists = true,
+            purge = purge)
+          true
       }
     } catch {
       case _: NoSuchTableException =>
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index 2cf4792b8c1..dc69acc6f91 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -691,22 +691,23 @@ class PlanResolutionSuite extends AnalysisTest {
 
   test("drop table") {
     val tableName1 = "db.v1Table"
-    val tableIdent1 = TableIdentifier("v1Table", Option("db"), 
Some(SESSION_CATALOG_NAME))
+    val tableIdent1 = ResolvedIdentifier(v2SessionCatalog, 
Identifier.of(Array("db"), "v1Table"))
     val tableName2 = "v1Table"
-    val tableIdent2 = TableIdentifier("v1Table", Some("default"), 
Some(SESSION_CATALOG_NAME))
+    val tableIdent2 = ResolvedIdentifier(v2SessionCatalog, 
Identifier.of(Array("default"),
+      "v1Table"))
 
     parseResolveCompare(s"DROP TABLE $tableName1",
-      DropTableCommand(tableIdent1, ifExists = false, isView = false, purge = 
false))
+      DropTable(tableIdent1, ifExists = false, purge = false))
     parseResolveCompare(s"DROP TABLE IF EXISTS $tableName1",
-      DropTableCommand(tableIdent1, ifExists = true, isView = false, purge = 
false))
+      DropTable(tableIdent1, ifExists = true, purge = false))
     parseResolveCompare(s"DROP TABLE $tableName2",
-      DropTableCommand(tableIdent2, ifExists = false, isView = false, purge = 
false))
+      DropTable(tableIdent2, ifExists = false, purge = false))
     parseResolveCompare(s"DROP TABLE IF EXISTS $tableName2",
-      DropTableCommand(tableIdent2, ifExists = true, isView = false, purge = 
false))
+      DropTable(tableIdent2, ifExists = true, purge = false))
     parseResolveCompare(s"DROP TABLE $tableName2 PURGE",
-      DropTableCommand(tableIdent2, ifExists = false, isView = false, purge = 
true))
+      DropTable(tableIdent2, ifExists = false, purge = true))
     parseResolveCompare(s"DROP TABLE IF EXISTS $tableName2 PURGE",
-      DropTableCommand(tableIdent2, ifExists = true, isView = false, purge = 
true))
+      DropTable(tableIdent2, ifExists = true, purge = true))
   }
 
   test("drop table in v2 catalog") {
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/DropTableSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/DropTableSuite.scala
index 8c6d718f18a..e847e92c4ce 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/DropTableSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/DropTableSuite.scala
@@ -26,7 +26,7 @@ class DropTableSuite extends v1.DropTableSuiteBase with 
CommandSuiteBase {
   test("hive client calls") {
     withNamespaceAndTable("ns", "tbl") { t =>
       sql(s"CREATE TABLE $t (id int) $defaultUsing")
-      checkHiveClientCalls(expected = 11) {
+      checkHiveClientCalls(expected = 10) {
         sql(s"DROP TABLE $t")
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to