This is an automated email from the ASF dual-hosted git repository.

holden pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ae100267c28b [SPARK-44735][SQL] Add warning msg when inserting columns 
with the same name by row that don't match up
ae100267c28b is described below

commit ae100267c28bc6fd2c2f9c880ed3df1999423992
Author: Jia Fan <fanjiaemi...@qq.com>
AuthorDate: Mon Oct 23 10:51:31 2023 -0700

    [SPARK-44735][SQL] Add warning msg when inserting columns with the same 
name by row that don't match up
    
    ### What changes were proposed in this pull request?
    This PR add a warning msg when inserting columns name with the same name by 
row but order not matched. Tell user can use `INSERT INTO BY NAME` to reorder 
columns to match with table schema.
    It will be like:
    
![image](https://github.com/apache/spark/assets/32387433/18e57125-8a2e-407c-a3fd-93a9cbf122a1)
    
    ### Why are the changes needed?
    Optimize user usage scenarios.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, sometimes will show some warning.
    
    ### How was this patch tested?
    Test in local
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #42763 from Hisoka-X/SPARK-44735_add_warning_for_by_name.
    
    Authored-by: Jia Fan <fanjiaemi...@qq.com>
    Signed-off-by: Holden Karau <hol...@pigscanfly.ca>
---
 .../apache/spark/sql/catalyst/analysis/Analyzer.scala   |  2 ++
 .../sql/catalyst/analysis/TableOutputResolver.scala     | 17 ++++++++++++++++-
 .../apache/spark/sql/execution/datasources/rules.scala  |  5 ++++-
 3 files changed, 22 insertions(+), 2 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 0469fb29a6fc..06d949ece262 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -3421,6 +3421,8 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
       case v2Write: V2WriteCommand
           if v2Write.table.resolved && v2Write.query.resolved && 
!v2Write.outputResolved =>
         validateStoreAssignmentPolicy()
+        TableOutputResolver.suitableForByNameCheck(v2Write.isByName,
+          expected = v2Write.table.output, queryOutput = v2Write.query.output)
         val projection = TableOutputResolver.resolveOutputColumns(
           v2Write.table.name, v2Write.table.output, v2Write.query, 
v2Write.isByName, conf)
         if (projection != v2Write.query) {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
index d41757725771..1398552399cd 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
@@ -19,7 +19,9 @@ package org.apache.spark.sql.catalyst.analysis
 
 import scala.collection.mutable
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.objects.AssertNotNull
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
@@ -34,7 +36,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy
 import org.apache.spark.sql.types.{ArrayType, DataType, DecimalType, 
IntegralType, MapType, StructType}
 
-object TableOutputResolver {
+object TableOutputResolver extends SQLConfHelper with Logging {
 
   def resolveVariableOutputColumns(
       expected: Seq[VariableReference],
@@ -470,6 +472,19 @@ object TableOutputResolver {
     }
   }
 
+  def suitableForByNameCheck(
+      byName: Boolean,
+      expected: Seq[Attribute],
+      queryOutput: Seq[Attribute]): Unit = {
+    if (!byName && expected.size == queryOutput.size &&
+      expected.forall(e => queryOutput.exists(p => conf.resolver(p.name, 
e.name))) &&
+      expected.zip(queryOutput).exists(e => !conf.resolver(e._1.name, 
e._2.name))) {
+      logWarning("The query columns and the table columns have same names but 
different " +
+        "orders. You can use INSERT [INTO | OVERWRITE] BY NAME to reorder the 
query columns to " +
+        "align with the table columns.")
+    }
+  }
+
   private def containsIntegralOrDecimalType(dt: DataType): Boolean = dt match {
     case _: IntegralType | _: DecimalType => true
     case a: ArrayType => containsIntegralOrDecimalType(a.elementType)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index c5e86ee2d03e..65ebbb57fd32 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -404,11 +404,14 @@ object PreprocessTableInsertion extends 
ResolveInsertionBase {
       insert.query
     }
     val newQuery = try {
+      val byName = hasColumnList || insert.byName
+      TableOutputResolver.suitableForByNameCheck(byName, expected = 
expectedColumns,
+        queryOutput = query.output)
       TableOutputResolver.resolveOutputColumns(
         tblName,
         expectedColumns,
         query,
-        byName = hasColumnList || insert.byName,
+        byName,
         conf,
         supportColDefaultValue = true)
     } catch {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to