This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ed55ffc901 [SPARK-39383][SQL] Support DEFAULT columns in ALTER TABLE 
ALTER COLUMNS to V2 data sources
3ed55ffc901 is described below

commit 3ed55ffc901d3f37e91f04e9345b705ffebb249e
Author: Daniel Tenedorio <daniel.tenedo...@databricks.com>
AuthorDate: Thu Jun 23 18:42:00 2022 -0700

    [SPARK-39383][SQL] Support DEFAULT columns in ALTER TABLE ALTER COLUMNS to 
V2 data sources
    
    ### What changes were proposed in this pull request?
    
    Extend DEFAULT column support in ALTER TABLE ALTER COLUMNS commands to 
include V2 data sources.
    
    (Note: this depends on https://github.com/apache/spark/pull/36771.)
    
    Example:
    
    ```
    > create or replace table t (a string default 'abc', b string) using 
$v2Source
    > insert into t values (default)
    > alter table t alter column b set default 'def'
    > insert into t values ("ghi")
    > select * from t
    "abc", null
    "ghi", "def"
    ```
    
    ### Why are the changes needed?
    
    This makes V2 data sources easier to use and extend.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes.
    
    ### How was this patch tested?
    
    This PR includes new test coverage.
    
    Closes #36778 from dtenedor/default-cols-v2-altercol.
    
    Authored-by: Daniel Tenedorio <daniel.tenedo...@databricks.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../spark/sql/connector/catalog/TableChange.java   | 55 ++++++++++++++++++++++
 .../plans/logical/v2AlterTableCommands.scala       |  5 +-
 .../sql/connector/catalog/CatalogV2Util.scala      | 12 +++++
 .../spark/sql/connector/AlterTableTests.scala      | 19 +++++++-
 4 files changed, 89 insertions(+), 2 deletions(-)

diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java
index b0b686942c6..cf735ed9452 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java
@@ -220,6 +220,21 @@ public interface TableChange {
     return new UpdateColumnPosition(fieldNames, newPosition);
   }
 
+  /**
+   * Create a TableChange for updating the default value of a field.
+   * <p>
+   * The name is used to find the field to update.
+   * <p>
+   * If the field does not exist, the change will result in an {@link 
IllegalArgumentException}.
+   *
+   * @param fieldNames field names of the column to update
+   * @param newDefaultValue the new default value
+   * @return a TableChange for the update
+   */
+  static TableChange updateColumnDefaultValue(String[] fieldNames, String 
newDefaultValue) {
+    return new UpdateColumnDefaultValue(fieldNames, newDefaultValue);
+  }
+
   /**
    * Create a TableChange for deleting a field.
    * <p>
@@ -655,6 +670,46 @@ public interface TableChange {
     }
   }
 
+  /**
+   * A TableChange to update the default value of a field.
+   * <p>
+   * The field names are used to find the field to update.
+   * <p>
+   * If the field does not exist, the change must result in an {@link 
IllegalArgumentException}.
+   */
+  final class UpdateColumnDefaultValue implements ColumnChange {
+    private final String[] fieldNames;
+    private final String newDefaultValue;
+
+    private UpdateColumnDefaultValue(String[] fieldNames, String 
newDefaultValue) {
+      this.fieldNames = fieldNames;
+      this.newDefaultValue = newDefaultValue;
+    }
+
+    @Override
+    public String[] fieldNames() {
+      return fieldNames;
+    }
+
+    public String newDefaultValue() { return newDefaultValue; }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+      UpdateColumnDefaultValue that = (UpdateColumnDefaultValue) o;
+      return Arrays.equals(fieldNames, that.fieldNames) &&
+        newDefaultValue.equals(that.newDefaultValue());
+    }
+
+    @Override
+    public int hashCode() {
+      int result = Objects.hash(newDefaultValue);
+      result = 31 * result + Arrays.hashCode(fieldNames);
+      return result;
+    }
+  }
+
   /**
    * A TableChange to delete a field.
    * <p>
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
index e72ce8d421b..94f2a570663 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
@@ -227,7 +227,10 @@ case class AlterColumn(
         "FieldPosition should be resolved before it's converted to 
TableChange.")
       TableChange.updateColumnPosition(colName, newPosition.position)
     }
-    typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange
+    val defaultValueChange = setDefaultExpression.map { newDefaultExpression =>
+      TableChange.updateColumnDefaultValue(colName, newDefaultExpression)
+    }
+    typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange 
++ defaultValueChange
   }
 
   override protected def withNewChildInternal(newChild: LogicalPlan): 
LogicalPlan =
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
index d47ffcc2a71..a5087d39e98 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala
@@ -203,6 +203,18 @@ private[sql] object CatalogV2Util {
               })
           }
 
+        case update: UpdateColumnDefaultValue =>
+          replace(schema, update.fieldNames, field =>
+            // The new DEFAULT value string will be non-empty for any DDL 
commands that set the
+            // default value, such as "ALTER TABLE t ALTER COLUMN c SET 
DEFAULT ..." (this is
+            // enforced by the parser). On the other hand, commands that drop 
the default value such
+            // as "ALTER TABLE t ALTER COLUMN c DROP DEFAULT" will set this 
string to empty.
+            if (update.newDefaultValue().nonEmpty) {
+              Some(field.withCurrentDefaultValue(update.newDefaultValue()))
+            } else {
+              Some(field.clearCurrentDefaultValue)
+            })
+
         case delete: DeleteColumn =>
           replace(schema, delete.fieldNames, _ => None, delete.ifExists)
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
index df6bfdc319f..c43c5636974 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
@@ -311,7 +311,7 @@ trait AlterTableTests extends SharedSparkSession {
     }
   }
 
-  test("SPARK-39383 DEFAULT columns on V2 data sources with ALTER TABLE ADD 
COLUMN") {
+  test("SPARK-39383 DEFAULT columns on V2 data sources with ALTER TABLE 
ADD/ALTER COLUMN") {
     withSQLConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS.key -> s"$v2Format, 
") {
       val t = s"${catalogAndNamespace}table_name"
       withTable("t") {
@@ -327,6 +327,23 @@ trait AlterTableTests extends SharedSparkSession {
           .add(StructField("b", IntegerType)
             .withCurrentDefaultValue("2 + 3")
             .withExistenceDefaultValue("5")))
+
+        sql(s"alter table $t alter column b set default 2 + 3")
+
+        assert(
+          getTableMetadata(tableName).schema === new StructType()
+            .add("a", StringType)
+            .add(StructField("b", IntegerType)
+              .withCurrentDefaultValue("2 + 3")
+              .withExistenceDefaultValue("5")))
+
+        sql(s"alter table $t alter column b drop default")
+
+        assert(
+          getTableMetadata(tableName).schema === new StructType()
+            .add("a", StringType)
+            .add(StructField("b", IntegerType)
+              .withExistenceDefaultValue("5")))
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to