This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e6adc67d43d [SPARK-42750][SQL] Support Insert By Name statement
e6adc67d43d is described below

commit e6adc67d43d6beccf21013ee00aa274bed13107c
Author: Jia Fan <fanjiaemi...@qq.com>
AuthorDate: Wed Jun 7 10:30:59 2023 +0800

    [SPARK-42750][SQL] Support Insert By Name statement
    
    ### What changes were proposed in this pull request?
    
    In some use cases, users have incoming dataframes with fixed column names 
whose order might differ from the target table's column order. Currently there 
is no easy way to handle this through the INSERT INTO API - the user has to 
make sure the columns are in the right order, as they would when inserting a 
tuple. We should add an optional BY NAME clause, such that:
    
    `INSERT INTO tgt BY NAME <query>`
    
    takes each column of <query> and inserts it into the column in `tgt` which 
has the same name according to the configured `resolver` logic.
    
    Some points of clarification:
    1. `BY NAME` and an explicit column list (`INSERT INTO t1 (a,b) ...`) are 
mutually exclusive.
    2. A partition spec can still be combined with `BY NAME`: `INSERT INTO 
t PARTITION(a=1) BY NAME <query>`
    
    `INSERT OVERWRITE BY NAME` is not supported yet (it will be added in a 
follow-up).
    
    ### Why are the changes needed?
    Add new feature `INSERT INTO BY NAME`
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. It adds the `INSERT INTO ... BY NAME` syntax and a new non-reserved 
keyword `NAME`.
    
    ### How was this patch tested?
    Added new tests in `DDLParserSuite` and `SQLInsertTestSuite`.
    
    Closes #40908 from Hisoka-X/SPARK-42750_insert_into_by_name.
    
    Lead-authored-by: Jia Fan <fanjiaemi...@qq.com>
    Co-authored-by: Hisoka <fanjiaemi...@qq.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 docs/sql-ref-ansi-compliance.md                    |  3 +-
 .../spark/sql/catalyst/parser/SqlBaseLexer.g4      |  1 +
 .../spark/sql/catalyst/parser/SqlBaseParser.g4     |  4 +-
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  7 ++--
 .../sql/catalyst/analysis/CheckAnalysis.scala      |  2 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala     | 19 ++++++----
 .../sql/catalyst/plans/logical/statements.scala    |  7 +++-
 .../spark/sql/catalyst/parser/DDLParserSuite.scala | 34 +++++++++++++++++
 .../execution/datasources/DataSourceStrategy.scala |  9 +++--
 .../datasources/FallBackFileSourceV2.scala         |  2 +-
 .../spark/sql/execution/datasources/rules.scala    | 14 ++++---
 .../sql-tests/analyzer-results/explain-aqe.sql.out |  2 +-
 .../sql-tests/analyzer-results/explain.sql.out     |  2 +-
 .../sql-tests/results/ansi/keywords.sql.out        |  1 +
 .../sql-tests/results/explain-aqe.sql.out          |  2 +-
 .../resources/sql-tests/results/explain.sql.out    |  2 +-
 .../resources/sql-tests/results/keywords.sql.out   |  1 +
 .../org/apache/spark/sql/SQLInsertTestSuite.scala  | 43 ++++++++++++++++++++--
 .../execution/command/PlanResolutionSuite.scala    |  4 +-
 .../ThriftServerWithSparkContextSuite.scala        |  2 +-
 .../org/apache/spark/sql/hive/HiveStrategies.scala |  8 ++--
 21 files changed, 129 insertions(+), 40 deletions(-)

diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md
index 76b5d5aef73..f9c6f5ea6aa 100644
--- a/docs/sql-ref-ansi-compliance.md
+++ b/docs/sql-ref-ansi-compliance.md
@@ -350,7 +350,7 @@ By default, both `spark.sql.ansi.enabled` and 
`spark.sql.ansi.enforceReservedKey
 Below is a list of all the keywords in Spark SQL.
 
 |Keyword|Spark SQL<br/>ANSI Mode|Spark SQL<br/>Default Mode|SQL-2016|
-|-------|----------------------|-------------------------|--------|
+|------|----------------------|-------------------------|--------|
 |ADD|non-reserved|non-reserved|non-reserved|
 |AFTER|non-reserved|non-reserved|non-reserved|
 |ALL|reserved|non-reserved|reserved|
@@ -527,6 +527,7 @@ Below is a list of all the keywords in Spark SQL.
 |MONTH|non-reserved|non-reserved|non-reserved|
 |MONTHS|non-reserved|non-reserved|non-reserved|
 |MSCK|non-reserved|non-reserved|non-reserved|
+|NAME|non-reserved|non-reserved|non-reserved|
 |NAMESPACE|non-reserved|non-reserved|non-reserved|
 |NAMESPACES|non-reserved|non-reserved|non-reserved|
 |NANOSECOND|non-reserved|non-reserved|non-reserved|
diff --git 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
index 6300221b542..ecd5f5912fd 100644
--- 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
+++ 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4
@@ -264,6 +264,7 @@ MINUTES: 'MINUTES';
 MONTH: 'MONTH';
 MONTHS: 'MONTHS';
 MSCK: 'MSCK';
+NAME: 'NAME';
 NAMESPACE: 'NAMESPACE';
 NAMESPACES: 'NAMESPACES';
 NANOSECOND: 'NANOSECOND';
diff --git 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index 20c8df4f79a..89100f2aeec 100644
--- 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -318,7 +318,7 @@ query
 
 insertInto
     : INSERT OVERWRITE TABLE? identifierReference (partitionSpec (IF NOT 
EXISTS)?)?  identifierList?         #insertOverwriteTable
-    | INSERT INTO TABLE? identifierReference partitionSpec? (IF NOT EXISTS)? 
identifierList?                 #insertIntoTable
+    | INSERT INTO TABLE? identifierReference partitionSpec? (IF NOT EXISTS)? 
((BY NAME) | identifierList)?   #insertIntoTable
     | INSERT INTO TABLE? identifierReference REPLACE whereClause               
                              #insertIntoReplaceWhere
     | INSERT OVERWRITE LOCAL? DIRECTORY path=stringLit rowFormat? 
createFileFormat?                     #insertOverwriteHiveDir
     | INSERT OVERWRITE LOCAL? DIRECTORY (path=stringLit)? tableProvider 
(OPTIONS options=propertyList)? #insertOverwriteDir
@@ -1362,6 +1362,7 @@ ansiNonReserved
     | MONTH
     | MONTHS
     | MSCK
+    | NAME
     | NAMESPACE
     | NAMESPACES
     | NANOSECOND
@@ -1683,6 +1684,7 @@ nonReserved
     | MONTH
     | MONTHS
     | MSCK
+    | NAME
     | NAMESPACE
     | NAMESPACES
     | NANOSECOND
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 51b35498526..aa1b9d0e8fd 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1078,7 +1078,7 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
 
     def apply(plan: LogicalPlan)
         : LogicalPlan = plan.resolveOperatorsUpWithPruning(AlwaysProcess.fn, 
ruleId) {
-      case i @ InsertIntoStatement(table, _, _, _, _, _) =>
+      case i @ InsertIntoStatement(table, _, _, _, _, _, _) =>
         val relation = table match {
           case u: UnresolvedRelation if !u.isStreaming =>
             resolveRelation(u).getOrElse(u)
@@ -1278,7 +1278,8 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
   object ResolveInsertInto extends ResolveInsertionBase {
     override def apply(plan: LogicalPlan): LogicalPlan = 
plan.resolveOperatorsWithPruning(
       AlwaysProcess.fn, ruleId) {
-      case i @ InsertIntoStatement(r: DataSourceV2Relation, _, _, _, _, _) if 
i.query.resolved =>
+      case i @ InsertIntoStatement(r: DataSourceV2Relation, _, _, _, _, _, _)
+          if i.query.resolved =>
         // ifPartitionNotExists is append with validation, but validation is 
not supported
         if (i.ifPartitionNotExists) {
           throw 
QueryCompilationErrors.unsupportedIfNotExistsError(r.table.name)
@@ -1290,7 +1291,7 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
         } else {
           None
         }
-        val isByName = projectByName.nonEmpty
+        val isByName = projectByName.nonEmpty || i.byName
 
         val partCols = partitionColumnNames(r.table)
         validatePartitionSpec(partCols, i.partitionSpec)
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 9124890d4af..e84023ec3df 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -166,7 +166,7 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog with QueryErrorsB
     // not found first, instead of errors in the input query of the insert 
command, by doing a
     // top-down traversal.
     plan.foreach {
-      case InsertIntoStatement(u: UnresolvedRelation, _, _, _, _, _) =>
+      case InsertIntoStatement(u: UnresolvedRelation, _, _, _, _, _, _) =>
         u.tableNotFound(u.multipartIdentifier)
 
       // TODO (SPARK-27484): handle streaming write commands when we have them.
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 0801cfbda4b..f4170860c24 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -277,10 +277,10 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] 
with SQLConfHelper wit
 
   /**
    * Parameters used for writing query to a table:
-   *   (table ident, tableColumnList, partitionKeys, ifPartitionNotExists).
+   *   (table ident, tableColumnList, partitionKeys, ifPartitionNotExists, 
byName).
    */
   type InsertTableParams =
-    (IdentifierReferenceContext, Seq[String], Map[String, Option[String]], 
Boolean)
+    (IdentifierReferenceContext, Seq[String], Map[String, Option[String]], 
Boolean, Boolean)
 
   /**
    * Parameters used for writing query to a directory: (isLocal, 
CatalogStorageFormat, provider).
@@ -291,7 +291,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] 
with SQLConfHelper wit
    * Add an
    * {{{
    *   INSERT OVERWRITE TABLE tableIdentifier [partitionSpec [IF NOT EXISTS]]? 
[identifierList]
-   *   INSERT INTO [TABLE] tableIdentifier [partitionSpec]  [identifierList]
+   *   INSERT INTO [TABLE] tableIdentifier [partitionSpec] ([BY NAME] | 
[identifierList])
    *   INSERT INTO [TABLE] tableIdentifier REPLACE whereClause
    *   INSERT OVERWRITE [LOCAL] DIRECTORY STRING [rowFormat] [createFileFormat]
    *   INSERT OVERWRITE [LOCAL] DIRECTORY [STRING] tableProvider [OPTIONS 
tablePropertyList]
@@ -307,7 +307,8 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] 
with SQLConfHelper wit
       //   2. Write commands do not hold the table logical plan as a child, 
and we need to add
       //      additional resolution code to resolve identifiers inside the 
write commands.
       case table: InsertIntoTableContext =>
-        val (relationCtx, cols, partition, ifPartitionNotExists) = 
visitInsertIntoTable(table)
+        val (relationCtx, cols, partition, ifPartitionNotExists, byName)
+        = visitInsertIntoTable(table)
         withIdentClause(relationCtx, ident => {
           InsertIntoStatement(
             createUnresolvedRelation(relationCtx, ident),
@@ -315,10 +316,12 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] 
with SQLConfHelper wit
             cols,
             query,
             overwrite = false,
-            ifPartitionNotExists)
+            ifPartitionNotExists,
+            byName)
         })
       case table: InsertOverwriteTableContext =>
-        val (relationCtx, cols, partition, ifPartitionNotExists) = 
visitInsertOverwriteTable(table)
+        val (relationCtx, cols, partition, ifPartitionNotExists, _)
+        = visitInsertOverwriteTable(table)
         withIdentClause(relationCtx, ident => {
           InsertIntoStatement(
             createUnresolvedRelation(relationCtx, ident),
@@ -358,7 +361,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] 
with SQLConfHelper wit
       operationNotAllowed("INSERT INTO ... IF NOT EXISTS", ctx)
     }
 
-    (ctx.identifierReference, cols, partitionKeys, false)
+    (ctx.identifierReference, cols, partitionKeys, false, ctx.NAME() != null)
   }
 
   /**
@@ -376,7 +379,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] 
with SQLConfHelper wit
         dynamicPartitionKeys.keys.mkString(", "), ctx)
     }
 
-    (ctx.identifierReference, cols, partitionKeys, ctx.EXISTS() != null)
+    (ctx.identifierReference, cols, partitionKeys, ctx.EXISTS() != null, false)
   }
 
   /**
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
index 9c639a4bce6..669750ee448 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala
@@ -165,6 +165,8 @@ case class QualifiedColType(
  *                             would have Map('a' -> Some('1'), 'b' -> None).
  * @param ifPartitionNotExists If true, only write if the partition does not 
exist.
  *                             Only valid for static partitions.
+ * @param byName               If true, reorder the data columns to match the 
column names of the
+ *                             target table.
  */
 case class InsertIntoStatement(
     table: LogicalPlan,
@@ -172,12 +174,15 @@ case class InsertIntoStatement(
     userSpecifiedCols: Seq[String],
     query: LogicalPlan,
     overwrite: Boolean,
-    ifPartitionNotExists: Boolean) extends UnaryParsedStatement {
+    ifPartitionNotExists: Boolean,
+    byName: Boolean = false) extends UnaryParsedStatement {
 
   require(overwrite || !ifPartitionNotExists,
     "IF NOT EXISTS is only valid in INSERT OVERWRITE")
   require(partitionSpec.values.forall(_.nonEmpty) || !ifPartitionNotExists,
     "IF NOT EXISTS is only valid with static partitions")
+  require(userSpecifiedCols.isEmpty || !byName,
+    "BY NAME is only valid without specified cols")
 
   override def child: LogicalPlan = query
   override protected def withNewChildInternal(newChild: LogicalPlan): 
InsertIntoStatement =
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 5899f813f14..53635acf0b3 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -1673,6 +1673,40 @@ class DDLParserSuite extends AnalysisTest {
         stop = 69))
   }
 
+  test("insert table: by name") {
+    Seq(
+      "INSERT INTO TABLE testcat.ns1.ns2.tbl BY NAME SELECT * FROM source",
+      "INSERT INTO testcat.ns1.ns2.tbl BY NAME SELECT * FROM source"
+    ).foreach { sql =>
+      parseCompare(sql,
+        InsertIntoStatement(
+          UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl")),
+          Map.empty,
+          Nil,
+          Project(Seq(UnresolvedStar(None)), 
UnresolvedRelation(Seq("source"))),
+          overwrite = false, ifPartitionNotExists = false, byName = true))
+    }
+  }
+
+  test("insert table: by name unsupported case") {
+    checkError(
+      exception = parseException("INSERT OVERWRITE TABLE t1 BY NAME SELECT * 
FROM tmp_view"),
+      errorClass = "PARSE_SYNTAX_ERROR",
+      parameters = Map(
+        "error" -> "'BY'",
+        "hint" -> "")
+    )
+
+    checkError(
+      exception = parseException(
+        "INSERT INTO TABLE t1 BY NAME (c1,c2) SELECT * FROM tmp_view"),
+      errorClass = "PARSE_SYNTAX_ERROR",
+      parameters = Map(
+        "error" -> "'c1'",
+        "hint" -> "")
+    )
+  }
+
   test("delete from table: delete all") {
     parseCompare("DELETE FROM testcat.ns1.ns2.tbl",
       DeleteFromTable(
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 818dc4eb31c..454cc0b5f56 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -152,7 +152,7 @@ object DataSourceAnalysis extends Rule[LogicalPlan] {
       CreateDataSourceTableAsSelectCommand(tableDesc, mode, query, 
query.output.map(_.name))
 
     case InsertIntoStatement(l @ LogicalRelation(_: InsertableRelation, _, _, 
_),
-        parts, _, query, overwrite, false) if parts.isEmpty =>
+        parts, _, query, overwrite, false, _) if parts.isEmpty =>
       InsertIntoDataSourceCommand(l, query, overwrite)
 
     case InsertIntoDir(_, storage, provider, query, overwrite)
@@ -164,7 +164,7 @@ object DataSourceAnalysis extends Rule[LogicalPlan] {
       InsertIntoDataSourceDirCommand(storage, provider.get, query, overwrite)
 
     case i @ InsertIntoStatement(
-        l @ LogicalRelation(t: HadoopFsRelation, _, table, _), parts, _, 
query, overwrite, _)
+        l @ LogicalRelation(t: HadoopFsRelation, _, table, _), parts, _, 
query, overwrite, _, _)
         if query.resolved =>
       // If the InsertIntoTable command is for a partitioned HadoopFsRelation 
and
       // the user has specified static partitions, we add a Project operator 
on top of the query
@@ -275,10 +275,11 @@ class FindDataSourceTable(sparkSession: SparkSession) 
extends Rule[LogicalPlan]
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, options, 
false),
-        _, _, _, _, _) if DDLUtils.isDatasourceTable(tableMeta) =>
+        _, _, _, _, _, _) if DDLUtils.isDatasourceTable(tableMeta) =>
       i.copy(table = readDataSourceTable(tableMeta, options))
 
-    case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, _, 
false), _, _, _, _, _) =>
+    case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, _, 
false),
+        _, _, _, _, _, _) =>
       i.copy(table = DDLUtils.readHiveTable(tableMeta))
 
     case UnresolvedCatalogRelation(tableMeta, options, false)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala
index b5d06db0241..2e1ae9fe3ae 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala
@@ -35,7 +35,7 @@ import 
org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, File
 class FallBackFileSourceV2(sparkSession: SparkSession) extends 
Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case i @ InsertIntoStatement(
-        d @ DataSourceV2Relation(table: FileTable, _, _, _, _), _, _, _, _, _) 
=>
+        d @ DataSourceV2Relation(table: FileTable, _, _, _, _), _, _, _, _, _, 
_) =>
       val v1FileFormat = table.fallbackFileFormat.newInstance()
       val relation = HadoopFsRelation(
         table.fileIndex,
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index b3fdfc76c7d..0b07ae1d11c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -389,15 +389,16 @@ object PreprocessTableInsertion extends 
ResolveInsertionBase {
     }
 
     // Create a project if this INSERT has a user-specified column list.
-    val isByName = insert.userSpecifiedCols.nonEmpty
-    val query = if (isByName) {
+    val hasColumnList = insert.userSpecifiedCols.nonEmpty
+    val query = if (hasColumnList) {
       createProjectForByNameQuery(insert)
     } else {
       insert.query
     }
     val newQuery = try {
       TableOutputResolver.resolveOutputColumns(
-        tblName, expectedColumns, query, byName = isByName, conf, 
supportColDefaultValue = true)
+        tblName, expectedColumns, query, byName = hasColumnList || 
insert.byName, conf,
+        supportColDefaultValue = true)
     } catch {
       case e: AnalysisException if staticPartCols.nonEmpty &&
           e.getErrorClass == "INSERT_COLUMN_ARITY_MISMATCH" =>
@@ -425,7 +426,7 @@ object PreprocessTableInsertion extends 
ResolveInsertionBase {
   }
 
   def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    case i @ InsertIntoStatement(table, _, _, query, _, _) if table.resolved 
&& query.resolved =>
+    case i @ InsertIntoStatement(table, _, _, query, _, _, _) if 
table.resolved && query.resolved =>
       table match {
         case relation: HiveTableRelation =>
           val metadata = relation.tableMeta
@@ -506,7 +507,8 @@ object PreWriteCheck extends (LogicalPlan => Unit) {
 
   def apply(plan: LogicalPlan): Unit = {
     plan.foreach {
-      case InsertIntoStatement(l @ LogicalRelation(relation, _, _, _), 
partition, _, query, _, _) =>
+      case InsertIntoStatement(l @ LogicalRelation(relation, _, _, _), 
partition,
+          _, query, _, _, _) =>
         // Get all input data source relations of the query.
         val srcRelations = query.collect {
           case LogicalRelation(src, _, _, _) => src
@@ -528,7 +530,7 @@ object PreWriteCheck extends (LogicalPlan => Unit) {
           case _ => failAnalysis(s"$relation does not allow insertion.")
         }
 
-      case InsertIntoStatement(t, _, _, _, _, _)
+      case InsertIntoStatement(t, _, _, _, _, _, _)
         if !t.isInstanceOf[LeafNode] ||
           t.isInstanceOf[Range] ||
           t.isInstanceOf[OneRowRelation] ||
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/explain-aqe.sql.out 
b/sql/core/src/test/resources/sql-tests/analyzer-results/explain-aqe.sql.out
index 8189f1fc7d1..c53642b8ba2 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/explain-aqe.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/explain-aqe.sql.out
@@ -196,7 +196,7 @@ ExplainCommand 'Aggregate ['key], ['key, 
unresolvedalias('MIN('val), None)], For
 -- !query
 EXPLAIN EXTENDED INSERT INTO TABLE explain_temp5 SELECT * FROM explain_temp4
 -- !query analysis
-ExplainCommand 'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [], 
false, false, false, ExtendedMode
+ExplainCommand 'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [], 
false, false, false, false, ExtendedMode
 
 
 -- !query
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/explain.sql.out 
b/sql/core/src/test/resources/sql-tests/analyzer-results/explain.sql.out
index 8189f1fc7d1..c53642b8ba2 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/explain.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/explain.sql.out
@@ -196,7 +196,7 @@ ExplainCommand 'Aggregate ['key], ['key, 
unresolvedalias('MIN('val), None)], For
 -- !query
 EXPLAIN EXTENDED INSERT INTO TABLE explain_temp5 SELECT * FROM explain_temp4
 -- !query analysis
-ExplainCommand 'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [], 
false, false, false, ExtendedMode
+ExplainCommand 'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [], 
false, false, false, false, ExtendedMode
 
 
 -- !query
diff --git 
a/sql/core/src/test/resources/sql-tests/results/ansi/keywords.sql.out 
b/sql/core/src/test/resources/sql-tests/results/ansi/keywords.sql.out
index 9952c9aef62..34cc6cc4bd9 100644
--- a/sql/core/src/test/resources/sql-tests/results/ansi/keywords.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/ansi/keywords.sql.out
@@ -180,6 +180,7 @@ MINUTES     false
 MONTH  false
 MONTHS false
 MSCK   false
+NAME   false
 NAMESPACE      false
 NAMESPACES     false
 NANOSECOND     false
diff --git a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out 
b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
index d73035aa527..3c2677c936f 100644
--- a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out
@@ -1081,7 +1081,7 @@ EXPLAIN EXTENDED INSERT INTO TABLE explain_temp5 SELECT * 
FROM explain_temp4
 struct<plan:string>
 -- !query output
 == Parsed Logical Plan ==
-'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [], false, false, 
false
+'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [], false, false, 
false, false
 +- 'Project [*]
    +- 'UnresolvedRelation [explain_temp4], [], false
 
diff --git a/sql/core/src/test/resources/sql-tests/results/explain.sql.out 
b/sql/core/src/test/resources/sql-tests/results/explain.sql.out
index 5ac793fed86..f54c6c5e44f 100644
--- a/sql/core/src/test/resources/sql-tests/results/explain.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/explain.sql.out
@@ -1023,7 +1023,7 @@ EXPLAIN EXTENDED INSERT INTO TABLE explain_temp5 SELECT * 
FROM explain_temp4
 struct<plan:string>
 -- !query output
 == Parsed Logical Plan ==
-'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [], false, false, 
false
+'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [], false, false, 
false, false
 +- 'Project [*]
    +- 'UnresolvedRelation [explain_temp4], [], false
 
diff --git a/sql/core/src/test/resources/sql-tests/results/keywords.sql.out 
b/sql/core/src/test/resources/sql-tests/results/keywords.sql.out
index aa13b029300..41d491c8027 100644
--- a/sql/core/src/test/resources/sql-tests/results/keywords.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/keywords.sql.out
@@ -180,6 +180,7 @@ MINUTES     false
 MONTH  false
 MONTHS false
 MSCK   false
+NAME   false
 NAMESPACE      false
 NAMESPACES     false
 NANOSECOND     false
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
index af85e44519b..1d27904bb2c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala
@@ -51,17 +51,20 @@ trait SQLInsertTestSuite extends QueryTest with 
SQLTestUtils {
       input: DataFrame,
       cols: Seq[String] = Nil,
       partitionExprs: Seq[String] = Nil,
-      overwrite: Boolean): Unit = {
+      overwrite: Boolean,
+      byName: Boolean = false): Unit = {
     val tmpView = "tmp_view"
-    val columnList = if (cols.nonEmpty) cols.mkString("(", ",", ")") else ""
     val partitionList = if (partitionExprs.nonEmpty) {
       partitionExprs.mkString("PARTITION (", ",", ")")
     } else ""
     withTempView(tmpView) {
       input.createOrReplaceTempView(tmpView)
       val overwriteStr = if (overwrite) "OVERWRITE" else "INTO"
+      val columnList = if (cols.nonEmpty && !byName) cols.mkString("(", ",", 
")") else ""
+      val byNameStr = if (byName) "BY NAME" else ""
       sql(
-        s"INSERT $overwriteStr TABLE $tableName $partitionList $columnList 
SELECT * FROM $tmpView")
+        s"INSERT $overwriteStr TABLE $tableName $partitionList $byNameStr " +
+          s"$columnList SELECT * FROM $tmpView")
     }
   }
 
@@ -123,6 +126,40 @@ trait SQLInsertTestSuite extends QueryTest with 
SQLTestUtils {
     }
   }
 
+  test("insert with column list - by name") {
+    withTable("t1") {
+      val cols = Seq("c1", "c2", "c3")
+      val df = Seq((3, 2, 1)).toDF(cols.reverse: _*)
+      createTable("t1", cols, Seq("int", "int", "int"))
+      processInsert("t1", df, overwrite = false, byName = true)
+      verifyTable("t1", df.selectExpr(cols: _*))
+    }
+  }
+
+  test("insert with column list - by name + partitioned table") {
+    val cols = Seq("c1", "c2", "c3", "c4")
+    val df = Seq((4, 3, 2, 1)).toDF(cols.reverse: _*)
+    withTable("t1") {
+      createTable("t1", cols, Seq("int", "int", "int", "int"), 
cols.takeRight(2))
+      processInsert("t1", df, overwrite = false, byName = true)
+      verifyTable("t1", df.selectExpr(cols: _*))
+    }
+
+    withTable("t1") {
+      createTable("t1", cols, Seq("int", "int", "int", "int"), 
cols.takeRight(2))
+      processInsert("t1", df.selectExpr("c2", "c1", "c4"),
+        partitionExprs = Seq("c3=3", "c4"), overwrite = false, byName = true)
+      verifyTable("t1", df.selectExpr(cols: _*))
+    }
+
+    withTable("t1") {
+      createTable("t1", cols, Seq("int", "int", "int", "int"), 
cols.takeRight(2))
+      processInsert("t1", df.selectExpr("c2", "c1"),
+        partitionExprs = Seq("c3=3", "c4=4"), overwrite = false, byName = true)
+      verifyTable("t1", df.selectExpr(cols: _*))
+    }
+  }
+
   test("insert with column list - table output reorder + partitioned table") {
     val cols = Seq("c1", "c2", "c3", "c4")
     val df = Seq((1, 2, 3, 4)).toDF(cols: _*)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index 7fa3873fc6e..17a0f308a1a 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -1213,7 +1213,7 @@ class PlanResolutionSuite extends AnalysisTest {
       case InsertIntoStatement(
         _, _, _,
         UnresolvedInlineTable(_, 
Seq(Seq(UnresolvedAttribute(Seq("DEFAULT"))))),
-        _, _) =>
+        _, _, _) =>
 
       case _ => fail("Expect UpdateTable, but got:\n" + parsed1.treeString)
     }
@@ -1221,7 +1221,7 @@ class PlanResolutionSuite extends AnalysisTest {
       case InsertIntoStatement(
         _, _, _,
         Project(Seq(UnresolvedAttribute(Seq("DEFAULT"))), _),
-        _, _) =>
+        _, _, _) =>
 
       case _ => fail("Expect UpdateTable, but got:\n" + parsed1.treeString)
     }
diff --git 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
index 4d8b7cdf354..aef9dc69656 100644
--- 
a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
+++ 
b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala
@@ -213,7 +213,7 @@ trait ThriftServerWithSparkContextSuite extends 
SharedThriftServer {
       val sessionHandle = client.openSession(user, "")
       val infoValue = client.getInfo(sessionHandle, 
GetInfoType.CLI_ODBC_KEYWORDS)
       // scalastyle:off line.size.limit
-      assert(infoValue.getStringValue == 
"ADD,AFTER,ALL,ALTER,ALWAYS,ANALYZE,AND,ANTI,ANY,ANY_VALUE,ARCHIVE,ARRAY,AS,ASC,AT,AUTHORIZATION,BETWEEN,BIGINT,BINARY,BOOLEAN,BOTH,BUCKET,BUCKETS,BY,BYTE,CACHE,CASCADE,CASE,CAST,CATALOG,CATALOGS,CHANGE,CHAR,CHARACTER,CHECK,CLEAR,CLUSTER,CLUSTERED,CODEGEN,COLLATE,COLLECTION,COLUMN,COLUMNS,COMMENT,COMMIT,COMPACT,COMPACTIONS,COMPUTE,CONCATENATE,CONSTRAINT,COST,CREATE,CROSS,CUBE,CURRENT,CURRENT_DATE,CURRENT_TIME,CURRENT_TIMESTAMP,CURRENT_USER,DATA,DA
 [...]
+      assert(infoValue.getStringValue == 
"ADD,AFTER,ALL,ALTER,ALWAYS,ANALYZE,AND,ANTI,ANY,ANY_VALUE,ARCHIVE,ARRAY,AS,ASC,AT,AUTHORIZATION,BETWEEN,BIGINT,BINARY,BOOLEAN,BOTH,BUCKET,BUCKETS,BY,BYTE,CACHE,CASCADE,CASE,CAST,CATALOG,CATALOGS,CHANGE,CHAR,CHARACTER,CHECK,CLEAR,CLUSTER,CLUSTERED,CODEGEN,COLLATE,COLLECTION,COLUMN,COLUMNS,COMMENT,COMMIT,COMPACT,COMPACTIONS,COMPUTE,CONCATENATE,CONSTRAINT,COST,CREATE,CROSS,CUBE,CURRENT,CURRENT_DATE,CURRENT_TIME,CURRENT_TIMESTAMP,CURRENT_USER,DATA,DA
 [...]
       // scalastyle:on line.size.limit
     }
   }
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index b2438d38520..3da3d4a0eb5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -145,7 +145,7 @@ class DetermineTableStats(session: SparkSession) extends 
Rule[LogicalPlan] {
 
     // handles InsertIntoStatement specially as the table in 
InsertIntoStatement is not added in its
     // children, hence not matched directly by previous HiveTableRelation case.
-    case i @ InsertIntoStatement(relation: HiveTableRelation, _, _, _, _, _)
+    case i @ InsertIntoStatement(relation: HiveTableRelation, _, _, _, _, _, _)
       if DDLUtils.isHiveTable(relation.tableMeta) && 
relation.tableMeta.stats.isEmpty =>
       i.copy(table = hiveTableWithStats(relation))
   }
@@ -160,7 +160,7 @@ class DetermineTableStats(session: SparkSession) extends 
Rule[LogicalPlan] {
 object HiveAnalysis extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
     case InsertIntoStatement(
-        r: HiveTableRelation, partSpec, _, query, overwrite, 
ifPartitionNotExists)
+        r: HiveTableRelation, partSpec, _, query, overwrite, 
ifPartitionNotExists, _)
         if DDLUtils.isHiveTable(r.tableMeta) =>
       InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite,
         ifPartitionNotExists, query.output.map(_.name))
@@ -226,12 +226,12 @@ case class RelationConversions(
     plan resolveOperators {
       // Write path
       case InsertIntoStatement(
-          r: HiveTableRelation, partition, cols, query, overwrite, 
ifPartitionNotExists)
+          r: HiveTableRelation, partition, cols, query, overwrite, 
ifPartitionNotExists, byName)
           if query.resolved && DDLUtils.isHiveTable(r.tableMeta) &&
             (!r.isPartitioned || 
conf.getConf(HiveUtils.CONVERT_INSERTING_PARTITIONED_TABLE))
             && isConvertible(r) =>
         InsertIntoStatement(metastoreCatalog.convert(r, isWrite = true), 
partition, cols,
-          query, overwrite, ifPartitionNotExists)
+          query, overwrite, ifPartitionNotExists, byName)
 
       // Read path
       case relation: HiveTableRelation


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to