This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c5d886474bd [SPARK-38838][SQL] Support ALTER TABLE ALTER COLUMN commands with DEFAULT values
c5d886474bd is described below

commit c5d886474bdfb9f6e8f20b1279550f43653f0dc7
Author: Daniel Tenedorio <daniel.tenedo...@databricks.com>
AuthorDate: Fri May 6 23:45:35 2022 +0800

    [SPARK-38838][SQL] Support ALTER TABLE ALTER COLUMN commands with DEFAULT values
    
    ### What changes were proposed in this pull request?
    
    Support ALTER TABLE ALTER COLUMN commands with DEFAULT values.
    
    For example:
    
    ```
    CREATE TABLE t(i BOOLEAN, s STRING) USING PARQUET;
    ALTER TABLE t ALTER COLUMN s SET DEFAULT CONCAT('abc', 'def');
    INSERT INTO t VALUES(TRUE, DEFAULT);
    SELECT s FROM t WHERE i = TRUE;
    > "abcdef"
    ```
    
    ### Why are the changes needed?
    
    This reduces the work required to write INSERT INTO commands by letting users leave out column values that have defaults.
    
    It also helps table owners add columns whose default values auto-generate useful information in the table contents, such as date or time values.
    
    ### Does this PR introduce any user-facing change?
    
    Yes, it allows ALTER TABLE ALTER COLUMN commands to specify optional DEFAULT values for columns. These default values are stored inside the column metadata.
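    
    For illustration, a minimal sketch of how a default set this way round-trips through the new StructField helpers added in this patch (withCurrentDefaultValue, getCurrentDefaultValue, clearCurrentDefaultValue, all defined in the StructField.scala hunk below); the snippet is illustrative only and not part of the change:
    
    ```
    import org.apache.spark.sql.types.{StringType, StructField}
    
    // SET DEFAULT stores the expression text under the current-default metadata key.
    val f = StructField("s", StringType).withCurrentDefaultValue("CONCAT('abc', 'def')")
    assert(f.getCurrentDefaultValue() == Some("CONCAT('abc', 'def')"))
    
    // DROP DEFAULT clears the metadata entry again.
    assert(f.clearCurrentDefaultValue().getCurrentDefaultValue().isEmpty)
    ```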
    
    ### How was this patch tested?
    
    Unit test coverage was added as part of this PR.
    
    Closes #36122 from dtenedor/alter-table-alter-column.
    
    Authored-by: Daniel Tenedorio <daniel.tenedo...@databricks.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  2 +-
 .../sql/catalyst/analysis/CheckAnalysis.scala      |  2 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala     | 24 ++++---
 .../plans/logical/v2AlterTableCommands.scala       |  3 +-
 .../org/apache/spark/sql/types/StructField.scala   | 34 ++++++++++
 .../spark/sql/catalyst/parser/DDLParserSuite.scala | 73 ++++++++++++++++------
 .../catalyst/analysis/ResolveSessionCatalog.scala  |  5 +-
 .../apache/spark/sql/execution/command/ddl.scala   | 26 +++++++-
 .../connector/V2CommandsCaseSensitivitySuite.scala |  6 +-
 .../execution/command/PlanResolutionSuite.scala    |  9 ++-
 .../org/apache/spark/sql/sources/InsertSuite.scala | 68 ++++++++++++++++++++
 11 files changed, 215 insertions(+), 37 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 3c8d7585284..906077a9c0e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -3742,7 +3742,7 @@ class Analyzer(override val catalogManager: CatalogManager)
         resolved
 
       case a @ AlterColumn(
-          table: ResolvedTable, ResolvedFieldName(path, field), dataType, _, _, position) =>
+          table: ResolvedTable, ResolvedFieldName(path, field), dataType, _, _, position, _) =>
         val newDataType = dataType.flatMap { dt =>
           // Hive style syntax provides the column type, even if it may not have changed.
           val existing = CharVarcharUtils.getRawType(field.metadata).getOrElse(field.dataType)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 51a3facf408..415de7d6504 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -1135,7 +1135,7 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog {
      case RenameColumn(table: ResolvedTable, col: ResolvedFieldName, newName) =>
        checkColumnNotExists("rename", col.path :+ newName, table.schema)
 
-      case a @ AlterColumn(table: ResolvedTable, col: ResolvedFieldName, _, _, _, _) =>
+      case a @ AlterColumn(table: ResolvedTable, col: ResolvedFieldName, _, _, _, _, _) =>
         val fieldName = col.name.quoted
         if (a.dataType.isDefined) {
           val field = CharVarcharUtils.getRawType(col.field.metadata)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index dca071ab051..448ebcb35b3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -3832,14 +3832,20 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
     } else {
       None
     }
-    if (action.defaultExpression != null) {
-      throw QueryParsingErrors.defaultColumnNotImplementedYetError(ctx)
-    }
-    if (action.dropDefault != null) {
-      throw QueryParsingErrors.defaultColumnNotImplementedYetError(ctx)
+    val setDefaultExpression: Option[String] =
+      if (action.defaultExpression != null) {
+        Option(action.defaultExpression()).map(visitDefaultExpression)
+      } else if (action.dropDefault != null) {
+        Some("")
+      } else {
+        None
+      }
+    if (setDefaultExpression.isDefined && !conf.getConf(SQLConf.ENABLE_DEFAULT_COLUMNS)) {
+      throw QueryParsingErrors.defaultColumnNotEnabledError(ctx)
     }
 
-    assert(Seq(dataType, nullable, comment, position).count(_.nonEmpty) == 1)
+    assert(Seq(dataType, nullable, comment, position, setDefaultExpression)
+      .count(_.nonEmpty) == 1)
 
     AlterColumn(
       createUnresolvedTable(ctx.table, s"ALTER TABLE ... $verb COLUMN"),
@@ -3847,7 +3853,8 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
       dataType = dataType,
       nullable = nullable,
       comment = comment,
-      position = position)
+      position = position,
+      setDefaultExpression = setDefaultExpression)
   }
 
   /**
@@ -3882,7 +3889,8 @@ class AstBuilder extends SqlBaseParserBaseVisitor[AnyRef] with SQLConfHelper wit
       nullable = None,
       comment = Option(ctx.colType().commentSpec()).map(visitCommentSpec),
       position = Option(ctx.colPosition).map(
-        pos => UnresolvedFieldPosition(typedVisit[ColumnPosition](pos))))
+        pos => UnresolvedFieldPosition(typedVisit[ColumnPosition](pos))),
+      setDefaultExpression = None)
   }
 
   override def visitHiveReplaceColumns(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
index 8cc93c2dd09..830385b564a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
@@ -206,7 +206,8 @@ case class AlterColumn(
     dataType: Option[DataType],
     nullable: Option[Boolean],
     comment: Option[String],
-    position: Option[FieldPosition]) extends AlterTableCommand {
+    position: Option[FieldPosition],
+    setDefaultExpression: Option[String]) extends AlterTableCommand {
   override def changes: Seq[TableChange] = {
     require(column.resolved, "FieldName should be resolved before it's converted to TableChange.")
     val colName = column.name.toArray
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala
index f490f8318ef..c80745ff6b5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala
@@ -22,6 +22,7 @@ import org.json4s.JsonDSL._
 
 import org.apache.spark.annotation.Stable
 import org.apache.spark.sql.catalyst.util.{escapeSingleQuotedString, quoteIfNeeded}
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
 import org.apache.spark.sql.catalyst.util.StringUtils.StringConcat
 import org.apache.spark.sql.util.SchemaUtils
 
@@ -84,6 +85,39 @@ case class StructField(
     if (metadata.contains("comment")) Option(metadata.getString("comment")) else None
   }
 
+  /**
+   * Updates the StructField with a new current default value.
+   */
+  def withCurrentDefaultValue(value: String): StructField = {
+    val newMetadata = new MetadataBuilder()
+      .withMetadata(metadata)
+      .putString(CURRENT_DEFAULT_COLUMN_METADATA_KEY, value)
+      .build()
+    copy(metadata = newMetadata)
+  }
+
+  /**
+   * Clears the StructField of its current default value, if any.
+   */
+  def clearCurrentDefaultValue(): StructField = {
+    val newMetadata = new MetadataBuilder()
+      .withMetadata(metadata)
+      .remove(CURRENT_DEFAULT_COLUMN_METADATA_KEY)
+      .build()
+    copy(metadata = newMetadata)
+  }
+
+  /**
+   * Returns the current default value of this StructField, if any.
+   */
+  def getCurrentDefaultValue(): Option[String] = {
+    if (metadata.contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) {
+      Option(metadata.getString(CURRENT_DEFAULT_COLUMN_METADATA_KEY))
+    } else {
+      None
+    }
+  }
+
   private def getDDLComment = getComment()
     .map(escapeSingleQuotedString)
     .map(" COMMENT '" + _ + "'")
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index af939b0aa69..8dd719d3e1d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -935,6 +935,7 @@ class DDLParserSuite extends AnalysisTest {
         Some(LongType),
         None,
         None,
+        None,
         None))
   }
 
@@ -954,6 +955,7 @@ class DDLParserSuite extends AnalysisTest {
         Some(LongType),
         None,
         None,
+        None,
         None))
   }
 
@@ -966,6 +968,7 @@ class DDLParserSuite extends AnalysisTest {
         None,
         None,
         Some("new comment"),
+        None,
         None))
   }
 
@@ -978,7 +981,8 @@ class DDLParserSuite extends AnalysisTest {
         None,
         None,
         None,
-        Some(UnresolvedFieldPosition(first()))))
+        Some(UnresolvedFieldPosition(first())),
+        None))
   }
 
   test("alter table: multiple property changes are not allowed") {
@@ -1004,6 +1008,7 @@ class DDLParserSuite extends AnalysisTest {
         None,
         Some(false),
         None,
+        None,
         None))
 
     comparePlans(
@@ -1014,6 +1019,7 @@ class DDLParserSuite extends AnalysisTest {
         None,
         Some(true),
         None,
+        None,
         None))
   }
 
@@ -1072,6 +1078,7 @@ class DDLParserSuite extends AnalysisTest {
         Some(IntegerType),
         None,
         None,
+        None,
         None))
 
     comparePlans(
@@ -1082,6 +1089,7 @@ class DDLParserSuite extends AnalysisTest {
         Some(IntegerType),
         None,
         Some("new_comment"),
+        None,
         None))
 
     comparePlans(
@@ -1092,7 +1100,8 @@ class DDLParserSuite extends AnalysisTest {
         Some(IntegerType),
         None,
         None,
-        Some(UnresolvedFieldPosition(after("other_col")))))
+        Some(UnresolvedFieldPosition(after("other_col"))),
+        None))
 
     // renaming column not supported in hive style ALTER COLUMN.
     intercept("ALTER TABLE table_name CHANGE COLUMN a.b.c new_name INT",
@@ -2260,22 +2269,6 @@ class DDLParserSuite extends AnalysisTest {
   }
 
   test("SPARK-38335: Implement parser support for DEFAULT values for columns in tables") {
-    comparePlans(
-      parsePlan("ALTER TABLE t1 ADD COLUMN x int NOT NULL DEFAULT 42"),
-      AddColumns(UnresolvedTable(Seq("t1"), "ALTER TABLE ... ADD COLUMN", None),
-        Seq(QualifiedColType(None, "x", IntegerType, false, None, None, Some("42")))))
-    // The following ALTER TABLE commands will support DEFAULT columns, but this has not been
-    // implemented yet.
-    val unsupportedError = "Support for DEFAULT column values is not implemented yet"
-    assert(intercept[ParseException] {
-      parsePlan("ALTER TABLE t1 ALTER COLUMN a.b.c SET DEFAULT 42")
-    }.getMessage.contains(unsupportedError))
-    assert(intercept[ParseException] {
-      parsePlan("ALTER TABLE t1 ALTER COLUMN a.b.c DROP DEFAULT")
-    }.getMessage.contains(unsupportedError))
-    assert(intercept[ParseException] {
-      parsePlan("ALTER TABLE t1 REPLACE COLUMNS (x STRING DEFAULT 42)")
-    }.getMessage.contains(unsupportedError))
     // These CREATE/REPLACE TABLE statements should parse successfully.
     val schemaWithDefaultColumn = new StructType()
       .add("a", IntegerType, true)
@@ -2293,6 +2286,50 @@ class DDLParserSuite extends AnalysisTest {
       ReplaceTable(UnresolvedDBObjectName(Seq("my_tab"), false), schemaWithDefaultColumn,
         Seq.empty[Transform], LogicalTableSpec(Map.empty[String, String], Some("parquet"),
           Map.empty[String, String], None, None, None, false), false))
+    // These ALTER TABLE statements should parse successfully.
+    comparePlans(
+      parsePlan("ALTER TABLE t1 ADD COLUMN x int NOT NULL DEFAULT 42"),
+      AddColumns(UnresolvedTable(Seq("t1"), "ALTER TABLE ... ADD COLUMN", None),
+        Seq(QualifiedColType(None, "x", IntegerType, false, None, None, Some("42")))))
+    comparePlans(
+      parsePlan("ALTER TABLE t1 ALTER COLUMN a.b.c SET DEFAULT 42"),
+      AlterColumn(
+        UnresolvedTable(Seq("t1"), "ALTER TABLE ... ALTER COLUMN", None),
+        UnresolvedFieldName(Seq("a", "b", "c")),
+        None,
+        None,
+        None,
+        None,
+        Some("42")))
+    // It is possible to pass an empty string default value using quotes.
+    comparePlans(
+      parsePlan("ALTER TABLE t1 ALTER COLUMN a.b.c SET DEFAULT ''"),
+      AlterColumn(
+        UnresolvedTable(Seq("t1"), "ALTER TABLE ... ALTER COLUMN", None),
+        UnresolvedFieldName(Seq("a", "b", "c")),
+        None,
+        None,
+        None,
+        None,
+        Some("''")))
+    // It is not possible to pass an empty string default value without using quotes.
+    // This results in a parsing error.
+    intercept("ALTER TABLE t1 ALTER COLUMN a.b.c SET DEFAULT ",
+      "Syntax error at or near end of input")
+    // It is not possible to both SET DEFAULT and DROP DEFAULT at the same time.
+    // This results in a parsing error.
+    intercept("ALTER TABLE t1 ALTER COLUMN a.b.c DROP DEFAULT SET DEFAULT 42",
+      "Syntax error at or near 'SET'")
+    comparePlans(
+      parsePlan("ALTER TABLE t1 ALTER COLUMN a.b.c DROP DEFAULT"),
+      AlterColumn(
+        UnresolvedTable(Seq("t1"), "ALTER TABLE ... ALTER COLUMN", None),
+        UnresolvedFieldName(Seq("a", "b", "c")),
+        None,
+        None,
+        None,
+        None,
+        Some("")))
     // Make sure that the parser returns an exception when the feature is disabled.
     withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") {
       intercept(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 252b6882519..f4df3bea532 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, toPrettySQL, ResolveDefaultColumns => DefaultCols}
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
 import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, CatalogV2Util, Identifier, LookupCatalog, SupportsNamespaces, V1Table}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -59,7 +60,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     case ReplaceColumns(ResolvedV1TableIdentifier(_), _) =>
       throw QueryCompilationErrors.operationOnlySupportedWithV2TableError("REPLACE COLUMNS")
 
-    case a @ AlterColumn(ResolvedV1TableAndIdentifier(table, ident), _, _, _, _, _) =>
+    case a @ AlterColumn(ResolvedV1TableAndIdentifier(table, ident), _, _, _, _, _, _) =>
       if (a.column.name.length > 1) {
         throw QueryCompilationErrors
           .operationOnlySupportedWithV2TableError("ALTER COLUMN with qualified column")
@@ -83,6 +84,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
               quoteIfNeeded(colName), table)
           }
       }
+      // Add the current default column value string (if any) to the column metadata.
+      a.setDefaultExpression.map { c => builder.putString(CURRENT_DEFAULT_COLUMN_METADATA_KEY, c) }
       val newColumn = StructField(
         colName,
         dataType,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index e9ec98e6d0a..2dbe5f0e590 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns
 import org.apache.spark.sql.connector.catalog.{CatalogV2Util, TableCatalog}
 import org.apache.spark.sql.connector.catalog.SupportsNamespaces._
 import org.apache.spark.sql.errors.QueryCompilationErrors
@@ -353,7 +354,25 @@ case class AlterTableChangeColumnCommand(
     val newDataSchema = table.dataSchema.fields.map { field =>
       if (field.name == originColumn.name) {
         // Create a new column from the origin column with the new comment.
-        addComment(field, newColumn.getComment)
+        val withNewComment: StructField =
+          addComment(field, newColumn.getComment)
+        // Create a new column from the origin column with the new current default value.
+        if (newColumn.getCurrentDefaultValue().isDefined) {
+          if (newColumn.getCurrentDefaultValue().get.nonEmpty) {
+            val result: StructField =
+              addCurrentDefaultValue(withNewComment, newColumn.getCurrentDefaultValue())
+            // Check that the proposed default value parses and analyzes correctly, and that the
+            // type of the resulting expression is equivalent or coercible to the destination column
+            // type.
+            ResolveDefaultColumns.analyze(
+              sparkSession.sessionState.analyzer, result, "ALTER TABLE ALTER COLUMN")
+            result
+          } else {
+            withNewComment.clearCurrentDefaultValue()
+          }
+        } else {
+          withNewComment
+        }
       } else {
         field
       }
@@ -376,6 +395,11 @@ case class AlterTableChangeColumnCommand(
   private def addComment(column: StructField, comment: Option[String]): StructField =
     comment.map(column.withComment).getOrElse(column)
 
+  // Add the current default value to a column; if the default value is empty, return the
+  // original column.
+  private def addCurrentDefaultValue(column: StructField, value: Option[String]): StructField =
+    value.map(column.withCurrentDefaultValue).getOrElse(column)
+
   // Compare a [[StructField]] to another, return true if they have the same column
   // name(by resolver) and dataType.
   private def columnEqual(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
index 4aedac2b4af..b6a8480814e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
@@ -316,7 +316,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
   test("AlterTable: drop column nullability resolution") {
     Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref =>
       alterTableTest(
-        AlterColumn(table, UnresolvedFieldName(ref), None, Some(true), None, None),
+        AlterColumn(table, UnresolvedFieldName(ref), None, Some(true), None, None, None),
         Seq("Missing field " + ref.quoted)
       )
     }
@@ -325,7 +325,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
   test("AlterTable: change column type resolution") {
     Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref =>
       alterTableTest(
-        AlterColumn(table, UnresolvedFieldName(ref), Some(StringType), None, None, None),
+        AlterColumn(table, UnresolvedFieldName(ref), Some(StringType), None, None, None, None),
         Seq("Missing field " + ref.quoted)
       )
     }
@@ -334,7 +334,7 @@ class V2CommandsCaseSensitivitySuite extends SharedSparkSession with AnalysisTes
   test("AlterTable: change column comment resolution") {
     Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref =>
       alterTableTest(
-        AlterColumn(table, UnresolvedFieldName(ref), None, None, Some("comment"), None),
+        AlterColumn(table, UnresolvedFieldName(ref), None, None, Some("comment"), None, None),
         Seq("Missing field " + ref.quoted)
       )
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index 6a20ee21294..63d2b9d47f1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -1108,6 +1108,7 @@ class PlanResolutionSuite extends AnalysisTest {
                 Some(LongType),
                 None,
                 None,
+                None,
                 None) =>
               assert(column.name == Seq("i"))
             case _ => fail("expect AlterTableAlterColumn")
@@ -1120,6 +1121,7 @@ class PlanResolutionSuite extends AnalysisTest {
                 None,
                 None,
                 Some("new comment"),
+                None,
                 None) =>
               assert(column.name == Seq("i"))
             case _ => fail("expect AlterTableAlterColumn")
@@ -1169,14 +1171,15 @@ class PlanResolutionSuite extends AnalysisTest {
     Seq("v2Table", "testcat.tab").foreach { tblName =>
      parseAndResolve(s"ALTER TABLE $tblName CHANGE COLUMN i i int COMMENT 'an index'") match {
         case AlterColumn(
-            _: ResolvedTable, _: ResolvedFieldName, None, None, Some(comment), None) =>
+            _: ResolvedTable, _: ResolvedFieldName, None, None, Some(comment), None, None) =>
           assert(comment == "an index")
         case _ => fail("expect AlterTableAlterColumn with comment change only")
       }
 
      parseAndResolve(s"ALTER TABLE $tblName CHANGE COLUMN i i long COMMENT 'an index'") match {
         case AlterColumn(
-            _: ResolvedTable, _: ResolvedFieldName, Some(dataType), None, Some(comment), None) =>
+            _: ResolvedTable, _: ResolvedFieldName, Some(dataType), None, Some(comment), None,
+            None) =>
           assert(comment == "an index")
           assert(dataType == LongType)
        case _ => fail("expect AlterTableAlterColumn with type and comment changes")
@@ -1211,7 +1214,7 @@ class PlanResolutionSuite extends AnalysisTest {
       val catalog = if (isSessionCatalog) v2SessionCatalog else testCat
       val tableIdent = if (isSessionCatalog) "v2Table" else "tab"
       parsed match {
-        case AlterColumn(r: ResolvedTable, _, _, _, _, _) =>
+        case AlterColumn(r: ResolvedTable, _, _, _, _, _, _) =>
           assert(r.catalog == catalog)
           assert(r.identifier.name() == tableIdent)
         case Project(_, AsDataSourceV2Relation(r)) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 1312353f537..b312e65154a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -1440,6 +1440,74 @@ class InsertSuite extends DataSourceTest with SharedSparkSession {
     }
   }
 
+  test("SPARK-38838 INSERT INTO with defaults set by ALTER TABLE ALTER COLUMN: positive tests") {
+    withTable("t") {
+      sql("create table t(i boolean, s string, k bigint) using parquet")
+      // The default value for the DEFAULT keyword is the NULL literal.
+      sql("insert into t values(true, default, default)")
+      // There is a complex expression in the default value.
+      sql("alter table t alter column s set default concat('abc', 'def')")
+      sql("insert into t values(true, default, default)")
+      // The default value parses correctly and the provided value type is different but coercible.
+      sql("alter table t alter column k set default 42")
+      sql("insert into t values(true, default, default)")
+      // After dropping the default, inserting more values should add NULLs.
+      sql("alter table t alter column k drop default")
+      sql("insert into t values(true, default, default)")
+      checkAnswer(spark.table("t"),
+        Seq(
+          Row(true, null, null),
+          Row(true, "abcdef", null),
+          Row(true, "abcdef", 42),
+          Row(true, "abcdef", null)
+        ))
+    }
+  }
+
+  test("SPARK-38838 INSERT INTO with defaults set by ALTER TABLE ALTER COLUMN: negative tests") {
+    object Errors {
+      val COMMON_SUBSTRING = " has a DEFAULT value"
+      val BAD_SUBQUERY =
+        "cannot evaluate expression CAST(scalarsubquery() AS BIGINT) in inline table definition"
+    }
+    val createTable = "create table t(i boolean, s bigint) using parquet"
+    val insertDefaults = "insert into t values (default, default)"
+    withTable("t") {
+      sql(createTable)
+      // The default value fails to analyze.
+      assert(intercept[AnalysisException] {
+        sql("alter table t alter column s set default badvalue")
+      }.getMessage.contains(Errors.COMMON_SUBSTRING))
+      // The default value analyzes to a table not in the catalog.
+      assert(intercept[AnalysisException] {
+        sql("alter table t alter column s set default (select min(x) from badtable)")
+      }.getMessage.contains(Errors.COMMON_SUBSTRING))
+      // The default value has an explicit alias. It fails to evaluate when inlined into the VALUES
+      // list at INSERT INTO time.
+      sql("alter table t alter column s set default (select 42 as alias)")
+      assert(intercept[AnalysisException] {
+        sql(insertDefaults)
+      }.getMessage.contains(Errors.BAD_SUBQUERY))
+      // The default value parses but the type is not coercible.
+      assert(intercept[AnalysisException] {
+        sql("alter table t alter column s set default false")
+      }.getMessage.contains("provided a value of incompatible type"))
+      // The default value is disabled per configuration.
+      withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") {
+        assert(intercept[ParseException] {
+          sql("alter table t alter column s set default 41 + 1")
+        }.getMessage.contains("Support for DEFAULT column values is not allowed"))
+      }
+    }
+    // Attempting to set a default value for a partitioning column is not allowed.
+    withTable("t") {
+      sql("create table t(i boolean, s bigint, q int default 42) using parquet partitioned by (i)")
+      assert(intercept[AnalysisException] {
+        sql("alter table t alter column i set default false")
+      }.getMessage.contains("Can't find column `i` given table data columns [`s`, `q`]"))
+    }
+  }
+
   test("Stop task set if FileAlreadyExistsException was thrown") {
     Seq(true, false).foreach { fastFail =>
      withSQLConf("fs.file.impl" -> classOf[FileExistingTestFileSystem].getName,

