This is an automated email from the ASF dual-hosted git repository. gengliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 3ed55ffc901 [SPARK-39383][SQL] Support DEFAULT columns in ALTER TABLE ALTER COLUMNS to V2 data sources 3ed55ffc901 is described below commit 3ed55ffc901d3f37e91f04e9345b705ffebb249e Author: Daniel Tenedorio <daniel.tenedo...@databricks.com> AuthorDate: Thu Jun 23 18:42:00 2022 -0700 [SPARK-39383][SQL] Support DEFAULT columns in ALTER TABLE ALTER COLUMNS to V2 data sources ### What changes were proposed in this pull request? Extend DEFAULT column support in ALTER TABLE ALTER COLUMNS commands to include V2 data sources. (Note: this depends on https://github.com/apache/spark/pull/36771.) Example: ``` > create or replace table t (a string default 'abc', b string) using $v2Source > insert into t values (default) > alter table t alter column b set default 'def' > insert into t values ("ghi") > select * from t -- returns two rows: ("abc", null), ("ghi", "def") ``` ### Why are the changes needed? This makes V2 data sources easier to use and extend. ### Does this PR introduce _any_ user-facing change? Yes. ### How was this patch tested? This PR includes new test coverage. Closes #36778 from dtenedor/default-cols-v2-altercol. 
Authored-by: Daniel Tenedorio <daniel.tenedo...@databricks.com> Signed-off-by: Gengliang Wang <gengli...@apache.org> --- .../spark/sql/connector/catalog/TableChange.java | 55 ++++++++++++++++++++++ .../plans/logical/v2AlterTableCommands.scala | 5 +- .../sql/connector/catalog/CatalogV2Util.scala | 12 +++++ .../spark/sql/connector/AlterTableTests.scala | 19 +++++++- 4 files changed, 89 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java index b0b686942c6..cf735ed9452 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/TableChange.java @@ -220,6 +220,21 @@ public interface TableChange { return new UpdateColumnPosition(fieldNames, newPosition); } + /** + * Create a TableChange for updating the default value of a field. + * <p> + * The name is used to find the field to update. + * <p> + * If the field does not exist, the change will result in an {@link IllegalArgumentException}. + * + * @param fieldNames field names of the column to update + * @param newDefaultValue the new default value + * @return a TableChange for the update + */ + static TableChange updateColumnDefaultValue(String[] fieldNames, String newDefaultValue) { + return new UpdateColumnDefaultValue(fieldNames, newDefaultValue); + } + /** * Create a TableChange for deleting a field. * <p> @@ -655,6 +670,46 @@ public interface TableChange { } } + /** + * A TableChange to update the default value of a field. + * <p> + * The field names are used to find the field to update. + * <p> + * If the field does not exist, the change must result in an {@link IllegalArgumentException}. 
+ */ + final class UpdateColumnDefaultValue implements ColumnChange { + private final String[] fieldNames; + private final String newDefaultValue; + + private UpdateColumnDefaultValue(String[] fieldNames, String newDefaultValue) { + this.fieldNames = fieldNames; + this.newDefaultValue = newDefaultValue; + } + + @Override + public String[] fieldNames() { + return fieldNames; + } + + public String newDefaultValue() { return newDefaultValue; } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + UpdateColumnDefaultValue that = (UpdateColumnDefaultValue) o; + return Arrays.equals(fieldNames, that.fieldNames) && + newDefaultValue.equals(that.newDefaultValue()); + } + + @Override + public int hashCode() { + int result = Objects.hash(newDefaultValue); + result = 31 * result + Arrays.hashCode(fieldNames); + return result; + } + } + /** * A TableChange to delete a field. * <p> diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala index e72ce8d421b..94f2a570663 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala @@ -227,7 +227,10 @@ case class AlterColumn( "FieldPosition should be resolved before it's converted to TableChange.") TableChange.updateColumnPosition(colName, newPosition.position) } - typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange + val defaultValueChange = setDefaultExpression.map { newDefaultExpression => + TableChange.updateColumnDefaultValue(colName, newDefaultExpression) + } + typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange ++ defaultValueChange } override protected def withNewChildInternal(newChild: 
LogicalPlan): LogicalPlan = diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala index d47ffcc2a71..a5087d39e98 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogV2Util.scala @@ -203,6 +203,18 @@ private[sql] object CatalogV2Util { }) } + case update: UpdateColumnDefaultValue => + replace(schema, update.fieldNames, field => + // The new DEFAULT value string will be non-empty for any DDL commands that set the + // default value, such as "ALTER TABLE t ALTER COLUMN c SET DEFAULT ..." (this is + // enforced by the parser). On the other hand, commands that drop the default value such + // as "ALTER TABLE t ALTER COLUMN c DROP DEFAULT" will set this string to empty. + if (update.newDefaultValue().nonEmpty) { + Some(field.withCurrentDefaultValue(update.newDefaultValue())) + } else { + Some(field.clearCurrentDefaultValue) + }) + case delete: DeleteColumn => replace(schema, delete.fieldNames, _ => None, delete.ifExists) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala index df6bfdc319f..c43c5636974 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala @@ -311,7 +311,7 @@ trait AlterTableTests extends SharedSparkSession { } } - test("SPARK-39383 DEFAULT columns on V2 data sources with ALTER TABLE ADD COLUMN") { + test("SPARK-39383 DEFAULT columns on V2 data sources with ALTER TABLE ADD/ALTER COLUMN") { withSQLConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS.key -> s"$v2Format, ") { val t = s"${catalogAndNamespace}table_name" withTable("t") { @@ -327,6 +327,23 @@ trait 
AlterTableTests extends SharedSparkSession { .add(StructField("b", IntegerType) .withCurrentDefaultValue("2 + 3") .withExistenceDefaultValue("5"))) + + sql(s"alter table $t alter column b set default 2 + 3") + + assert( + getTableMetadata(tableName).schema === new StructType() + .add("a", StringType) + .add(StructField("b", IntegerType) + .withCurrentDefaultValue("2 + 3") + .withExistenceDefaultValue("5"))) + + sql(s"alter table $t alter column b drop default") + + assert( + getTableMetadata(tableName).schema === new StructType() + .add("a", StringType) + .add(StructField("b", IntegerType) + .withExistenceDefaultValue("5"))) } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org