This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 37b39b41d07c [SPARK-49246][SQL][FOLLOW-UP] The behavior of SaveAsTable 
should not be changed by falling back to v1 command
37b39b41d07c is described below

commit 37b39b41d07cf8f39dd54cc18342e4d7b8bc71a3
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Mon Sep 9 10:45:14 2024 +0800

    [SPARK-49246][SQL][FOLLOW-UP] The behavior of SaveAsTable should not be 
changed by falling back to v1 command
    
    ### What changes were proposed in this pull request?
    
    This is a follow-up of https://github.com/apache/spark/pull/47772 . The 
behavior of SaveAsTable should not be changed by switching from the v1 to the 
v2 command. This is similar to https://github.com/apache/spark/pull/47995. 
When the session catalog is a `DelegatingCatalogExtension`, we need it to go 
to V1 commands to be consistent with the previous behavior.
    
    ### Why are the changes needed?
    
    Behavior regression.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Unit tests (UT).
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #48019 from amaliujia/regress_v2.
    
    Lead-authored-by: Wenchen Fan <wenc...@databricks.com>
    Co-authored-by: Rui Wang <rui.w...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../apache/spark/sql/internal/DataFrameWriterImpl.scala   |  6 ++++--
 .../test/scala/org/apache/spark/sql/CollationSuite.scala  |  3 ---
 .../DataSourceV2DataFrameSessionCatalogSuite.scala        | 12 ++++++++++--
 .../spark/sql/connector/TestV2SessionCatalogBase.scala    | 15 +++++----------
 4 files changed, 19 insertions(+), 17 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala
index 7248a2d3f056..f0eef9ae1cbb 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/DataFrameWriterImpl.scala
@@ -426,8 +426,10 @@ final class DataFrameWriterImpl[T] private[sql](ds: 
Dataset[T]) extends DataFram
     import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
 
     val session = df.sparkSession
-    val canUseV2 = lookupV2Provider().isDefined ||
-      
df.sparkSession.sessionState.conf.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isDefined
+    val canUseV2 = lookupV2Provider().isDefined || 
(df.sparkSession.sessionState.conf.getConf(
+        SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).isDefined &&
+        
!df.sparkSession.sessionState.catalogManager.catalog(CatalogManager.SESSION_CATALOG_NAME)
+          .isInstanceOf[DelegatingCatalogExtension])
 
     session.sessionState.sqlParser.parseMultipartIdentifier(tableName) match {
       case nameParts @ NonSessionCatalogAndIdentifier(catalog, ident) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
index 5e7feec149c9..da8aad16f55d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CollationSuite.scala
@@ -34,7 +34,6 @@ import 
org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAg
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.internal.{SqlApiConf, SQLConf}
-import org.apache.spark.sql.internal.SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION
 import org.apache.spark.sql.types.{ArrayType, MapType, StringType, 
StructField, StructType}
 
 class CollationSuite extends DatasourceV2SQLBase with AdaptiveSparkPlanHelper {
@@ -158,7 +157,6 @@ class CollationSuite extends DatasourceV2SQLBase with 
AdaptiveSparkPlanHelper {
   }
 
   test("disable bucketing on collated string column") {
-    spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
     def createTable(bucketColumns: String*): Unit = {
       val tableName = "test_partition_tbl"
       withTable(tableName) {
@@ -760,7 +758,6 @@ class CollationSuite extends DatasourceV2SQLBase with 
AdaptiveSparkPlanHelper {
   }
 
   test("disable partition on collated string column") {
-    spark.conf.unset(V2_SESSION_CATALOG_IMPLEMENTATION.key)
     def createTable(partitionColumns: String*): Unit = {
       val tableName = "test_partition_tbl"
       withTable(tableName) {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
index 7bbb6485c273..fe078c5ae441 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
@@ -55,8 +55,7 @@ class DataSourceV2DataFrameSessionCatalogSuite
     "and a same-name temp view exist") {
     withTable("same_name") {
       withTempView("same_name") {
-        val format = spark.sessionState.conf.defaultDataSourceName
-        sql(s"CREATE TABLE same_name(id LONG) USING $format")
+        sql(s"CREATE TABLE same_name(id LONG) USING $v2Format")
         spark.range(10).createTempView("same_name")
         
spark.range(20).write.format(v2Format).mode(SaveMode.Append).saveAsTable("same_name")
         checkAnswer(spark.table("same_name"), spark.range(10).toDF())
@@ -88,6 +87,15 @@ class DataSourceV2DataFrameSessionCatalogSuite
       assert(tableInfo.properties().get("provider") === v2Format)
     }
   }
+
+  test("SPARK-49246: saveAsTable with v1 format") {
+    withTable("t") {
+      sql("CREATE TABLE t(c INT) USING csv")
+      val df = spark.range(10).toDF()
+      df.write.mode(SaveMode.Overwrite).format("csv").saveAsTable("t")
+      verifyTable("t", df)
+    }
+  }
 }
 
 class InMemoryTableSessionCatalog extends 
TestV2SessionCatalogBase[InMemoryTable] {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala
index ff944dbb805c..2254abef3fcb 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/TestV2SessionCatalogBase.scala
@@ -22,8 +22,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 
 import scala.jdk.CollectionConverters._
 
-import org.apache.spark.sql.catalyst.catalog.CatalogTableType
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, 
DelegatingCatalogExtension, Identifier, Table, TableCatalog, V1Table}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Column, 
DelegatingCatalogExtension, Identifier, Table, TableCatalog}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.types.StructType
 
@@ -53,14 +52,10 @@ private[connector] trait TestV2SessionCatalogBase[T <: 
Table] extends Delegating
     if (tables.containsKey(ident)) {
       tables.get(ident)
     } else {
-      // Table was created through the built-in catalog
-      super.loadTable(ident) match {
-        case v1Table: V1Table if v1Table.v1Table.tableType == 
CatalogTableType.VIEW => v1Table
-        case t =>
-          val table = newTable(t.name(), t.schema(), t.partitioning(), 
t.properties())
-          addTable(ident, table)
-          table
-      }
+      // Table was created through the built-in catalog via v1 command, this 
is OK as the
+      // `loadTable` should always be invoked, and we set the `tableCreated` 
to pass validation.
+      tableCreated.set(true)
+      super.loadTable(ident)
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to