This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 24bd29cc56a [SPARK-43438][SQL] Error on missing input columns in `INSERT`
24bd29cc56a is described below

commit 24bd29cc56a7e12a45d713b5ca0bf2205b80a8f6
Author: Max Gekk <max.g...@gmail.com>
AuthorDate: Tue Aug 29 23:04:44 2023 +0300

    [SPARK-43438][SQL] Error on missing input columns in `INSERT`
    
    ### What changes were proposed in this pull request?
    In the PR, I propose to raise an error when a user runs a V1 `INSERT` without a list of columns and the number of inserted columns doesn't match the number of actual table columns.
    
    At the moment, Spark inserts data successfully in such cases because the PR https://github.com/apache/spark/pull/41262 changed the behaviour of Spark 3.4.x.
    
    ### Why are the changes needed?
    1. To conform to the SQL standard, which requires the number of columns to be the same:
    ![Screenshot 2023-08-07 at 11 01 27 AM](https://github.com/apache/spark/assets/1580697/c55badec-5716-490f-a83a-0bb6b22c84c7)
    
    Accordingly, the insertion below must not succeed:
    ```sql
    spark-sql (default)> CREATE TABLE tabtest(c1 INT, c2 INT);
    spark-sql (default)> INSERT INTO tabtest SELECT 1;
    ```
    
    2. To have the same behaviour as **Spark 3.4**:
    ```sql
    spark-sql (default)> INSERT INTO tabtest SELECT 1;
    `spark_catalog`.`default`.`tabtest` requires that the data to be inserted have the same number of columns as the target table: target table has 2 column(s) but the inserted data has 1 column(s), including 0 partition column(s) having constant value(s).
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    Yes.
    
    After the changes:
    ```sql
    spark-sql (default)> INSERT INTO tabtest SELECT 1;
    [INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS] Cannot write to `spark_catalog`.`default`.`tabtest`, the reason is not enough data columns:
    Table columns: `c1`, `c2`.
    Data columns: `1`.
    ```
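    
    The test updates in this diff show the supported ways to get the old result: either name the target columns explicitly so the remaining columns are filled with their default values, or supply a value for every column. A minimal, illustrative sketch against the `tabtest` table above (not taken from this patch):
    ```sql
    -- Explicit column list: `c2` is filled with its default value (NULL here).
    spark-sql (default)> INSERT INTO tabtest (c1) SELECT 1;
    -- Or provide a value for every column of the target table.
    spark-sql (default)> INSERT INTO tabtest SELECT 1, NULL;
    ```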
    
    ### How was this patch tested?
    By running the modified tests:
    ```
    $ build/sbt "test:testOnly *InsertSuite"
    $ build/sbt "test:testOnly *ResolveDefaultColumnsSuite"
    $ build/sbt -Phive "test:testOnly *HiveQuerySuite"
    ```
    
    Closes #42393 from MaxGekk/fix-num-cols-insert.
    
    Authored-by: Max Gekk <max.g...@gmail.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
    (cherry picked from commit a7eef2116919bd0c1a1b52adaf49de903e8c9c46)
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../catalyst/analysis/TableOutputResolver.scala    | 15 +-----
 .../spark/sql/execution/datasources/rules.scala    |  6 ++-
 .../spark/sql/ResolveDefaultColumnsSuite.scala     | 59 +++++++++++++++++-----
 .../org/apache/spark/sql/sources/InsertSuite.scala | 18 ++++---
 .../org/apache/spark/sql/hive/InsertSuite.scala    |  2 +-
 .../spark/sql/hive/execution/HiveQuerySuite.scala  |  6 +--
 6 files changed, 69 insertions(+), 37 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
index 894cd0b3991..6671836b351 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TableOutputResolver.scala
@@ -65,22 +65,11 @@ object TableOutputResolver {
         errors += _,
         fillDefaultValue = supportColDefaultValue)
     } else {
-      // If the target table needs more columns than the input query, fill them with
-      // the columns' default values, if the `supportColDefaultValue` parameter is true.
-      val fillDefaultValue = supportColDefaultValue && actualExpectedCols.size > query.output.size
-      val queryOutputCols = if (fillDefaultValue) {
-        query.output ++ actualExpectedCols.drop(query.output.size).flatMap { expectedCol =>
-          getDefaultValueExprOrNullLit(expectedCol, conf.useNullsForMissingDefaultColumnValues)
-        }
-      } else {
-        query.output
-      }
-      if (actualExpectedCols.size > queryOutputCols.size) {
+      if (actualExpectedCols.size > query.output.size) {
         throw QueryCompilationErrors.cannotWriteNotEnoughColumnsToTableError(
           tableName, actualExpectedCols.map(_.name), query)
       }
-
-      resolveColumnsByPosition(tableName, queryOutputCols, actualExpectedCols, conf, errors += _)
+      resolveColumnsByPosition(tableName, query.output, actualExpectedCols, conf, errors += _)
     }
 
     if (errors.nonEmpty) {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 4cbd54e6d20..f9b3f73ff02 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -404,7 +404,11 @@ object PreprocessTableInsertion extends ResolveInsertionBase {
     }
     val newQuery = try {
       TableOutputResolver.resolveOutputColumns(
-        tblName, expectedColumns, query, byName = hasColumnList || insert.byName, conf,
+        tblName,
+        expectedColumns,
+        query,
+        byName = hasColumnList || insert.byName,
+        conf,
         supportColDefaultValue = true)
     } catch {
       case e: AnalysisException if staticPartCols.nonEmpty &&
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ResolveDefaultColumnsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ResolveDefaultColumnsSuite.scala
index b2cc4e3b746..29b2796d25a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ResolveDefaultColumnsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ResolveDefaultColumnsSuite.scala
@@ -35,9 +35,15 @@ class ResolveDefaultColumnsSuite extends QueryTest with SharedSparkSession {
 
       // INSERT without user-defined columns
       sql("truncate table t")
-      sql("insert into t values (timestamp'2020-12-31')")
-      checkAnswer(spark.table("t"),
-        sql("select timestamp'2020-12-31', null").collect().head)
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("insert into t values (timestamp'2020-12-31')")
+        },
+        errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
+        parameters = Map(
+          "tableName" -> "`spark_catalog`.`default`.`t`",
+          "tableColumns" -> "`c1`, `c2`",
+          "dataColumns" -> "`col1`"))
     }
   }
 
@@ -57,9 +63,15 @@ class ResolveDefaultColumnsSuite extends QueryTest with SharedSparkSession {
 
       // INSERT without user-defined columns
       sql("truncate table t")
-      sql("insert into t values (timestamp'2020-12-31')")
-      checkAnswer(spark.table("t"),
-        sql("select timestamp'2020-12-31', 
timestamp'2020-01-01'").collect().head)
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql("insert into t values (timestamp'2020-12-31')")
+        },
+        errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
+        parameters = Map(
+          "tableName" -> "`spark_catalog`.`default`.`t`",
+          "tableColumns" -> "`c1`, `c2`",
+          "dataColumns" -> "`col1`"))
     }
   }
 
@@ -67,8 +79,15 @@ class ResolveDefaultColumnsSuite extends QueryTest with SharedSparkSession {
     sql("create table t(c1 int, c2 int, c3 int, c4 int) using parquet partitioned by (c3, c4)")
 
     // INSERT without static partitions
-    sql("insert into t values (1, 2, 3)")
-    checkAnswer(spark.table("t"), Row(1, 2, 3, null))
+    checkError(
+      exception = intercept[AnalysisException] {
+        sql("insert into t values (1, 2, 3)")
+      },
+      errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
+      parameters = Map(
+        "tableName" -> "`spark_catalog`.`default`.`t`",
+        "tableColumns" -> "`c1`, `c2`, `c3`, `c4`",
+        "dataColumns" -> "`col1`, `col2`, `col3`"))
 
     // INSERT without static partitions but with column list
     sql("truncate table t")
@@ -77,8 +96,16 @@ class ResolveDefaultColumnsSuite extends QueryTest with SharedSparkSession {
 
     // INSERT with static partitions
     sql("truncate table t")
-    sql("insert into t partition(c3=3, c4=4) values (1)")
-    checkAnswer(spark.table("t"), Row(1, null, 3, 4))
+    checkError(
+      exception = intercept[AnalysisException] {
+        sql("insert into t partition(c3=3, c4=4) values (1)")
+      },
+      errorClass = "INSERT_PARTITION_COLUMN_ARITY_MISMATCH",
+      parameters = Map(
+        "tableName" -> "`spark_catalog`.`default`.`t`",
+        "tableColumns" -> "`c1`, `c2`, `c3`, `c4`",
+        "dataColumns" -> "`col1`",
+        "staticPartCols" -> "`c3`, `c4`"))
 
     // INSERT with static partitions and with column list
     sql("truncate table t")
@@ -87,8 +114,16 @@ class ResolveDefaultColumnsSuite extends QueryTest with SharedSparkSession {
 
     // INSERT with partial static partitions
     sql("truncate table t")
-    sql("insert into t partition(c3=3, c4) values (1, 2)")
-    checkAnswer(spark.table("t"), Row(1, 2, 3, null))
+    checkError(
+      exception = intercept[AnalysisException] {
+        sql("insert into t partition(c3=3, c4) values (1, 2)")
+      },
+      errorClass = "INSERT_PARTITION_COLUMN_ARITY_MISMATCH",
+      parameters = Map(
+        "tableName" -> "`spark_catalog`.`default`.`t`",
+        "tableColumns" -> "`c1`, `c2`, `c3`, `c4`",
+        "dataColumns" -> "`col1`, `col2`",
+        "staticPartCols" -> "`c3`"))
 
     // INSERT with partial static partitions and with column list is not allowed
     intercept[AnalysisException](sql("insert into t partition(c3=3, c4) (c1) values (1, 4)"))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index c6bfd8c14dd..7b1a5a32037 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -962,11 +962,15 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
       (1 to 10).map(i => Row(i, null))
     )
 
-    sql("INSERT OVERWRITE TABLE jsonTable SELECT a FROM jt")
-    checkAnswer(
-      sql("SELECT a, b FROM jsonTable"),
-      (1 to 10).map(i => Row(i, null))
-    )
+    checkError(
+      exception = intercept[AnalysisException] {
+        sql("INSERT OVERWRITE TABLE jsonTable SELECT a FROM jt")
+      },
+      errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
+      parameters = Map(
+        "tableName" -> "`unknown`",
+        "tableColumns" -> "`a`, `b`",
+        "dataColumns" -> "`a`"))
 
     sql("INSERT OVERWRITE TABLE jsonTable(a) SELECT a FROM jt")
     checkAnswer(
@@ -1027,7 +1031,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     }
     withTable("t") {
       sql("create table t(i int, s bigint default 42, x bigint) using parquet")
-      sql("insert into t values(1)")
+      sql("insert into t(i) values(1)")
       checkAnswer(spark.table("t"), Row(1, 42L, null))
     }
     // The table has a partitioning column and a default value is injected.
@@ -1495,7 +1499,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
       sql(createTableIntCol)
       sql("alter table t add column s bigint default 42")
       sql("alter table t add column x bigint")
-      sql("insert into t values(1)")
+      sql("insert into t(i) values(1)")
       checkAnswer(spark.table("t"), Row(1, 42, null))
     }
     // The table has a partitioning column and a default value is injected.
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
index 420b4fc83ec..ea43f1d2c67 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
@@ -391,7 +391,7 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
         sql(s"INSERT INTO TABLE $tableName PARTITION (c=11, b=10) SELECT 9, 12")
 
         // The data is missing a column. The default value for the missing column is null.
-        sql(s"INSERT INTO TABLE $tableName PARTITION (c=15, b=16) SELECT 13")
+        sql(s"INSERT INTO TABLE $tableName PARTITION (c=15, b=16) (a) SELECT 
13")
 
         // c is defined twice. Analyzer will complain.
         intercept[ParseException] {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 4eae3933bf5..82b88ec9f35 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -1258,11 +1258,11 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
           """INSERT INTO TABLE dp_test PARTITION(dp)
             |SELECT key, value, key % 5 FROM src""".stripMargin)
       },
-      errorClass = "_LEGACY_ERROR_TEMP_1169",
+      errorClass = "INSERT_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
       parameters = Map(
         "tableName" -> "`spark_catalog`.`default`.`dp_test`",
-        "normalizedPartSpec" -> "dp",
-        "partColNames" -> "dp,sp"))
+        "tableColumns" -> "`key`, `value`, `dp`, `sp`",
+        "dataColumns" -> "`key`, `value`, `(key % 5)`"))
 
     sql("SET hive.exec.dynamic.partition.mode=nonstrict")
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
