This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new e6adc67d43d  [SPARK-42750][SQL] Support Insert By Name statement
e6adc67d43d is described below

commit e6adc67d43d6beccf21013ee00aa274bed13107c
Author: Jia Fan <fanjiaemi...@qq.com>
AuthorDate: Wed Jun 7 10:30:59 2023 +0800

    [SPARK-42750][SQL] Support Insert By Name statement

    ### What changes were proposed in this pull request?
    In some use cases, users have incoming dataframes with fixed column names that may differ from
    the target table's canonical column order. Currently there is no easy way to handle this through
    the INSERT INTO API: the user has to make sure the columns are in the right order, just as when
    inserting a tuple. This PR adds an optional BY NAME clause, so that `INSERT INTO tgt BY NAME <query>`
    takes each column of <query> and inserts it into the column of `tgt` that has the same name,
    according to the configured `resolver` logic.

    Two definitions need to be clarified:
    1. `BY NAME` and a user-specified column list (`INSERT INTO t1 (a,b)` ...) are mutually exclusive.
    2. A partition spec can still be given together with `BY NAME`: `INSERT INTO t PARTITION(a=1) BY NAME <query>`.

    `INSERT OVERWRITE ... BY NAME` is not supported yet; it will be added in a follow-up.

    ### Why are the changes needed?
    Adds the new `INSERT INTO ... BY NAME` feature.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Added new tests.

    Closes #40908 from Hisoka-X/SPARK-42750_insert_into_by_name.

    Lead-authored-by: Jia Fan <fanjiaemi...@qq.com>
    Co-authored-by: Hisoka <fanjiaemi...@qq.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 docs/sql-ref-ansi-compliance.md                    |  3 +-
 .../spark/sql/catalyst/parser/SqlBaseLexer.g4      |  1 +
 .../spark/sql/catalyst/parser/SqlBaseParser.g4     |  4 +-
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  7 ++--
 .../sql/catalyst/analysis/CheckAnalysis.scala      |  2 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala     | 19 ++++++----
 .../sql/catalyst/plans/logical/statements.scala    |  7 +++-
 .../spark/sql/catalyst/parser/DDLParserSuite.scala | 34 +++++++++++++++++
 .../execution/datasources/DataSourceStrategy.scala |  9 +++--
 .../datasources/FallBackFileSourceV2.scala         |  2 +-
 .../spark/sql/execution/datasources/rules.scala    | 14 ++++---
 .../sql-tests/analyzer-results/explain-aqe.sql.out |  2 +-
 .../sql-tests/analyzer-results/explain.sql.out     |  2 +-
 .../sql-tests/results/ansi/keywords.sql.out        |  1 +
 .../sql-tests/results/explain-aqe.sql.out          |  2 +-
 .../resources/sql-tests/results/explain.sql.out    |  2 +-
 .../resources/sql-tests/results/keywords.sql.out   |  1 +
 .../org/apache/spark/sql/SQLInsertTestSuite.scala  | 43 ++++++++++++++++++++--
 .../execution/command/PlanResolutionSuite.scala    |  4 +-
 .../ThriftServerWithSparkContextSuite.scala        |  2 +-
 .../org/apache/spark/sql/hive/HiveStrategies.scala |  8 ++--
 21 files changed, 129 insertions(+), 40 deletions(-)

diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md
index 76b5d5aef73..f9c6f5ea6aa 100644
--- a/docs/sql-ref-ansi-compliance.md
+++ b/docs/sql-ref-ansi-compliance.md
@@ -350,7 +350,7 @@ By default, both `spark.sql.ansi.enabled` and `spark.sql.ansi.enforceReservedKey
 Below is a list of all the keywords in Spark SQL.
|Keyword|Spark SQL<br/>ANSI Mode|Spark SQL<br/>Default Mode|SQL-2016| -|-------|----------------------|-------------------------|--------| +|------|----------------------|-------------------------|--------| |ADD|non-reserved|non-reserved|non-reserved| |AFTER|non-reserved|non-reserved|non-reserved| |ALL|reserved|non-reserved|reserved| @@ -527,6 +527,7 @@ Below is a list of all the keywords in Spark SQL. |MONTH|non-reserved|non-reserved|non-reserved| |MONTHS|non-reserved|non-reserved|non-reserved| |MSCK|non-reserved|non-reserved|non-reserved| +|NAME|non-reserved|non-reserved|non-reserved| |NAMESPACE|non-reserved|non-reserved|non-reserved| |NAMESPACES|non-reserved|non-reserved|non-reserved| |NANOSECOND|non-reserved|non-reserved|non-reserved| diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 index 6300221b542..ecd5f5912fd 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseLexer.g4 @@ -264,6 +264,7 @@ MINUTES: 'MINUTES'; MONTH: 'MONTH'; MONTHS: 'MONTHS'; MSCK: 'MSCK'; +NAME: 'NAME'; NAMESPACE: 'NAMESPACE'; NAMESPACES: 'NAMESPACES'; NANOSECOND: 'NANOSECOND'; diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 index 20c8df4f79a..89100f2aeec 100644 --- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 +++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 @@ -318,7 +318,7 @@ query insertInto : INSERT OVERWRITE TABLE? identifierReference (partitionSpec (IF NOT EXISTS)?)? identifierList? #insertOverwriteTable - | INSERT INTO TABLE? identifierReference partitionSpec? (IF NOT EXISTS)? identifierList? #insertIntoTable + | INSERT INTO TABLE? identifierReference partitionSpec? (IF NOT EXISTS)? ((BY NAME) | identifierList)? #insertIntoTable | INSERT INTO TABLE? identifierReference REPLACE whereClause #insertIntoReplaceWhere | INSERT OVERWRITE LOCAL? DIRECTORY path=stringLit rowFormat? createFileFormat? #insertOverwriteHiveDir | INSERT OVERWRITE LOCAL? DIRECTORY (path=stringLit)? tableProvider (OPTIONS options=propertyList)? 
#insertOverwriteDir @@ -1362,6 +1362,7 @@ ansiNonReserved | MONTH | MONTHS | MSCK + | NAME | NAMESPACE | NAMESPACES | NANOSECOND @@ -1683,6 +1684,7 @@ nonReserved | MONTH | MONTHS | MSCK + | NAME | NAMESPACE | NAMESPACES | NANOSECOND diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 51b35498526..aa1b9d0e8fd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1078,7 +1078,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor def apply(plan: LogicalPlan) : LogicalPlan = plan.resolveOperatorsUpWithPruning(AlwaysProcess.fn, ruleId) { - case i @ InsertIntoStatement(table, _, _, _, _, _) => + case i @ InsertIntoStatement(table, _, _, _, _, _, _) => val relation = table match { case u: UnresolvedRelation if !u.isStreaming => resolveRelation(u).getOrElse(u) @@ -1278,7 +1278,8 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor object ResolveInsertInto extends ResolveInsertionBase { override def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsWithPruning( AlwaysProcess.fn, ruleId) { - case i @ InsertIntoStatement(r: DataSourceV2Relation, _, _, _, _, _) if i.query.resolved => + case i @ InsertIntoStatement(r: DataSourceV2Relation, _, _, _, _, _, _) + if i.query.resolved => // ifPartitionNotExists is append with validation, but validation is not supported if (i.ifPartitionNotExists) { throw QueryCompilationErrors.unsupportedIfNotExistsError(r.table.name) @@ -1290,7 +1291,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor } else { None } - val isByName = projectByName.nonEmpty + val isByName = projectByName.nonEmpty || i.byName val partCols = partitionColumnNames(r.table) validatePartitionSpec(partCols, i.partitionSpec) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 9124890d4af..e84023ec3df 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -166,7 +166,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB // not found first, instead of errors in the input query of the insert command, by doing a // top-down traversal. plan.foreach { - case InsertIntoStatement(u: UnresolvedRelation, _, _, _, _, _) => + case InsertIntoStatement(u: UnresolvedRelation, _, _, _, _, _, _) => u.tableNotFound(u.multipartIdentifier) // TODO (SPARK-27484): handle streaming write commands when we have them. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 0801cfbda4b..f4170860c24 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -277,10 +277,10 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit /** * Parameters used for writing query to a table: - * (table ident, tableColumnList, partitionKeys, ifPartitionNotExists). 
+ * (table ident, tableColumnList, partitionKeys, ifPartitionNotExists, byName). */ type InsertTableParams = - (IdentifierReferenceContext, Seq[String], Map[String, Option[String]], Boolean) + (IdentifierReferenceContext, Seq[String], Map[String, Option[String]], Boolean, Boolean) /** * Parameters used for writing query to a directory: (isLocal, CatalogStorageFormat, provider). @@ -291,7 +291,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit * Add an * {{{ * INSERT OVERWRITE TABLE tableIdentifier [partitionSpec [IF NOT EXISTS]]? [identifierList] - * INSERT INTO [TABLE] tableIdentifier [partitionSpec] [identifierList] + * INSERT INTO [TABLE] tableIdentifier [partitionSpec] ([BY NAME] | [identifierList]) * INSERT INTO [TABLE] tableIdentifier REPLACE whereClause * INSERT OVERWRITE [LOCAL] DIRECTORY STRING [rowFormat] [createFileFormat] * INSERT OVERWRITE [LOCAL] DIRECTORY [STRING] tableProvider [OPTIONS tablePropertyList] @@ -307,7 +307,8 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit // 2. Write commands do not hold the table logical plan as a child, and we need to add // additional resolution code to resolve identifiers inside the write commands. case table: InsertIntoTableContext => - val (relationCtx, cols, partition, ifPartitionNotExists) = visitInsertIntoTable(table) + val (relationCtx, cols, partition, ifPartitionNotExists, byName) + = visitInsertIntoTable(table) withIdentClause(relationCtx, ident => { InsertIntoStatement( createUnresolvedRelation(relationCtx, ident), @@ -315,10 +316,12 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit cols, query, overwrite = false, - ifPartitionNotExists) + ifPartitionNotExists, + byName) }) case table: InsertOverwriteTableContext => - val (relationCtx, cols, partition, ifPartitionNotExists) = visitInsertOverwriteTable(table) + val (relationCtx, cols, partition, ifPartitionNotExists, _) + = visitInsertOverwriteTable(table) withIdentClause(relationCtx, ident => { InsertIntoStatement( createUnresolvedRelation(relationCtx, ident), @@ -358,7 +361,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit operationNotAllowed("INSERT INTO ... IF NOT EXISTS", ctx) } - (ctx.identifierReference, cols, partitionKeys, false) + (ctx.identifierReference, cols, partitionKeys, false, ctx.NAME() != null) } /** @@ -376,7 +379,7 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit dynamicPartitionKeys.keys.mkString(", "), ctx) } - (ctx.identifierReference, cols, partitionKeys, ctx.EXISTS() != null) + (ctx.identifierReference, cols, partitionKeys, ctx.EXISTS() != null, false) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala index 9c639a4bce6..669750ee448 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala @@ -165,6 +165,8 @@ case class QualifiedColType( * would have Map('a' -> Some('1'), 'b' -> None). * @param ifPartitionNotExists If true, only write if the partition does not exist. * Only valid for static partitions. + * @param byName If true, reorder the data columns to match the column names of the + * target table. 
*/ case class InsertIntoStatement( table: LogicalPlan, @@ -172,12 +174,15 @@ case class InsertIntoStatement( userSpecifiedCols: Seq[String], query: LogicalPlan, overwrite: Boolean, - ifPartitionNotExists: Boolean) extends UnaryParsedStatement { + ifPartitionNotExists: Boolean, + byName: Boolean = false) extends UnaryParsedStatement { require(overwrite || !ifPartitionNotExists, "IF NOT EXISTS is only valid in INSERT OVERWRITE") require(partitionSpec.values.forall(_.nonEmpty) || !ifPartitionNotExists, "IF NOT EXISTS is only valid with static partitions") + require(userSpecifiedCols.isEmpty || !byName, + "BY NAME is only valid without specified cols") override def child: LogicalPlan = query override protected def withNewChildInternal(newChild: LogicalPlan): InsertIntoStatement = diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala index 5899f813f14..53635acf0b3 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala @@ -1673,6 +1673,40 @@ class DDLParserSuite extends AnalysisTest { stop = 69)) } + test("insert table: by name") { + Seq( + "INSERT INTO TABLE testcat.ns1.ns2.tbl BY NAME SELECT * FROM source", + "INSERT INTO testcat.ns1.ns2.tbl BY NAME SELECT * FROM source" + ).foreach { sql => + parseCompare(sql, + InsertIntoStatement( + UnresolvedRelation(Seq("testcat", "ns1", "ns2", "tbl")), + Map.empty, + Nil, + Project(Seq(UnresolvedStar(None)), UnresolvedRelation(Seq("source"))), + overwrite = false, ifPartitionNotExists = false, byName = true)) + } + } + + test("insert table: by name unsupported case") { + checkError( + exception = parseException("INSERT OVERWRITE TABLE t1 BY NAME SELECT * FROM tmp_view"), + errorClass = "PARSE_SYNTAX_ERROR", + parameters = Map( + "error" -> "'BY'", + "hint" -> "") + ) + + checkError( + exception = parseException( + "INSERT INTO TABLE t1 BY NAME (c1,c2) SELECT * FROM tmp_view"), + errorClass = "PARSE_SYNTAX_ERROR", + parameters = Map( + "error" -> "'c1'", + "hint" -> "") + ) + } + test("delete from table: delete all") { parseCompare("DELETE FROM testcat.ns1.ns2.tbl", DeleteFromTable( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 818dc4eb31c..454cc0b5f56 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -152,7 +152,7 @@ object DataSourceAnalysis extends Rule[LogicalPlan] { CreateDataSourceTableAsSelectCommand(tableDesc, mode, query, query.output.map(_.name)) case InsertIntoStatement(l @ LogicalRelation(_: InsertableRelation, _, _, _), - parts, _, query, overwrite, false) if parts.isEmpty => + parts, _, query, overwrite, false, _) if parts.isEmpty => InsertIntoDataSourceCommand(l, query, overwrite) case InsertIntoDir(_, storage, provider, query, overwrite) @@ -164,7 +164,7 @@ object DataSourceAnalysis extends Rule[LogicalPlan] { InsertIntoDataSourceDirCommand(storage, provider.get, query, overwrite) case i @ InsertIntoStatement( - l @ LogicalRelation(t: HadoopFsRelation, _, table, _), parts, _, query, overwrite, _) + l @ LogicalRelation(t: HadoopFsRelation, _, table, _), parts, _, 
query, overwrite, _, _) if query.resolved => // If the InsertIntoTable command is for a partitioned HadoopFsRelation and // the user has specified static partitions, we add a Project operator on top of the query @@ -275,10 +275,11 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan] override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, options, false), - _, _, _, _, _) if DDLUtils.isDatasourceTable(tableMeta) => + _, _, _, _, _, _) if DDLUtils.isDatasourceTable(tableMeta) => i.copy(table = readDataSourceTable(tableMeta, options)) - case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, _, false), _, _, _, _, _) => + case i @ InsertIntoStatement(UnresolvedCatalogRelation(tableMeta, _, false), + _, _, _, _, _, _) => i.copy(table = DDLUtils.readHiveTable(tableMeta)) case UnresolvedCatalogRelation(tableMeta, options, false) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala index b5d06db0241..2e1ae9fe3ae 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FallBackFileSourceV2.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, File class FallBackFileSourceV2(sparkSession: SparkSession) extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case i @ InsertIntoStatement( - d @ DataSourceV2Relation(table: FileTable, _, _, _, _), _, _, _, _, _) => + d @ DataSourceV2Relation(table: FileTable, _, _, _, _), _, _, _, _, _, _) => val v1FileFormat = table.fallbackFileFormat.newInstance() val relation = HadoopFsRelation( table.fileIndex, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala index b3fdfc76c7d..0b07ae1d11c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala @@ -389,15 +389,16 @@ object PreprocessTableInsertion extends ResolveInsertionBase { } // Create a project if this INSERT has a user-specified column list. 
- val isByName = insert.userSpecifiedCols.nonEmpty - val query = if (isByName) { + val hasColumnList = insert.userSpecifiedCols.nonEmpty + val query = if (hasColumnList) { createProjectForByNameQuery(insert) } else { insert.query } val newQuery = try { TableOutputResolver.resolveOutputColumns( - tblName, expectedColumns, query, byName = isByName, conf, supportColDefaultValue = true) + tblName, expectedColumns, query, byName = hasColumnList || insert.byName, conf, + supportColDefaultValue = true) } catch { case e: AnalysisException if staticPartCols.nonEmpty && e.getErrorClass == "INSERT_COLUMN_ARITY_MISMATCH" => @@ -425,7 +426,7 @@ object PreprocessTableInsertion extends ResolveInsertionBase { } def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { - case i @ InsertIntoStatement(table, _, _, query, _, _) if table.resolved && query.resolved => + case i @ InsertIntoStatement(table, _, _, query, _, _, _) if table.resolved && query.resolved => table match { case relation: HiveTableRelation => val metadata = relation.tableMeta @@ -506,7 +507,8 @@ object PreWriteCheck extends (LogicalPlan => Unit) { def apply(plan: LogicalPlan): Unit = { plan.foreach { - case InsertIntoStatement(l @ LogicalRelation(relation, _, _, _), partition, _, query, _, _) => + case InsertIntoStatement(l @ LogicalRelation(relation, _, _, _), partition, + _, query, _, _, _) => // Get all input data source relations of the query. val srcRelations = query.collect { case LogicalRelation(src, _, _, _) => src @@ -528,7 +530,7 @@ object PreWriteCheck extends (LogicalPlan => Unit) { case _ => failAnalysis(s"$relation does not allow insertion.") } - case InsertIntoStatement(t, _, _, _, _, _) + case InsertIntoStatement(t, _, _, _, _, _, _) if !t.isInstanceOf[LeafNode] || t.isInstanceOf[Range] || t.isInstanceOf[OneRowRelation] || diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/explain-aqe.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/explain-aqe.sql.out index 8189f1fc7d1..c53642b8ba2 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/explain-aqe.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/explain-aqe.sql.out @@ -196,7 +196,7 @@ ExplainCommand 'Aggregate ['key], ['key, unresolvedalias('MIN('val), None)], For -- !query EXPLAIN EXTENDED INSERT INTO TABLE explain_temp5 SELECT * FROM explain_temp4 -- !query analysis -ExplainCommand 'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [], false, false, false, ExtendedMode +ExplainCommand 'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [], false, false, false, false, ExtendedMode -- !query diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/explain.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/explain.sql.out index 8189f1fc7d1..c53642b8ba2 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/explain.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/explain.sql.out @@ -196,7 +196,7 @@ ExplainCommand 'Aggregate ['key], ['key, unresolvedalias('MIN('val), None)], For -- !query EXPLAIN EXTENDED INSERT INTO TABLE explain_temp5 SELECT * FROM explain_temp4 -- !query analysis -ExplainCommand 'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [], false, false, false, ExtendedMode +ExplainCommand 'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [], false, false, false, false, ExtendedMode -- !query diff --git a/sql/core/src/test/resources/sql-tests/results/ansi/keywords.sql.out 
b/sql/core/src/test/resources/sql-tests/results/ansi/keywords.sql.out index 9952c9aef62..34cc6cc4bd9 100644 --- a/sql/core/src/test/resources/sql-tests/results/ansi/keywords.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/ansi/keywords.sql.out @@ -180,6 +180,7 @@ MINUTES false MONTH false MONTHS false MSCK false +NAME false NAMESPACE false NAMESPACES false NANOSECOND false diff --git a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out index d73035aa527..3c2677c936f 100644 --- a/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/explain-aqe.sql.out @@ -1081,7 +1081,7 @@ EXPLAIN EXTENDED INSERT INTO TABLE explain_temp5 SELECT * FROM explain_temp4 struct<plan:string> -- !query output == Parsed Logical Plan == -'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [], false, false, false +'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [], false, false, false, false +- 'Project [*] +- 'UnresolvedRelation [explain_temp4], [], false diff --git a/sql/core/src/test/resources/sql-tests/results/explain.sql.out b/sql/core/src/test/resources/sql-tests/results/explain.sql.out index 5ac793fed86..f54c6c5e44f 100644 --- a/sql/core/src/test/resources/sql-tests/results/explain.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/explain.sql.out @@ -1023,7 +1023,7 @@ EXPLAIN EXTENDED INSERT INTO TABLE explain_temp5 SELECT * FROM explain_temp4 struct<plan:string> -- !query output == Parsed Logical Plan == -'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [], false, false, false +'InsertIntoStatement 'UnresolvedRelation [explain_temp5], [], false, false, false, false +- 'Project [*] +- 'UnresolvedRelation [explain_temp4], [], false diff --git a/sql/core/src/test/resources/sql-tests/results/keywords.sql.out b/sql/core/src/test/resources/sql-tests/results/keywords.sql.out index aa13b029300..41d491c8027 100644 --- a/sql/core/src/test/resources/sql-tests/results/keywords.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/keywords.sql.out @@ -180,6 +180,7 @@ MINUTES false MONTH false MONTHS false MSCK false +NAME false NAMESPACE false NAMESPACES false NANOSECOND false diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala index af85e44519b..1d27904bb2c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala @@ -51,17 +51,20 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils { input: DataFrame, cols: Seq[String] = Nil, partitionExprs: Seq[String] = Nil, - overwrite: Boolean): Unit = { + overwrite: Boolean, + byName: Boolean = false): Unit = { val tmpView = "tmp_view" - val columnList = if (cols.nonEmpty) cols.mkString("(", ",", ")") else "" val partitionList = if (partitionExprs.nonEmpty) { partitionExprs.mkString("PARTITION (", ",", ")") } else "" withTempView(tmpView) { input.createOrReplaceTempView(tmpView) val overwriteStr = if (overwrite) "OVERWRITE" else "INTO" + val columnList = if (cols.nonEmpty && !byName) cols.mkString("(", ",", ")") else "" + val byNameStr = if (byName) "BY NAME" else "" sql( - s"INSERT $overwriteStr TABLE $tableName $partitionList $columnList SELECT * FROM $tmpView") + s"INSERT $overwriteStr TABLE $tableName $partitionList $byNameStr " + + s"$columnList SELECT * FROM $tmpView") 
} } @@ -123,6 +126,40 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils { } } + test("insert with column list - by name") { + withTable("t1") { + val cols = Seq("c1", "c2", "c3") + val df = Seq((3, 2, 1)).toDF(cols.reverse: _*) + createTable("t1", cols, Seq("int", "int", "int")) + processInsert("t1", df, overwrite = false, byName = true) + verifyTable("t1", df.selectExpr(cols: _*)) + } + } + + test("insert with column list - by name + partitioned table") { + val cols = Seq("c1", "c2", "c3", "c4") + val df = Seq((4, 3, 2, 1)).toDF(cols.reverse: _*) + withTable("t1") { + createTable("t1", cols, Seq("int", "int", "int", "int"), cols.takeRight(2)) + processInsert("t1", df, overwrite = false, byName = true) + verifyTable("t1", df.selectExpr(cols: _*)) + } + + withTable("t1") { + createTable("t1", cols, Seq("int", "int", "int", "int"), cols.takeRight(2)) + processInsert("t1", df.selectExpr("c2", "c1", "c4"), + partitionExprs = Seq("c3=3", "c4"), overwrite = false, byName = true) + verifyTable("t1", df.selectExpr(cols: _*)) + } + + withTable("t1") { + createTable("t1", cols, Seq("int", "int", "int", "int"), cols.takeRight(2)) + processInsert("t1", df.selectExpr("c2", "c1"), + partitionExprs = Seq("c3=3", "c4=4"), overwrite = false, byName = true) + verifyTable("t1", df.selectExpr(cols: _*)) + } + } + test("insert with column list - table output reorder + partitioned table") { val cols = Seq("c1", "c2", "c3", "c4") val df = Seq((1, 2, 3, 4)).toDF(cols: _*) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala index 7fa3873fc6e..17a0f308a1a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala @@ -1213,7 +1213,7 @@ class PlanResolutionSuite extends AnalysisTest { case InsertIntoStatement( _, _, _, UnresolvedInlineTable(_, Seq(Seq(UnresolvedAttribute(Seq("DEFAULT"))))), - _, _) => + _, _, _) => case _ => fail("Expect UpdateTable, but got:\n" + parsed1.treeString) } @@ -1221,7 +1221,7 @@ class PlanResolutionSuite extends AnalysisTest { case InsertIntoStatement( _, _, _, Project(Seq(UnresolvedAttribute(Seq("DEFAULT"))), _), - _, _) => + _, _, _) => case _ => fail("Expect UpdateTable, but got:\n" + parsed1.treeString) } diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala index 4d8b7cdf354..aef9dc69656 100644 --- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala +++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ThriftServerWithSparkContextSuite.scala @@ -213,7 +213,7 @@ trait ThriftServerWithSparkContextSuite extends SharedThriftServer { val sessionHandle = client.openSession(user, "") val infoValue = client.getInfo(sessionHandle, GetInfoType.CLI_ODBC_KEYWORDS) // scalastyle:off line.size.limit - assert(infoValue.getStringValue == 
"ADD,AFTER,ALL,ALTER,ALWAYS,ANALYZE,AND,ANTI,ANY,ANY_VALUE,ARCHIVE,ARRAY,AS,ASC,AT,AUTHORIZATION,BETWEEN,BIGINT,BINARY,BOOLEAN,BOTH,BUCKET,BUCKETS,BY,BYTE,CACHE,CASCADE,CASE,CAST,CATALOG,CATALOGS,CHANGE,CHAR,CHARACTER,CHECK,CLEAR,CLUSTER,CLUSTERED,CODEGEN,COLLATE,COLLECTION,COLUMN,COLUMNS,COMMENT,COMMIT,COMPACT,COMPACTIONS,COMPUTE,CONCATENATE,CONSTRAINT,COST,CREATE,CROSS,CUBE,CURRENT,CURRENT_DATE,CURRENT_TIME,CURRENT_TIMESTAMP,CURRENT_USER,DATA,DA [...] + assert(infoValue.getStringValue == "ADD,AFTER,ALL,ALTER,ALWAYS,ANALYZE,AND,ANTI,ANY,ANY_VALUE,ARCHIVE,ARRAY,AS,ASC,AT,AUTHORIZATION,BETWEEN,BIGINT,BINARY,BOOLEAN,BOTH,BUCKET,BUCKETS,BY,BYTE,CACHE,CASCADE,CASE,CAST,CATALOG,CATALOGS,CHANGE,CHAR,CHARACTER,CHECK,CLEAR,CLUSTER,CLUSTERED,CODEGEN,COLLATE,COLLECTION,COLUMN,COLUMNS,COMMENT,COMMIT,COMPACT,COMPACTIONS,COMPUTE,CONCATENATE,CONSTRAINT,COST,CREATE,CROSS,CUBE,CURRENT,CURRENT_DATE,CURRENT_TIME,CURRENT_TIMESTAMP,CURRENT_USER,DATA,DA [...] // scalastyle:on line.size.limit } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index b2438d38520..3da3d4a0eb5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -145,7 +145,7 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] { // handles InsertIntoStatement specially as the table in InsertIntoStatement is not added in its // children, hence not matched directly by previous HiveTableRelation case. - case i @ InsertIntoStatement(relation: HiveTableRelation, _, _, _, _, _) + case i @ InsertIntoStatement(relation: HiveTableRelation, _, _, _, _, _, _) if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty => i.copy(table = hiveTableWithStats(relation)) } @@ -160,7 +160,7 @@ class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] { object HiveAnalysis extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case InsertIntoStatement( - r: HiveTableRelation, partSpec, _, query, overwrite, ifPartitionNotExists) + r: HiveTableRelation, partSpec, _, query, overwrite, ifPartitionNotExists, _) if DDLUtils.isHiveTable(r.tableMeta) => InsertIntoHiveTable(r.tableMeta, partSpec, query, overwrite, ifPartitionNotExists, query.output.map(_.name)) @@ -226,12 +226,12 @@ case class RelationConversions( plan resolveOperators { // Write path case InsertIntoStatement( - r: HiveTableRelation, partition, cols, query, overwrite, ifPartitionNotExists) + r: HiveTableRelation, partition, cols, query, overwrite, ifPartitionNotExists, byName) if query.resolved && DDLUtils.isHiveTable(r.tableMeta) && (!r.isPartitioned || conf.getConf(HiveUtils.CONVERT_INSERTING_PARTITIONED_TABLE)) && isConvertible(r) => InsertIntoStatement(metastoreCatalog.convert(r, isWrite = true), partition, cols, - query, overwrite, ifPartitionNotExists) + query, overwrite, ifPartitionNotExists, byName) // Read path case relation: HiveTableRelation --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org