This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 749b1d3  [SPARK-28178][SQL] DataSourceV2: DataFrameWriter.insertInto
749b1d3 is described below

commit 749b1d3a45584554a2fd8c47e232f5095316e10c
Author: John Zhuge <jzh...@apache.org>
AuthorDate: Tue Jul 30 17:22:33 2019 +0800

    [SPARK-28178][SQL] DataSourceV2: DataFrameWriter.insertInto
    
    ## What changes were proposed in this pull request?
    
    Support multiple catalogs in the following InsertInto use cases:
    
    - DataFrameWriter.insertInto("catalog.db.tbl")
    
    Support matrix:
    
    SaveMode|Partitioned Table|Partition Overwrite Mode|Action
    --------|-----------------|------------------------|------
    Append|*|*|AppendData
    Overwrite|no|*|OverwriteByExpression(true)
    Overwrite|yes|STATIC|OverwriteByExpression(true)
    Overwrite|yes|DYNAMIC|OverwritePartitionsDynamic
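
    A minimal usage sketch (illustration only; `df` is any DataFrame and `testcat` is an example catalog registered via the `spark.sql.catalog.testcat` conf, mirroring the new test suite):

    ```scala
    // Append into a table resolved through a v2 catalog by a multi-part name.
    df.write.insertInto("testcat.db.tbl")

    // Overwrite. For a partitioned table the plan follows the matrix above:
    //   STATIC  partitionOverwriteMode -> OverwriteByExpression(true)
    //   DYNAMIC partitionOverwriteMode -> OverwritePartitionsDynamic
    df.write.mode("overwrite").insertInto("testcat.db.tbl")
    ```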
    
    ## How was this patch tested?
    
    New tests.
    All existing catalyst and sql/core tests.
    
    Closes #24980 from jzhuge/SPARK-28178-pr.
    
    Authored-by: John Zhuge <jzh...@apache.org>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../org/apache/spark/sql/DataFrameWriter.scala     |  48 ++++++++-
 .../sources/v2/DataSourceV2DataFrameSuite.scala    | 107 +++++++++++++++++++++
 2 files changed, 151 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index b7b1390..549c54f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -22,16 +22,19 @@ import java.util.{Locale, Properties, UUID}
 import scala.collection.JavaConverters._
 
 import org.apache.spark.annotation.Stable
+import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, InsertIntoTable, LogicalPlan, OverwriteByExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, InsertIntoTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic}
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, DataSourceUtils, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.v2._
+import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
 import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister}
+import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.TableCapability._
 import org.apache.spark.sql.types.StructType
@@ -356,10 +359,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * @since 1.4.0
    */
   def insertInto(tableName: String): Unit = {
-    insertInto(df.sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName))
-  }
+    import df.sparkSession.sessionState.analyzer.{AsTableIdentifier, CatalogObjectIdentifier}
 
-  private def insertInto(tableIdent: TableIdentifier): Unit = {
     assertNotBucketed("insertInto")
 
     if (partitioningColumns.isDefined) {
@@ -370,6 +371,45 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       )
     }
 
+    df.sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName) match {
+      case CatalogObjectIdentifier(Some(catalog), ident) =>
+        insertInto(catalog, ident)
+      case AsTableIdentifier(tableIdentifier) =>
+        insertInto(tableIdentifier)
+    }
+  }
+
+  private def insertInto(catalog: CatalogPlugin, ident: Identifier): Unit = {
+    import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
+
+    val table = DataSourceV2Relation.create(catalog.asTableCatalog.loadTable(ident))
+
+    val command = modeForDSV2 match {
+      case SaveMode.Append =>
+        AppendData.byName(table, df.logicalPlan)
+
+      case SaveMode.Overwrite =>
+        val conf = df.sparkSession.sessionState.conf
+        val dynamicPartitionOverwrite = table.table.partitioning.size > 0 &&
+          conf.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
+
+        if (dynamicPartitionOverwrite) {
+          OverwritePartitionsDynamic.byName(table, df.logicalPlan)
+        } else {
+          OverwriteByExpression.byName(table, df.logicalPlan, Literal(true))
+        }
+
+      case other =>
+        throw new AnalysisException(s"insertInto does not support $other mode, " +
+          s"please use Append or Overwrite mode instead.")
+    }
+
+    runCommand(df.sparkSession, "insertInto") {
+      command
+    }
+  }
+
+  private def insertInto(tableIdent: TableIdentifier): Unit = {
     runCommand(df.sparkSession, "insertInto") {
       InsertIntoTable(
         table = UnresolvedRelation(tableIdent),
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala
new file mode 100644
index 0000000..86735c6
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode}
+import org.apache.spark.sql.test.SharedSQLContext
+
+class DataSourceV2DataFrameSuite extends QueryTest with SharedSQLContext with BeforeAndAfter {
+  import testImplicits._
+
+  before {
+    spark.conf.set("spark.sql.catalog.testcat", 
classOf[TestInMemoryTableCatalog].getName)
+    spark.conf.set("spark.sql.catalog.testcat2", 
classOf[TestInMemoryTableCatalog].getName)
+
+    val df = spark.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"))).toDF("id", "data")
+    df.createOrReplaceTempView("source")
+    val df2 = spark.createDataFrame(Seq((4L, "d"), (5L, "e"), (6L, "f"))).toDF("id", "data")
+    df2.createOrReplaceTempView("source2")
+  }
+
+  after {
+    spark.catalog("testcat").asInstanceOf[TestInMemoryTableCatalog].clearTables()
+    spark.sql("DROP VIEW source")
+    spark.sql("DROP VIEW source2")
+  }
+
+  test("insertInto: append") {
+    val t1 = "testcat.ns1.ns2.tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING foo")
+      spark.table("source").select("id", "data").write.insertInto(t1)
+      checkAnswer(spark.table(t1), spark.table("source"))
+    }
+  }
+
+  test("insertInto: append - across catalog") {
+    val t1 = "testcat.ns1.ns2.tbl"
+    val t2 = "testcat2.db.tbl"
+    withTable(t1, t2) {
+      sql(s"CREATE TABLE $t1 USING foo AS TABLE source")
+      sql(s"CREATE TABLE $t2 (id bigint, data string) USING foo")
+      spark.table(t1).write.insertInto(t2)
+      checkAnswer(spark.table(t2), spark.table("source"))
+    }
+  }
+
+  test("insertInto: append partitioned table") {
+    val t1 = "testcat.ns1.ns2.tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING foo PARTITIONED BY 
(id)")
+      spark.table("source").write.insertInto(t1)
+      checkAnswer(spark.table(t1), spark.table("source"))
+    }
+  }
+
+  test("insertInto: overwrite non-partitioned table") {
+    val t1 = "testcat.ns1.ns2.tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 USING foo AS TABLE source")
+      spark.table("source2").write.mode("overwrite").insertInto(t1)
+      checkAnswer(spark.table(t1), spark.table("source2"))
+    }
+  }
+
+  test("insertInto: overwrite - static mode") {
+    withSQLConf(PARTITION_OVERWRITE_MODE.key -> 
PartitionOverwriteMode.STATIC.toString) {
+      val t1 = "testcat.ns1.ns2.tbl"
+      withTable(t1) {
+        sql(s"CREATE TABLE $t1 (id bigint, data string) USING foo PARTITIONED 
BY (id)")
+        Seq((2L, "dummy"), (4L, "keep")).toDF("id", 
"data").write.insertInto(t1)
+        spark.table("source").write.mode("overwrite").insertInto(t1)
+        checkAnswer(spark.table(t1), spark.table("source"))
+      }
+    }
+  }
+
+  test("insertInto: overwrite - dynamic mode") {
+    withSQLConf(PARTITION_OVERWRITE_MODE.key -> 
PartitionOverwriteMode.DYNAMIC.toString) {
+      val t1 = "testcat.ns1.ns2.tbl"
+      withTable(t1) {
+        sql(s"CREATE TABLE $t1 (id bigint, data string) USING foo PARTITIONED 
BY (id)")
+        Seq((2L, "dummy"), (4L, "keep")).toDF("id", 
"data").write.insertInto(t1)
+        spark.table("source").write.mode("overwrite").insertInto(t1)
+        checkAnswer(spark.table(t1),
+          spark.table("source").union(sql("SELECT 4L, 'keep'")))
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
