This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2db05db9e9d1 [SPARK-49098][SQL] Add write options for INSERT
2db05db9e9d1 is described below

commit 2db05db9e9d11d6463afb249debe8d1aaf5424bc
Author: Szehon Ho <szehon.apa...@gmail.com>
AuthorDate: Tue Aug 6 07:59:20 2024 -0700

    [SPARK-49098][SQL] Add write options for INSERT
    
    ### What changes were proposed in this pull request?
    
    Add an `INSERT INTO tbl WITH (k1 = v1, k2 = v2)` options clause for INSERT, INSERT OVERWRITE, and INSERT ... REPLACE WHERE.
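
    For example, a sketch of the new syntax (the `write.split-size` option key mirrors the one used in the new tests below; supported keys are connector-specific):

        sql("INSERT INTO tbl WITH (`write.split-size` = 10) VALUES (1, 'a')")
        sql("INSERT OVERWRITE tbl WITH (`write.split-size` = 10) VALUES (2, 'b')")
        sql("INSERT INTO tbl WITH (`write.split-size` = 10) REPLACE WHERE id = 1 VALUES (3, 'c')")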
    
    ### Why are the changes needed?
    
    Follow-up to SPARK-36680, which added the WITH options clause to the SELECT statement.
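
    For comparison, the read-side counterpart added by SPARK-36680 looks roughly like this (sketch; the option key is again illustrative):

        sql("SELECT * FROM tbl WITH (`split-size` = 5)")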
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, this adds the new SQL syntax shown above.

    ### How was this patch tested?
    
    New tests in DataSourceV2SQLSuite.

    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #47591 from szehon-ho/insert_option.
    
    Authored-by: Szehon Ho <szehon.apa...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../spark/sql/catalyst/parser/SqlBaseParser.g4     |  6 +--
 .../spark/sql/catalyst/parser/AstBuilder.scala     | 42 +++++++++------
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 63 +++++++++++++++++++++-
 3 files changed, 92 insertions(+), 19 deletions(-)

diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index 841c94d7868f..d2a7adbb62d5 100644
--- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -413,9 +413,9 @@ query
     ;
 
 insertInto
-    : INSERT OVERWRITE TABLE? identifierReference (partitionSpec (IF errorCapturingNot EXISTS)?)?  ((BY NAME) | identifierList)? #insertOverwriteTable
-    | INSERT INTO TABLE? identifierReference partitionSpec? (IF errorCapturingNot EXISTS)? ((BY NAME) | identifierList)?   #insertIntoTable
-    | INSERT INTO TABLE? identifierReference REPLACE whereClause                                             #insertIntoReplaceWhere
+    : INSERT OVERWRITE TABLE? identifierReference optionsClause? (partitionSpec (IF errorCapturingNot EXISTS)?)?  ((BY NAME) | identifierList)? #insertOverwriteTable
+    | INSERT INTO TABLE? identifierReference optionsClause? partitionSpec? (IF errorCapturingNot EXISTS)? ((BY NAME) | identifierList)?   #insertIntoTable
+    | INSERT INTO TABLE? identifierReference optionsClause? REPLACE whereClause                                             #insertIntoReplaceWhere
     | INSERT OVERWRITE LOCAL? DIRECTORY path=stringLit rowFormat? createFileFormat?                     #insertOverwriteHiveDir
     | INSERT OVERWRITE LOCAL? DIRECTORY (path=stringLit)? tableProvider (OPTIONS options=propertyList)? #insertOverwriteDir
     ;
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 21c2179661f7..a505a530b959 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -402,10 +402,11 @@ class AstBuilder extends DataTypeAstBuilder
 
   /**
    * Parameters used for writing query to a table:
-   *   (table ident, tableColumnList, partitionKeys, ifPartitionNotExists, byName).
+   *   (table ident, options, tableColumnList, partitionKeys, ifPartitionNotExists, byName).
    */
   type InsertTableParams =
-    (IdentifierReferenceContext, Seq[String], Map[String, Option[String]], Boolean, Boolean)
+    (IdentifierReferenceContext, Option[OptionsClauseContext], Seq[String],
+      Map[String, Option[String]], Boolean, Boolean)
 
   /**
    * Parameters used for writing query to a directory: (isLocal, CatalogStorageFormat, provider).
@@ -432,11 +433,11 @@ class AstBuilder extends DataTypeAstBuilder
       //   2. Write commands do not hold the table logical plan as a child, and we need to add
       //      additional resolution code to resolve identifiers inside the write commands.
       case table: InsertIntoTableContext =>
-        val (relationCtx, cols, partition, ifPartitionNotExists, byName)
+        val (relationCtx, options, cols, partition, ifPartitionNotExists, byName)
         = visitInsertIntoTable(table)
         withIdentClause(relationCtx, ident => {
           InsertIntoStatement(
-            createUnresolvedRelation(relationCtx, ident),
+            createUnresolvedRelation(relationCtx, ident, options),
             partition,
             cols,
             query,
@@ -445,11 +446,11 @@ class AstBuilder extends DataTypeAstBuilder
             byName)
         })
       case table: InsertOverwriteTableContext =>
-        val (relationCtx, cols, partition, ifPartitionNotExists, byName)
+        val (relationCtx, options, cols, partition, ifPartitionNotExists, byName)
         = visitInsertOverwriteTable(table)
         withIdentClause(relationCtx, ident => {
           InsertIntoStatement(
-            createUnresolvedRelation(relationCtx, ident),
+            createUnresolvedRelation(relationCtx, ident, options),
             partition,
             cols,
             query,
@@ -460,7 +461,7 @@ class AstBuilder extends DataTypeAstBuilder
       case ctx: InsertIntoReplaceWhereContext =>
         withIdentClause(ctx.identifierReference, ident => {
           OverwriteByExpression.byPosition(
-            createUnresolvedRelation(ctx.identifierReference, ident),
+            createUnresolvedRelation(ctx.identifierReference, ident, Option(ctx.optionsClause())),
             query,
             expression(ctx.whereClause().booleanExpression()))
         })
@@ -489,7 +490,8 @@ class AstBuilder extends DataTypeAstBuilder
       invalidStatement("INSERT INTO ... IF NOT EXISTS", ctx)
     }
 
-    (ctx.identifierReference, cols, partitionKeys, false, ctx.NAME() != null)
+    (ctx.identifierReference, Option(ctx.optionsClause()), cols, partitionKeys, false,
+      ctx.NAME() != null)
   }
 
   /**
@@ -509,7 +511,8 @@ class AstBuilder extends DataTypeAstBuilder
         dynamicPartitionKeys.keys.mkString(", "), ctx)
     }
 
-    (ctx.identifierReference, cols, partitionKeys, ctx.EXISTS() != null, ctx.NAME() != null)
+    (ctx.identifierReference, Option(ctx.optionsClause()), cols, partitionKeys,
+      ctx.EXISTS() != null, ctx.NAME() != null)
   }
 
   /**
@@ -3093,9 +3096,7 @@ class AstBuilder extends DataTypeAstBuilder
   private def createUnresolvedRelation(
       ctx: IdentifierReferenceContext,
       optionsClause: Option[OptionsClauseContext] = None): LogicalPlan = withOrigin(ctx) {
-    val options = optionsClause.map{ clause =>
-      new CaseInsensitiveStringMap(visitPropertyKeyValues(clause.options).asJava)
-    }.getOrElse(CaseInsensitiveStringMap.empty)
+    val options = resolveOptions(optionsClause)
     withIdentClause(ctx, parts =>
       new UnresolvedRelation(parts, options, isStreaming = false))
   }
@@ -3104,8 +3105,18 @@ class AstBuilder extends DataTypeAstBuilder
    * Create an [[UnresolvedRelation]] from a multi-part identifier.
    */
   private def createUnresolvedRelation(
-      ctx: ParserRuleContext, ident: Seq[String]): UnresolvedRelation = withOrigin(ctx) {
-    UnresolvedRelation(ident)
+      ctx: ParserRuleContext,
+      ident: Seq[String],
+      optionsClause: Option[OptionsClauseContext]): UnresolvedRelation = withOrigin(ctx) {
+    val options = resolveOptions(optionsClause)
+    new UnresolvedRelation(ident, options, isStreaming = false)
+  }
+
+  private def resolveOptions(
+      optionsClause: Option[OptionsClauseContext]): CaseInsensitiveStringMap = {
+    optionsClause.map{ clause =>
+      new CaseInsensitiveStringMap(visitPropertyKeyValues(clause.options).asJava)
+    }.getOrElse(CaseInsensitiveStringMap.empty)
   }
 
   /**
@@ -4974,7 +4985,8 @@ class AstBuilder extends DataTypeAstBuilder
       if (query.isDefined) {
         CacheTableAsSelect(ident.head, query.get, source(ctx.query()), isLazy, options)
       } else {
-        CacheTable(createUnresolvedRelation(ctx.identifierReference, ident), ident, isLazy, options)
+        CacheTable(createUnresolvedRelation(ctx.identifierReference, ident, None),
+          ident, isLazy, options)
       }
     })
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index c032de4bc260..ec4b827c659f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.CurrentUserContext.CURRENT_USER
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, NoSuchNamespaceException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.parser.ParseException
-import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
+import org.apache.spark.sql.catalyst.plans.logical.{AppendData, ColumnStat, CommandResult, OverwriteByExpression}
 import org.apache.spark.sql.catalyst.statsEstimation.StatsEstimationTestBase
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.connector.catalog.{Column => ColumnV2, _}
@@ -41,6 +41,7 @@ import org.apache.spark.sql.errors.QueryErrorsBase
 import org.apache.spark.sql.execution.FilterExec
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
 import org.apache.spark.sql.execution.streaming.MemoryStream
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
@@ -3551,6 +3552,66 @@ class DataSourceV2SQLSuiteV1Filter
     }
   }
 
+  test("SPARK-36680: Supports Dynamic Table Options for Insert") {
+    val t1 = s"${catalogAndNamespace}table"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
+      val df = sql(s"INSERT INTO $t1 WITH (`write.split-size` = 10) VALUES (1, 
'a'), (2, 'b')")
+
+      val collected = df.queryExecution.optimizedPlan.collect {
+        case CommandResult(_, AppendData(relation: DataSourceV2Relation, _, _, 
_, _, _), _, _) =>
+          assert(relation.options.get("write.split-size") == "10")
+      }
+      assert (collected.size == 1)
+
+      val insertResult = sql(s"SELECT * FROM $t1")
+      checkAnswer(insertResult, Seq(Row(1, "a"), Row(2, "b")))
+    }
+  }
+
+  test("SPARK-36680: Supports Dynamic Table Options for Insert Overwrite") {
+    val t1 = s"${catalogAndNamespace}table"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
+      sql(s"INSERT INTO $t1 WITH (`write.split-size` = 10) VALUES (1, 'a'), 
(2, 'b')")
+
+      val df = sql(s"INSERT OVERWRITE $t1 WITH (`write.split-size` = 10) " +
+        s"VALUES (3, 'c'), (4, 'd')")
+      val collected = df.queryExecution.optimizedPlan.collect {
+        case CommandResult(_,
+          OverwriteByExpression(relation: DataSourceV2Relation, _, _, _, _, _, 
_),
+          _, _) =>
+          assert(relation.options.get("write.split-size") == "10")
+      }
+      assert (collected.size == 1)
+
+      val insertResult = sql(s"SELECT * FROM $t1")
+      checkAnswer(insertResult, Seq(Row(3, "c"), Row(4, "d")))
+    }
+  }
+
+  test("SPARK-36680: Supports Dynamic Table Options for Insert Replace") {
+    val t1 = s"${catalogAndNamespace}table"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format")
+      sql(s"INSERT INTO $t1 WITH (`write.split-size` = 10) VALUES (1, 'a'), 
(2, 'b')")
+
+      val df = sql(s"INSERT INTO $t1 WITH (`write.split-size` = 10) " +
+        s"REPLACE WHERE TRUE " +
+        s"VALUES (3, 'c'), (4, 'd')")
+      val collected = df.queryExecution.optimizedPlan.collect {
+        case CommandResult(_,
+          OverwriteByExpression(relation: DataSourceV2Relation, _, _, _, _, _, 
_),
+          _, _) =>
+          assert(relation.options.get("write.split-size") == "10")
+      }
+      assert (collected.size == 1)
+
+      val insertResult = sql(s"SELECT * FROM $t1")
+      checkAnswer(insertResult, Seq(Row(3, "c"), Row(4, "d")))
+    }
+  }
+
   private def testNotSupportedV2Command(
       sqlCommand: String,
       sqlParams: String,
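
For context, a minimal sketch (not part of this commit) of how a DataSource V2 connector could read the per-write options that `INSERT ... WITH (...)` carries; `ExampleWriteBuilder` and the `write.split-size` key are illustrative, not a Spark API addition:

    import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}

    // Options given via INSERT ... WITH (...) are attached to the relation and
    // reach the connector through LogicalWriteInfo.options() as a
    // CaseInsensitiveStringMap, alongside the table's own options.
    class ExampleWriteBuilder(info: LogicalWriteInfo) extends WriteBuilder {
      // Fall back to a default when the write option is absent.
      private val splitSize: Long = info.options.getLong("write.split-size", 128L)
    }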

