This is an automated email from the ASF dual-hosted git repository.

holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ae100267c28b [SPARK-44735][SQL] Add warning msg when inserting columns with the same name by row that don't match up
ae100267c28b is described below

commit ae100267c28bc6fd2c2f9c880ed3df1999423992
Author: Jia Fan <fanjiaemi...@qq.com>
AuthorDate: Mon Oct 23 10:51:31 2023 -0700

    [SPARK-44735][SQL] Add warning msg when inserting columns with the same name by row that don't match up

    ### What changes were proposed in this pull request?
    This PR adds a warning message when an insert's columns have the same names as the table's columns but arrive in a different order. The warning tells the user that `INSERT INTO BY NAME` can reorder the query columns to match the table schema. It looks like this:
    ![image](https://github.com/apache/spark/assets/32387433/18e57125-8a2e-407c-a3fd-93a9cbf122a1)

    ### Why are the changes needed?
    To make this easy-to-miss usage scenario easier to diagnose.

    ### Does this PR introduce _any_ user-facing change?
    Yes, a warning is now logged in the case described above.

    ### How was this patch tested?
    Tested locally.

    ### Was this patch authored or co-authored using generative AI tooling?
    No

    Closes #42763 from Hisoka-X/SPARK-44735_add_warning_for_by_name.

    Authored-by: Jia Fan <fanjiaemi...@qq.com>
    Signed-off-by: Holden Karau <hol...@pigscanfly.ca>
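To make the scenario concrete, here is a minimal sketch of the case the warning targets (a spark-shell session; `src`, `dst`, and the column names are hypothetical, not from the patch):

    // Hypothetical tables whose columns share names but differ in order.
    spark.sql("CREATE TABLE src (last_name STRING, first_name STRING) USING parquet")
    spark.sql("CREATE TABLE dst (first_name STRING, last_name STRING) USING parquet")

    // Positional insert: the types line up, so this succeeds, but last_name
    // silently lands in first_name and vice versa. This is where the new
    // check logs its warning.
    spark.sql("INSERT INTO dst SELECT last_name, first_name FROM src")

    // The fix the warning suggests: BY NAME matches columns by name,
    // not by position.
    spark.sql("INSERT INTO dst BY NAME SELECT last_name, first_name FROM src")

Because the positional statement still succeeds whenever the types are compatible, the swapped values are easy to miss; the warning in the driver log is the hint.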
---
 .../apache/spark/sql/catalyst/analysis/Analyzer.scala  |  2 ++
 .../sql/catalyst/analysis/TableOutputResolver.scala    | 17 ++++++++++++++++-
 .../apache/spark/sql/execution/datasources/rules.scala |  5 ++++-
 3 files changed, 22 insertions(+), 2 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 0469fb29a6fc..06d949ece262 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -3421,6 +3421,8 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
       case v2Write: V2WriteCommand
           if v2Write.table.resolved && v2Write.query.resolved && !v2Write.outputResolved =>
         validateStoreAssignmentPolicy()
+        TableOutputResolver.suitableForByNameCheck(v2Write.isByName,
+          expected = v2Write.table.output, queryOutput = v2Write.query.output)
         val projection = TableOutputResolver.resolveOutputColumns(
           v2Write.table.name, v2Write.table.output, v2Write.query, v2Write.isByName, conf)
         if (projection != v2Write.query) {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
index d41757725771..1398552399cd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
@@ -19,7 +19,9 @@ package org.apache.spark.sql.catalyst.analysis
 
 import scala.collection.mutable
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
@@ -34,7 +36,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
 import org.apache.spark.sql.types.{ArrayType, DataType, DecimalType, IntegralType, MapType,
   StructType}
 
-object TableOutputResolver {
+object TableOutputResolver extends SQLConfHelper with Logging {
   def resolveVariableOutputColumns(
       expected: Seq[VariableReference],
@@ -470,6 +472,19 @@ object TableOutputResolver {
     }
   }
 
+  def suitableForByNameCheck(
+      byName: Boolean,
+      expected: Seq[Attribute],
+      queryOutput: Seq[Attribute]): Unit = {
+    if (!byName && expected.size == queryOutput.size &&
+      expected.forall(e => queryOutput.exists(p => conf.resolver(p.name, e.name))) &&
+      expected.zip(queryOutput).exists(e => !conf.resolver(e._1.name, e._2.name))) {
+      logWarning("The query columns and the table columns have same names but different " +
+        "orders. You can use INSERT [INTO | OVERWRITE] BY NAME to reorder the query columns to " +
+        "align with the table columns.")
+    }
+  }
+
   private def containsIntegralOrDecimalType(dt: DataType): Boolean = dt match {
     case _: IntegralType | _: DecimalType => true
     case a: ArrayType => containsIntegralOrDecimalType(a.elementType)
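Read on its own, the condition in suitableForByNameCheck amounts to the following standalone sketch; a case-insensitive resolver stands in for conf.resolver (Spark's default when spark.sql.caseSensitive is false), and the !byName guard is omitted:

    // Fires only when the column counts match, the name sets match (per the
    // resolver), but at least one column is out of position.
    def sameNamesDifferentOrder(expected: Seq[String], queryOutput: Seq[String]): Boolean = {
      val resolver = (a: String, b: String) => a.equalsIgnoreCase(b)
      expected.size == queryOutput.size &&
        expected.forall(e => queryOutput.exists(q => resolver(q, e))) &&
        expected.zip(queryOutput).exists { case (e, q) => !resolver(e, q) }
    }

    sameNamesDifferentOrder(Seq("first_name", "last_name"), Seq("last_name", "first_name"))  // true: warn
    sameNamesDifferentOrder(Seq("first_name", "last_name"), Seq("first_name", "last_name"))  // false: order matches
    sameNamesDifferentOrder(Seq("first_name", "last_name"), Seq("first_name", "city"))       // false: names differ

The committed check additionally requires a positional (not BY NAME) write, which is why both call sites pass the resolved byName flag, as the rules.scala diff below shows.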
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index c5e86ee2d03e..65ebbb57fd32 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -404,11 +404,14 @@ object PreprocessTableInsertion extends ResolveInsertionBase {
       insert.query
     }
 
     val newQuery = try {
+      val byName = hasColumnList || insert.byName
+      TableOutputResolver.suitableForByNameCheck(byName, expected = expectedColumns,
+        queryOutput = query.output)
       TableOutputResolver.resolveOutputColumns(
         tblName, expectedColumns, query,
-        byName = hasColumnList || insert.byName,
+        byName,
         conf,
         supportColDefaultValue = true)
     } catch {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org