This is an automated email from the ASF dual-hosted git repository.

huaxingao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new feba5ac  [SPARK-37545][SQL] V2 CreateTableAsSelect command should qualify location
feba5ac is described below

commit feba5ac32f2598f6ca8a274850934106be0db64d
Author: Terry Kim <yumin...@gmail.com>
AuthorDate: Sat Dec 4 20:47:45 2021 -0800

    [SPARK-37545][SQL] V2 CreateTableAsSelect command should qualify location
    
    ### What changes were proposed in this pull request?
    
    Currently, v2 CTAS command doesn't qualify the location:
    ```
    spark.sql("CREATE TABLE testcat.t USING foo LOCATION '/tmp/foo' AS SELECT id FROM source")
    spark.sql("DESCRIBE EXTENDED testcat.t").filter("col_name = 'Location'").show
    
    +--------+-------------+-------+
    |col_name|    data_type|comment|
    +--------+-------------+-------+
    |Location|/tmp/foo     |       |
    +--------+-------------+-------+
    ```
    , whereas the v1 command qualifies the location as `file:/tmp/foo`, which is the correct behavior since the default filesystem can change for different sessions.
    
    ### Why are the changes needed?
    
    This PR proposes to store the qualified location in order to prevent the issue where the default filesystem changes for different sessions.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, now, v2 CTAS command will store qualified location:
    ```
    +--------+-------------+-------+
    |col_name|    data_type|comment|
    +--------+-------------+-------+
    |Location|file:/tmp/foo|       |
    +--------+-------------+-------+
    ```
    
    ### How was this patch tested?
    
    Added new test
    
    Closes #34806 from imback82/v2_ctas_qualified_loc.
    
    Authored-by: Terry Kim <yumin...@gmail.com>
    Signed-off-by: Huaxin Gao <huaxin_...@apple.com>
---
 .../execution/datasources/v2/DataSourceV2Strategy.scala   |  6 ++++--
 .../DataSourceV2DataFrameSessionCatalogSuite.scala        |  4 ++--
 .../apache/spark/sql/connector/DataSourceV2SQLSuite.scala | 15 +++++++++++++++
 3 files changed, 21 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index f73b1a6..dbe4168 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -172,13 +172,15 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
     case CreateTableAsSelect(ResolvedDBObjectName(catalog, ident), parts, query, tableSpec,
         options, ifNotExists) =>
       val writeOptions = new CaseInsensitiveStringMap(options.asJava)
+      val tableSpecWithQualifiedLocation = tableSpec.copy(
+        location = tableSpec.location.map(makeQualifiedDBObjectPath(_)))
       catalog match {
         case staging: StagingTableCatalog =>
          AtomicCreateTableAsSelectExec(staging, ident.asIdentifier, parts, query, planLater(query),
-            tableSpec, writeOptions, ifNotExists) :: Nil
+            tableSpecWithQualifiedLocation, writeOptions, ifNotExists) :: Nil
         case _ =>
          CreateTableAsSelectExec(catalog.asTableCatalog, ident.asIdentifier, parts, query,
-            planLater(query), tableSpec, writeOptions, ifNotExists) :: Nil
+            planLater(query), tableSpecWithQualifiedLocation, writeOptions, ifNotExists) :: Nil
       }
 
     case RefreshTable(r: ResolvedTable) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
index 91ac7db..3edc4b9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2DataFrameSessionCatalogSuite.scala
@@ -83,10 +83,10 @@ class DataSourceV2DataFrameSessionCatalogSuite
   test("saveAsTable passes path and provider information properly") {
     val t1 = "prop_table"
     withTable(t1) {
-      spark.range(20).write.format(v2Format).option("path", "abc").saveAsTable(t1)
+      spark.range(20).write.format(v2Format).option("path", "/abc").saveAsTable(t1)
      val cat = spark.sessionState.catalogManager.currentCatalog.asInstanceOf[TableCatalog]
       val tableInfo = cat.loadTable(Identifier.of(Array("default"), t1))
-      assert(tableInfo.properties().get("location") === "abc")
+      assert(tableInfo.properties().get("location") === "file:/abc")
       assert(tableInfo.properties().get("provider") === v2Format)
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 949abfe..6fe8e10 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -408,6 +408,21 @@ class DataSourceV2SQLSuite
     }
   }
 
+  test("SPARK-37545: CreateTableAsSelect should store location as qualified") {
+    val basicIdentifier = "testcat.table_name"
+    val atomicIdentifier = "testcat_atomic.table_name"
+    Seq(basicIdentifier, atomicIdentifier).foreach { identifier =>
+      withTable(identifier) {
+        spark.sql(s"CREATE TABLE $identifier USING foo LOCATION '/tmp/foo' " +
+          "AS SELECT id FROM source")
+        val location = spark.sql(s"DESCRIBE EXTENDED $identifier")
+          .filter("col_name = 'Location'")
+          .select("data_type").head.getString(0)
+        assert(location === "file:/tmp/foo")
+      }
+    }
+  }
+
   test("ReplaceTableAsSelect: basic v2 implementation.") {
     val basicCatalog = catalog("testcat").asTableCatalog
     val atomicCatalog = catalog("testcat_atomic").asTableCatalog

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to