This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2afd1fb  [SPARK-33904][SQL] Recognize `spark_catalog` in `saveAsTable()` and `insertInto()`
2afd1fb is described below

commit 2afd1fb49243e28152b3e581923b49d3aaab0dd7
Author: Max Gekk <max.g...@gmail.com>
AuthorDate: Wed Dec 30 07:56:34 2020 +0000

    [SPARK-33904][SQL] Recognize `spark_catalog` in `saveAsTable()` and `insertInto()`
    
    ### What changes were proposed in this pull request?
    In the `saveAsTable()` and `insertInto()` methods of `DataFrameWriter`, recognize `spark_catalog` as the default session catalog in table names.
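    
    For illustration only, a minimal standalone sketch of the idea (the object and helper names below are made up, not the actual `LookupCatalog` code): a multi-part table name whose first part is `spark_catalog` is stripped of that prefix and then resolved to a v1 table identifier exactly like a one- or two-part name. The real patch additionally checks that both the named catalog and the current catalog are the session catalog.
    ```scala
    // Hypothetical sketch: strip a leading `spark_catalog` part and map the
    // remaining one- or two-part name to a v1-style table identifier.
    object SparkCatalogNameSketch {
      final case class V1TableIdentifier(table: String, database: Option[String] = None)

      def toV1Identifier(parts: Seq[String]): Option[V1TableIdentifier] = {
        // Drop the session catalog prefix if present, then resolve as before.
        val names = parts match {
          case "spark_catalog" +: rest => rest
          case other => other
        }
        names match {
          case Seq(name) => Some(V1TableIdentifier(name))
          case Seq(database, name) => Some(V1TableIdentifier(name, Some(database)))
          case _ => None // three or more remaining parts cannot map to a v1 identifier
        }
      }

      def main(args: Array[String]): Unit = {
        // "spark_catalog.ns.tbl" now resolves the same way "ns.tbl" does.
        println(toV1Identifier(Seq("spark_catalog", "ns", "tbl"))) // Some(V1TableIdentifier(tbl,Some(ns)))
        println(toV1Identifier(Seq("ns", "tbl")))                  // Some(V1TableIdentifier(tbl,Some(ns)))
      }
    }
    ```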
    
    ### Why are the changes needed?
    1. To simplify writing of unified v1 and v2 tests
    2. To improve the Spark SQL user experience. `insertInto()` should have feature parity with the `INSERT INTO` SQL command. Currently, `insertInto()` fails on a table from a namespace in `spark_catalog`:
    ```scala
    scala> sql("CREATE NAMESPACE spark_catalog.ns")
    scala> Seq(0).toDF().write.saveAsTable("spark_catalog.ns.tbl")
    org.apache.spark.sql.AnalysisException: Couldn't find a catalog to handle the identifier spark_catalog.ns.tbl.
      at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:629)
      ... 47 elided
    scala> Seq(0).toDF().write.insertInto("spark_catalog.ns.tbl")
    org.apache.spark.sql.AnalysisException: Couldn't find a catalog to handle the identifier spark_catalog.ns.tbl.
      at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:498)
      ... 47 elided
    ```
    but `INSERT INTO` succeeds:
    ```sql
    spark-sql> create table spark_catalog.ns.tbl (c int);
    spark-sql> insert into spark_catalog.ns.tbl select 0;
    spark-sql> select * from spark_catalog.ns.tbl;
    0
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. After the changes for the example above:
    ```scala
    scala> Seq(0).toDF().write.saveAsTable("spark_catalog.ns.tbl")
    scala> Seq(1).toDF().write.insertInto("spark_catalog.ns.tbl")
    scala> spark.table("spark_catalog.ns.tbl").show(false)
    +-----+
    |value|
    +-----+
    |0    |
    |1    |
    +-----+
    ```
    
    ### How was this patch tested?
    By running the affected test suites:
    ```
    $ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *.ShowPartitionsSuite"
    $ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *.FileFormatWriterSuite"
    ```
    
    Closes #30919 from MaxGekk/insert-into-spark_catalog.
    
    Authored-by: Max Gekk <max.g...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../sql/connector/catalog/LookupCatalog.scala      | 27 ++++++++++++----------
 .../command/ShowPartitionsSuiteBase.scala          | 12 +++++++++-
 .../execution/command/v1/ShowPartitionsSuite.scala | 19 ++++-----------
 .../execution/command/v2/ShowPartitionsSuite.scala | 22 ++++--------------
 .../datasources/FileFormatWriterSuite.scala        | 13 +++++++++++
 .../execution/command/ShowPartitionsSuite.scala    | 19 +++++----------
 6 files changed, 55 insertions(+), 57 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala
index d8cdecc..16416fa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala
@@ -140,19 +140,22 @@ private[sql] trait LookupCatalog extends Logging {
    * For legacy support only. Please use [[CatalogAndIdentifier]] instead on DSv2 code paths.
    */
   object AsTableIdentifier {
-    def unapply(parts: Seq[String]): Option[TableIdentifier] = parts match {
-      case CatalogAndMultipartIdentifier(None, names)
+    def unapply(parts: Seq[String]): Option[TableIdentifier] = {
+      def namesToTableIdentifier(names: Seq[String]): Option[TableIdentifier] = names match {
+        case Seq(name) => Some(TableIdentifier(name))
+        case Seq(database, name) => Some(TableIdentifier(name, Some(database)))
+        case _ => None
+      }
+      parts match {
+        case CatalogAndMultipartIdentifier(None, names)
           if CatalogV2Util.isSessionCatalog(currentCatalog) =>
-        names match {
-          case Seq(name) =>
-            Some(TableIdentifier(name))
-          case Seq(database, name) =>
-            Some(TableIdentifier(name, Some(database)))
-          case _ =>
-            None
-        }
-      case _ =>
-        None
+          namesToTableIdentifier(names)
+        case CatalogAndMultipartIdentifier(Some(catalog), names)
+          if CatalogV2Util.isSessionCatalog(catalog) &&
+             CatalogV2Util.isSessionCatalog(currentCatalog) =>
+          namesToTableIdentifier(names)
+        case _ => None
+      }
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/ShowPartitionsSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/ShowPartitionsSuiteBase.scala
index 9a942d3..29edb8f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/ShowPartitionsSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/ShowPartitionsSuiteBase.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.command
 
-import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{StringType, StructType}
 
@@ -53,6 +53,16 @@ trait ShowPartitionsSuiteBase extends QueryTest with DDLCommandTestUtils {
     sql(s"ALTER TABLE $table ADD PARTITION(year = 2016, month = 3)")
   }
 
+  protected def createNullPartTable(table: String, format: String): Unit = {
+    import testImplicits._
+    val df = Seq((0, ""), (1, null)).toDF("a", "part")
+    df.write
+      .partitionBy("part")
+      .format(format)
+      .mode(SaveMode.Overwrite)
+      .saveAsTable(table)
+  }
+
   test("show partitions of non-partitioned table") {
     withNamespaceAndTable("ns", "not_partitioned_table") { t =>
       sql(s"CREATE TABLE $t (col1 int) $defaultUsing")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowPartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowPartitionsSuite.scala
index 5be5e28..e85d62c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowPartitionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowPartitionsSuite.scala
@@ -105,22 +105,13 @@ class ShowPartitionsSuite extends ShowPartitionsSuiteBase with CommandSuiteBase
     }
   }
 
-  test("null and empty string as partition values") {
-    import testImplicits._
-    withTable("t") {
-      val df = Seq((0, ""), (1, null)).toDF("a", "part")
-      df.write
-        .partitionBy("part")
-        .format("parquet")
-        .mode(SaveMode.Overwrite)
-        .saveAsTable("t")
-
+  test("SPARK-33904: null and empty string as partition values") {
+    withNamespaceAndTable("ns", "tbl") { t =>
+      createNullPartTable(t, "parquet")
       runShowPartitionsSql(
-        "SHOW PARTITIONS t",
+        s"SHOW PARTITIONS $t",
         Row("part=__HIVE_DEFAULT_PARTITION__") :: Nil)
-      checkAnswer(spark.table("t"),
-        Row(0, null) ::
-        Row(1, null) :: Nil)
+      checkAnswer(spark.table(t), Row(0, null) :: Row(1, null) :: Nil)
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowPartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowPartitionsSuite.scala
index 44d8b57..42f05ee 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowPartitionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowPartitionsSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.command.v2
 
-import org.apache.spark.sql.{AnalysisException, Row, SaveMode}
+import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.execution.command
 
 /**
@@ -38,23 +38,11 @@ class ShowPartitionsSuite extends command.ShowPartitionsSuiteBase with CommandSu
     }
   }
 
-  test("SPARK-33889: null and empty string as partition values") {
-    import testImplicits._
+  test("SPARK-33889, SPARK-33904: null and empty string as partition values") {
     withNamespaceAndTable("ns", "tbl") { t =>
-      val df = Seq((0, ""), (1, null)).toDF("a", "part")
-      df.write
-        .partitionBy("part")
-        .format("parquet")
-        .mode(SaveMode.Overwrite)
-        .saveAsTable(t)
-
-      runShowPartitionsSql(
-        s"SHOW PARTITIONS $t",
-        Row("part=") ::
-        Row("part=null") :: Nil)
-      checkAnswer(spark.table(t),
-        Row(0, "") ::
-        Row(1, null) :: Nil)
+      createNullPartTable(t, "parquet")
+      runShowPartitionsSql(s"SHOW PARTITIONS $t", Row("part=") :: Row("part=null") :: Nil)
+      checkAnswer(spark.table(t), Row(0, "") :: Row(1, null) :: Nil)
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
index ce51184..f492fc6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
@@ -61,4 +61,17 @@ class FileFormatWriterSuite
       checkAnswer(spark.table("t2").sort("id"), Seq(Row(0, null), Row(1, null), Row(2, null)))
     }
   }
+
+  test("SPARK-33904: save and insert into a table in a namespace of spark_catalog") {
+    val ns = "spark_catalog.ns"
+    withNamespace(ns) {
+      spark.sql(s"CREATE NAMESPACE $ns")
+      val t = s"$ns.tbl"
+      withTable(t) {
+        spark.range(1).write.saveAsTable(t)
+        Seq(100).toDF().write.insertInto(t)
+        checkAnswer(spark.table(t), Seq(Row(0), Row(100)))
+      }
+    }
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowPartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowPartitionsSuite.scala
index 904c6c4..ded53cc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowPartitionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowPartitionsSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.hive.execution.command
 
-import org.apache.spark.sql.{Row, SaveMode}
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.command.v1
 
 /**
@@ -25,21 +25,14 @@ import org.apache.spark.sql.execution.command.v1
  * V1 Hive external table catalog.
  */
 class ShowPartitionsSuite extends v1.ShowPartitionsSuiteBase with CommandSuiteBase {
-  test("null and empty string as partition values") {
-    import testImplicits._
+  test("SPARK-33904: null and empty string as partition values") {
     withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
-      withTable("t") {
-        val df = Seq((0, ""), (1, null)).toDF("a", "part")
-        df.write
-          .partitionBy("part")
-          .format("hive")
-          .mode(SaveMode.Overwrite)
-          .saveAsTable("t")
-
+      withNamespaceAndTable("ns", "tbl") { t =>
+        createNullPartTable(t, "hive")
         runShowPartitionsSql(
-          "SHOW PARTITIONS t",
+          s"SHOW PARTITIONS $t",
           Row("part=__HIVE_DEFAULT_PARTITION__") :: Nil)
-        checkAnswer(spark.table("t"),
+        checkAnswer(spark.table(t),
           Row(0, "__HIVE_DEFAULT_PARTITION__") ::
           Row(1, "__HIVE_DEFAULT_PARTITION__") :: Nil)
       }

