This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 749b1d3  [SPARK-28178][SQL] DataSourceV2: DataFrameWriter.insertInto
749b1d3 is described below

commit 749b1d3a45584554a2fd8c47e232f5095316e10c
Author:     John Zhuge <jzh...@apache.org>
AuthorDate: Tue Jul 30 17:22:33 2019 +0800

    [SPARK-28178][SQL] DataSourceV2: DataFrameWriter.insertInto

    ## What changes were proposed in this pull request?

    Support multiple catalogs in the following InsertInto use case:

    - DataFrameWriter.insertInto("catalog.db.tbl")

    Support matrix:

    SaveMode  | Partitioned Table | Partition Overwrite Mode | Action
    ----------|-------------------|--------------------------|----------------------------
    Append    | *                 | *                        | AppendData
    Overwrite | no                | *                        | OverwriteByExpression(true)
    Overwrite | yes               | STATIC                   | OverwriteByExpression(true)
    Overwrite | yes               | DYNAMIC                  | OverwritePartitionsDynamic

    ## How was this patch tested?

    New tests, plus all existing catalyst and sql/core tests.

    Closes #24980 from jzhuge/SPARK-28178-pr.

    Authored-by: John Zhuge <jzh...@apache.org>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../org/apache/spark/sql/DataFrameWriter.scala     |  48 ++++++++-
 .../sources/v2/DataSourceV2DataFrameSuite.scala    | 107 +++++++++++++++++++++
 2 files changed, 151 insertions(+), 4 deletions(-)
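For illustration, a minimal sketch of how the support matrix maps to user
code. The catalog name "testcat", the TestInMemoryTableCatalog class, and the
table names are fixtures borrowed from the test suite added below, not
shipped API; the partitionOverwriteMode key is the existing SQLConf setting.

    // Register a v2 catalog under a chosen name, then target it with a
    // multi-part table name.
    spark.conf.set("spark.sql.catalog.testcat", classOf[TestInMemoryTableCatalog].getName)
    sql("CREATE TABLE testcat.db.tbl (id bigint, data string) USING foo PARTITIONED BY (id)")

    // SaveMode.Append plans AppendData, regardless of partitioning.
    df.write.insertInto("testcat.db.tbl")

    // SaveMode.Overwrite on a partitioned table consults
    // spark.sql.sources.partitionOverwriteMode:
    //   STATIC  -> OverwriteByExpression(true)  (replace the whole table)
    //   DYNAMIC -> OverwritePartitionsDynamic   (replace only touched partitions)
    spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
    df.write.mode("overwrite").insertInto("testcat.db.tbl")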
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index b7b1390..549c54f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -22,16 +22,19 @@ import java.util.{Locale, Properties, UUID}
 import scala.collection.JavaConverters._
 
 import org.apache.spark.annotation.Stable
+import org.apache.spark.sql.catalog.v2.{CatalogPlugin, Identifier}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.Literal
-import org.apache.spark.sql.catalyst.plans.logical.{AppendData, InsertIntoTable, LogicalPlan, OverwriteByExpression}
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, InsertIntoTable, LogicalPlan, OverwriteByExpression, OverwritePartitionsDynamic}
 import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, DataSourceUtils, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.v2._
+import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
 import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister}
+import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.TableCapability._
 import org.apache.spark.sql.types.StructType
@@ -356,10 +359,8 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
    * @since 1.4.0
    */
   def insertInto(tableName: String): Unit = {
-    insertInto(df.sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName))
-  }
+    import df.sparkSession.sessionState.analyzer.{AsTableIdentifier, CatalogObjectIdentifier}
 
-  private def insertInto(tableIdent: TableIdentifier): Unit = {
     assertNotBucketed("insertInto")
 
     if (partitioningColumns.isDefined) {
@@ -370,6 +371,45 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       )
     }
 
+    df.sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName) match {
+      case CatalogObjectIdentifier(Some(catalog), ident) =>
+        insertInto(catalog, ident)
+      case AsTableIdentifier(tableIdentifier) =>
+        insertInto(tableIdentifier)
+    }
+  }
+
+  private def insertInto(catalog: CatalogPlugin, ident: Identifier): Unit = {
+    import org.apache.spark.sql.catalog.v2.CatalogV2Implicits._
+
+    val table = DataSourceV2Relation.create(catalog.asTableCatalog.loadTable(ident))
+
+    val command = modeForDSV2 match {
+      case SaveMode.Append =>
+        AppendData.byName(table, df.logicalPlan)
+
+      case SaveMode.Overwrite =>
+        val conf = df.sparkSession.sessionState.conf
+        val dynamicPartitionOverwrite = table.table.partitioning.size > 0 &&
+          conf.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
+
+        if (dynamicPartitionOverwrite) {
+          OverwritePartitionsDynamic.byName(table, df.logicalPlan)
+        } else {
+          OverwriteByExpression.byName(table, df.logicalPlan, Literal(true))
+        }
+
+      case other =>
+        throw new AnalysisException(s"insertInto does not support $other mode, " +
+          s"please use Append or Overwrite mode instead.")
+    }
+
+    runCommand(df.sparkSession, "insertInto") {
+      command
+    }
+  }
+
+  private def insertInto(tableIdent: TableIdentifier): Unit = {
     runCommand(df.sparkSession, "insertInto") {
       InsertIntoTable(
         table = UnresolvedRelation(tableIdent),
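To make the Overwrite branch above concrete, a worked example built from the
fixtures in the test suite that follows (table name, catalog, and rows are
the tests'; this is an illustration, not part of the patch):

    // Partitioned table seeded with one row in partition id=2 and one in id=4.
    sql("CREATE TABLE testcat.ns1.ns2.tbl (id bigint, data string) USING foo PARTITIONED BY (id)")
    Seq((2L, "dummy"), (4L, "keep")).toDF("id", "data").write.insertInto("testcat.ns1.ns2.tbl")

    // "source" holds ids 1, 2, 3. Overwriting with it:
    //   STATIC  mode plans OverwriteByExpression(true): the whole table is
    //           replaced, so the table afterwards equals "source" exactly.
    //   DYNAMIC mode plans OverwritePartitionsDynamic: only partitions present
    //           in "source" (ids 1-3) are rewritten, so (4, "keep") survives.
    spark.table("source").write.mode("overwrite").insertInto("testcat.ns1.ns2.tbl")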
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala
new file mode 100644
index 0000000..86735c6
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2DataFrameSuite.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources.v2
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.internal.SQLConf.{PARTITION_OVERWRITE_MODE, PartitionOverwriteMode}
+import org.apache.spark.sql.test.SharedSQLContext
+
+class DataSourceV2DataFrameSuite extends QueryTest with SharedSQLContext with BeforeAndAfter {
+  import testImplicits._
+
+  before {
+    spark.conf.set("spark.sql.catalog.testcat", classOf[TestInMemoryTableCatalog].getName)
+    spark.conf.set("spark.sql.catalog.testcat2", classOf[TestInMemoryTableCatalog].getName)
+
+    val df = spark.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c"))).toDF("id", "data")
+    df.createOrReplaceTempView("source")
+    val df2 = spark.createDataFrame(Seq((4L, "d"), (5L, "e"), (6L, "f"))).toDF("id", "data")
+    df2.createOrReplaceTempView("source2")
+  }
+
+  after {
+    spark.catalog("testcat").asInstanceOf[TestInMemoryTableCatalog].clearTables()
+    spark.sql("DROP VIEW source")
+    spark.sql("DROP VIEW source2")
+  }
+
+  test("insertInto: append") {
+    val t1 = "testcat.ns1.ns2.tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING foo")
+      spark.table("source").select("id", "data").write.insertInto(t1)
+      checkAnswer(spark.table(t1), spark.table("source"))
+    }
+  }
+
+  test("insertInto: append - across catalog") {
+    val t1 = "testcat.ns1.ns2.tbl"
+    val t2 = "testcat2.db.tbl"
+    withTable(t1, t2) {
+      sql(s"CREATE TABLE $t1 USING foo AS TABLE source")
+      sql(s"CREATE TABLE $t2 (id bigint, data string) USING foo")
+      spark.table(t1).write.insertInto(t2)
+      checkAnswer(spark.table(t2), spark.table("source"))
+    }
+  }
+
+  test("insertInto: append partitioned table") {
+    val t1 = "testcat.ns1.ns2.tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING foo PARTITIONED BY (id)")
+      spark.table("source").write.insertInto(t1)
+      checkAnswer(spark.table(t1), spark.table("source"))
+    }
+  }
+
+  test("insertInto: overwrite non-partitioned table") {
+    val t1 = "testcat.ns1.ns2.tbl"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 USING foo AS TABLE source")
+      spark.table("source2").write.mode("overwrite").insertInto(t1)
+      checkAnswer(spark.table(t1), spark.table("source2"))
+    }
+  }
+
+  test("insertInto: overwrite - static mode") {
+    withSQLConf(PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.STATIC.toString) {
+      val t1 = "testcat.ns1.ns2.tbl"
+      withTable(t1) {
+        sql(s"CREATE TABLE $t1 (id bigint, data string) USING foo PARTITIONED BY (id)")
+        Seq((2L, "dummy"), (4L, "keep")).toDF("id", "data").write.insertInto(t1)
+        spark.table("source").write.mode("overwrite").insertInto(t1)
+        checkAnswer(spark.table(t1), spark.table("source"))
+      }
+    }
+  }
+
+  test("insertInto: overwrite - dynamic mode") {
+    withSQLConf(PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString) {
+      val t1 = "testcat.ns1.ns2.tbl"
+      withTable(t1) {
+        sql(s"CREATE TABLE $t1 (id bigint, data string) USING foo PARTITIONED BY (id)")
+        Seq((2L, "dummy"), (4L, "keep")).toDF("id", "data").write.insertInto(t1)
+        spark.table("source").write.mode("overwrite").insertInto(t1)
+        checkAnswer(spark.table(t1),
+          spark.table("source").union(sql("SELECT 4L, 'keep'")))
+      }
+    }
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org