This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 2afd1fb  [SPARK-33904][SQL] Recognize `spark_catalog` in `saveAsTable()` and `insertInto()`

2afd1fb is described below

commit 2afd1fb49243e28152b3e581923b49d3aaab0dd7
Author: Max Gekk <max.g...@gmail.com>
AuthorDate: Wed Dec 30 07:56:34 2020 +0000

[SPARK-33904][SQL] Recognize `spark_catalog` in `saveAsTable()` and `insertInto()`

### What changes were proposed in this pull request?

In the `saveAsTable()` and `insertInto()` methods of `DataFrameWriter`, recognize `spark_catalog` as the default session catalog in table names.

### Why are the changes needed?

1. To simplify the writing of unified v1 and v2 tests.
2. To improve the Spark SQL user experience: `insertInto()` should have feature parity with the `INSERT INTO` SQL command. Currently, both `saveAsTable()` and `insertInto()` fail on a table from a namespace in `spark_catalog`:

```scala
scala> sql("CREATE NAMESPACE spark_catalog.ns")

scala> Seq(0).toDF().write.saveAsTable("spark_catalog.ns.tbl")
org.apache.spark.sql.AnalysisException: Couldn't find a catalog to handle the identifier spark_catalog.ns.tbl.
  at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:629)
  ... 47 elided

scala> Seq(0).toDF().write.insertInto("spark_catalog.ns.tbl")
org.apache.spark.sql.AnalysisException: Couldn't find a catalog to handle the identifier spark_catalog.ns.tbl.
  at org.apache.spark.sql.DataFrameWriter.insertInto(DataFrameWriter.scala:498)
  ... 47 elided
```

but the `INSERT INTO` command succeeds:

```sql
spark-sql> create table spark_catalog.ns.tbl (c int);
spark-sql> insert into spark_catalog.ns.tbl select 0;
spark-sql> select * from spark_catalog.ns.tbl;
0
```

### Does this PR introduce _any_ user-facing change?

Yes. After the changes, the example above succeeds:

```scala
scala> Seq(0).toDF().write.saveAsTable("spark_catalog.ns.tbl")

scala> Seq(1).toDF().write.insertInto("spark_catalog.ns.tbl")

scala> spark.table("spark_catalog.ns.tbl").show(false)
+-----+
|value|
+-----+
|0    |
|1    |
+-----+
```

### How was this patch tested?

By running the affected test suites:

```
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *.ShowPartitionsSuite"
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *.FileFormatWriterSuite"
```

Closes #30919 from MaxGekk/insert-into-spark_catalog.

Authored-by: Max Gekk <max.g...@gmail.com>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>
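The heart of the change is the `AsTableIdentifier` extractor in `LookupCatalog`, which now also turns multi-part names whose leading part names the session catalog into v1 `TableIdentifier`s. For readers who want the idea without parsing the diff below, here is a minimal standalone sketch of that matching logic. `TableIdentifier` here and the plain string comparison are simplified stand-ins for Spark's internals (the real extractor also checks the current catalog via `CatalogV2Util.isSessionCatalog`), not the actual APIs:

```scala
// Simplified stand-in for Spark's TableIdentifier (NOT the real class).
case class TableIdentifier(table: String, database: Option[String] = None)

object AsTableIdentifierSketch {
  // Name of Spark's default session catalog.
  val SessionCatalogName = "spark_catalog"

  def asTableIdentifier(parts: Seq[String]): Option[TableIdentifier] = {
    // Shared conversion, mirroring namesToTableIdentifier in the patch:
    // one part -> table, two parts -> database.table, more -> no v1 identifier.
    def namesToTableIdentifier(names: Seq[String]): Option[TableIdentifier] = names match {
      case Seq(name) => Some(TableIdentifier(name))
      case Seq(database, name) => Some(TableIdentifier(name, Some(database)))
      case _ => None
    }
    parts match {
      // New in SPARK-33904: a leading `spark_catalog` part is recognized and stripped.
      case SessionCatalogName +: rest => namesToTableIdentifier(rest)
      // Pre-existing behavior: names with no catalog part.
      case names => namesToTableIdentifier(names)
    }
  }

  def main(args: Array[String]): Unit = {
    // "spark_catalog.ns.tbl" now maps to the v1 identifier ns.tbl ...
    assert(asTableIdentifier(Seq("spark_catalog", "ns", "tbl"))
      .contains(TableIdentifier("tbl", Some("ns"))))
    // ... unqualified names behave as before ...
    assert(asTableIdentifier(Seq("ns", "tbl")).contains(TableIdentifier("tbl", Some("ns"))))
    // ... and too many name parts still cannot become a v1 identifier.
    assert(asTableIdentifier(Seq("spark_catalog", "a", "b", "c")).isEmpty)
  }
}
```

The `main` method exercises the three interesting shapes: a `spark_catalog`-qualified name, an unqualified name, and a name with too many parts.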
---
 .../sql/connector/catalog/LookupCatalog.scala      | 27 ++++++++++++----------
 .../command/ShowPartitionsSuiteBase.scala          | 12 +++++++++-
 .../execution/command/v1/ShowPartitionsSuite.scala | 19 ++++-----------
 .../execution/command/v2/ShowPartitionsSuite.scala | 22 ++++--------------
 .../datasources/FileFormatWriterSuite.scala        | 13 +++++++++++
 .../execution/command/ShowPartitionsSuite.scala    | 19 +++++----------
 6 files changed, 55 insertions(+), 57 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala
index d8cdecc..16416fa 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/LookupCatalog.scala
@@ -140,19 +140,22 @@ private[sql] trait LookupCatalog extends Logging {
    * For legacy support only. Please use [[CatalogAndIdentifier]] instead on DSv2 code paths.
    */
   object AsTableIdentifier {
-    def unapply(parts: Seq[String]): Option[TableIdentifier] = parts match {
-      case CatalogAndMultipartIdentifier(None, names)
+    def unapply(parts: Seq[String]): Option[TableIdentifier] = {
+      def namesToTableIdentifier(names: Seq[String]): Option[TableIdentifier] = names match {
+        case Seq(name) => Some(TableIdentifier(name))
+        case Seq(database, name) => Some(TableIdentifier(name, Some(database)))
+        case _ => None
+      }
+      parts match {
+        case CatalogAndMultipartIdentifier(None, names)
           if CatalogV2Util.isSessionCatalog(currentCatalog) =>
-        names match {
-          case Seq(name) =>
-            Some(TableIdentifier(name))
-          case Seq(database, name) =>
-            Some(TableIdentifier(name, Some(database)))
-          case _ =>
-            None
-        }
-      case _ =>
-        None
+          namesToTableIdentifier(names)
+        case CatalogAndMultipartIdentifier(Some(catalog), names)
+          if CatalogV2Util.isSessionCatalog(catalog) &&
+            CatalogV2Util.isSessionCatalog(currentCatalog) =>
+          namesToTableIdentifier(names)
+        case _ => None
+      }
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/ShowPartitionsSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/ShowPartitionsSuiteBase.scala
index 9a942d3..29edb8f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/ShowPartitionsSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/ShowPartitionsSuiteBase.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.command
 
-import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{StringType, StructType}
 
@@ -53,6 +53,16 @@ trait ShowPartitionsSuiteBase extends QueryTest with DDLCommandTestUtils {
     sql(s"ALTER TABLE $table ADD PARTITION(year = 2016, month = 3)")
   }
 
+  protected def createNullPartTable(table: String, format: String): Unit = {
+    import testImplicits._
+    val df = Seq((0, ""), (1, null)).toDF("a", "part")
+    df.write
+      .partitionBy("part")
+      .format(format)
+      .mode(SaveMode.Overwrite)
+      .saveAsTable(table)
+  }
+
   test("show partitions of non-partitioned table") {
     withNamespaceAndTable("ns", "not_partitioned_table") { t =>
       sql(s"CREATE TABLE $t (col1 int) $defaultUsing")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowPartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowPartitionsSuite.scala
index 5be5e28..e85d62c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowPartitionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v1/ShowPartitionsSuite.scala
@@ -105,22 +105,13 @@ class ShowPartitionsSuite extends ShowPartitionsSuiteBase with CommandSuiteBase
     }
   }
 
-  test("null and empty string as partition values") {
-    import testImplicits._
-    withTable("t") {
-      val df = Seq((0, ""), (1, null)).toDF("a", "part")
-      df.write
-        .partitionBy("part")
-        .format("parquet")
-        .mode(SaveMode.Overwrite)
-        .saveAsTable("t")
-
+  test("SPARK-33904: null and empty string as partition values") {
+    withNamespaceAndTable("ns", "tbl") { t =>
+      createNullPartTable(t, "parquet")
       runShowPartitionsSql(
-        "SHOW PARTITIONS t",
+        s"SHOW PARTITIONS $t",
         Row("part=__HIVE_DEFAULT_PARTITION__") :: Nil)
-      checkAnswer(spark.table("t"),
-        Row(0, null) ::
-        Row(1, null) :: Nil)
+      checkAnswer(spark.table(t), Row(0, null) :: Row(1, null) :: Nil)
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowPartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowPartitionsSuite.scala
index 44d8b57..42f05ee 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowPartitionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowPartitionsSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.command.v2
 
-import org.apache.spark.sql.{AnalysisException, Row, SaveMode}
+import org.apache.spark.sql.{AnalysisException, Row}
 import org.apache.spark.sql.execution.command
 
 /**
@@ -38,23 +38,11 @@ class ShowPartitionsSuite extends command.ShowPartitionsSuiteBase with CommandSu
     }
   }
 
-  test("SPARK-33889: null and empty string as partition values") {
-    import testImplicits._
+  test("SPARK-33889, SPARK-33904: null and empty string as partition values") {
     withNamespaceAndTable("ns", "tbl") { t =>
-      val df = Seq((0, ""), (1, null)).toDF("a", "part")
-      df.write
-        .partitionBy("part")
-        .format("parquet")
-        .mode(SaveMode.Overwrite)
-        .saveAsTable(t)
-
-      runShowPartitionsSql(
-        s"SHOW PARTITIONS $t",
-        Row("part=") ::
-        Row("part=null") :: Nil)
-      checkAnswer(spark.table(t),
-        Row(0, "") ::
-        Row(1, null) :: Nil)
+      createNullPartTable(t, "parquet")
+      runShowPartitionsSql(s"SHOW PARTITIONS $t", Row("part=") :: Row("part=null") :: Nil)
+      checkAnswer(spark.table(t), Row(0, "") :: Row(1, null) :: Nil)
     }
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
index ce51184..f492fc6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
@@ -61,4 +61,17 @@ class FileFormatWriterSuite
       checkAnswer(spark.table("t2").sort("id"), Seq(Row(0, null), Row(1, null), Row(2, null)))
     }
   }
+
+  test("SPARK-33904: save and insert into a table in a namespace of spark_catalog") {
+    val ns = "spark_catalog.ns"
+    withNamespace(ns) {
+      spark.sql(s"CREATE NAMESPACE $ns")
+      val t = s"$ns.tbl"
+      withTable(t) {
+        spark.range(1).write.saveAsTable(t)
+        Seq(100).toDF().write.insertInto(t)
+        checkAnswer(spark.table(t), Seq(Row(0), Row(100)))
+      }
+    }
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowPartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowPartitionsSuite.scala
index 904c6c4..ded53cc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowPartitionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowPartitionsSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.hive.execution.command
 
-import org.apache.spark.sql.{Row, SaveMode}
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.execution.command.v1
 
 /**
@@ -25,21 +25,14 @@ import org.apache.spark.sql.execution.command.v1
  * V1 Hive external table catalog.
  */
 class ShowPartitionsSuite extends v1.ShowPartitionsSuiteBase with CommandSuiteBase {
-  test("null and empty string as partition values") {
-    import testImplicits._
+  test("SPARK-33904: null and empty string as partition values") {
     withSQLConf("hive.exec.dynamic.partition.mode" -> "nonstrict") {
-      withTable("t") {
-        val df = Seq((0, ""), (1, null)).toDF("a", "part")
-        df.write
-          .partitionBy("part")
-          .format("hive")
-          .mode(SaveMode.Overwrite)
-          .saveAsTable("t")
-
+      withNamespaceAndTable("ns", "tbl") { t =>
+        createNullPartTable(t, "hive")
         runShowPartitionsSql(
-          "SHOW PARTITIONS t",
+          s"SHOW PARTITIONS $t",
           Row("part=__HIVE_DEFAULT_PARTITION__") :: Nil)
-        checkAnswer(spark.table("t"),
+        checkAnswer(spark.table(t),
           Row(0, "__HIVE_DEFAULT_PARTITION__") ::
           Row(1, "__HIVE_DEFAULT_PARTITION__") :: Nil)
       }