This is an automated email from the ASF dual-hosted git repository.
yuzelin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b7d289873c [core][spark][flink] Introduce default value when writing (#5754)
b7d289873c is described below
commit b7d289873c02374a7c1dcde16f4ee11205201e17
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Jun 19 15:11:41 2025 +0800
[core][spark][flink] Introduce default value when writing (#5754)
---
docs/content/flink/default-value.md | 64 +++++++++++
docs/content/spark/default-value.md | 71 ++++++++++++
docs/static/rest-catalog-open-api.yaml | 15 +++
.../main/java/org/apache/paimon/CoreOptions.java | 16 ---
.../main/java/org/apache/paimon/schema/Schema.java | 21 +++-
.../org/apache/paimon/schema/SchemaChange.java | 60 ++++++++++
.../java/org/apache/paimon/types/DataField.java | 83 ++++++++------
.../apache/paimon/types/DataTypeJsonParser.java | 7 +-
.../org/apache/paimon/types/ReassignFieldId.java | 8 +-
.../main/java/org/apache/paimon/types/RowType.java | 15 ++-
.../org/apache/paimon/casting/DefaultValueRow.java | 46 ++++++++
.../org/apache/paimon/types/DataTypesTest.java | 2 +-
.../paimon/operation/DefaultValueAssigner.java | 30 +++--
.../org/apache/paimon/schema/SchemaManager.java | 28 ++++-
.../apache/paimon/schema/SchemaMergingUtils.java | 9 +-
.../org/apache/paimon/schema/SchemaValidation.java | 64 -----------
.../apache/paimon/table/sink/TableWriteImpl.java | 8 ++
.../paimon/operation/DefaultValueAssignerTest.java | 8 +-
.../paimon/schema/DataTypeJsonParserTest.java | 11 ++
.../paimon/table/PrimaryKeySimpleTableTest.java | 3 +-
.../apache/paimon/table/SchemaEvolutionTest.java | 121 ---------------------
.../source/snapshot/DefaultValueScannerTest.java | 3 +-
.../cdc/mysql/MySqlSyncTableActionITCase.java | 4 +-
.../AlterColumnDefaultValueProcedure.java | 63 +++++++++++
.../services/org.apache.paimon.factories.Factory | 1 +
.../org/apache/paimon/flink/BranchSqlITCase.java | 9 ++
.../apache/paimon/flink/ReadWriteTableITCase.java | 7 +-
.../apache/paimon/format/orc/OrcFileFormat.java | 3 +-
.../connector/catalog/TableCatalogCapability.java | 52 +++++++++
.../connector/catalog/TableCatalogCapability.java | 51 +++++++++
.../java/org/apache/paimon/spark/SparkCatalog.java | 29 ++++-
.../apache/paimon/spark/utils/CatalogUtils.java | 30 +++++
.../org/apache/paimon/spark/SparkTypeUtils.java | 23 +++-
.../org/apache/paimon/spark/SparkWriteITCase.java | 45 +++++++-
34 files changed, 740 insertions(+), 270 deletions(-)
diff --git a/docs/content/flink/default-value.md
b/docs/content/flink/default-value.md
new file mode 100644
index 0000000000..9030d6f14b
--- /dev/null
+++ b/docs/content/flink/default-value.md
@@ -0,0 +1,64 @@
+---
+title: "Default Value"
+weight: 8
+type: docs
+aliases:
+- /flink/default-value.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Default Value
+
+Paimon allows specifying default values for columns. When users write to these tables without explicitly providing values for certain columns, Paimon automatically fills in the default values for those columns.
+
+## Create Table
+
+Flink SQL does not have native support for default values, so you can only create a table without default values:
+
+```sql
+CREATE TABLE my_table (
+ a BIGINT,
+ b STRING,
+ c INT
+);
+```
+
+Flink provides a procedure for modifying column default values. You can add default value definitions after creating the table:
+
+```sql
+CALL sys.alter_column_default_value('default.my_table', 'b', 'my_value');
+CALL sys.alter_column_default_value('default.my_table', 'c', '5');
+```
+
+## Insert Table
+
+For SQL commands that execute table writes, such as `INSERT`, `UPDATE`, and `MERGE`, a `NULL` value is replaced with the default value specified for the corresponding column.
+
+For example:
+
+```sql
+INSERT INTO my_table (a) VALUES (1), (2);
+
+SELECT * FROM my_table;
+-- result: [[1, my_value, 5], [2, my_value, 5]]
+```
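
To make the `NULL` handling above concrete, a minimal sketch against the same table (the explicit casts are only there to keep the `NULL` literals typed; the expected results assume the defaults set by the procedure calls above):

```sql
-- NULLs written to b and c are replaced with their declared defaults on write.
INSERT INTO my_table VALUES (3, CAST(NULL AS STRING), CAST(NULL AS INT));

SELECT * FROM my_table WHERE a = 3;
-- expected result: [[3, my_value, 5]]
```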
diff --git a/docs/content/spark/default-value.md
b/docs/content/spark/default-value.md
new file mode 100644
index 0000000000..ba5423133e
--- /dev/null
+++ b/docs/content/spark/default-value.md
@@ -0,0 +1,71 @@
+---
+title: "Default Value"
+weight: 8
+type: docs
+aliases:
+- /spark/default-value.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Default Value
+
+Paimon allows specifying default values for columns. When users write to these tables without explicitly providing values for certain columns, Paimon automatically fills in the default values for those columns.
+
+## Create Table
+
+You can create a table whose columns have default values using the following SQL:
+
+```sql
+CREATE TABLE my_table (
+ a BIGINT,
+ b STRING DEFAULT 'my_value',
+ c INT DEFAULT 5
+);
+```
+
+## Insert Table
+
+For SQL commands that execute table writes, such as `INSERT`, `UPDATE`, and `MERGE`, the `DEFAULT` keyword or a `NULL` value is replaced with the default value specified for the corresponding column.
+
+## Alter Default Value
+
+Paimon supports altering a column's default value.
+
+For example:
+
+```sql
+CREATE TABLE T (a INT, b INT DEFAULT 2);
+
+INSERT INTO T (a) VALUES (1);
+-- result: [[1, 2]]
+
+ALTER TABLE T ALTER COLUMN b SET DEFAULT 3;
+
+INSERT INTO T (a) VALUES (2);
+-- result: [[1, 2], [2, 3]]
+```
+
+The default value of column `b` has been changed from 2 to 3.
+
+## Limitation
+
+Adding a column with a default value is not supported, for example: `ALTER TABLE T ADD COLUMN d INT DEFAULT 5;`.
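
A minimal sketch of the `DEFAULT` keyword described in the Insert Table section, against the `my_table` definition above:

```sql
-- DEFAULT resolves to the declared column default on write.
INSERT INTO my_table VALUES (2, DEFAULT, DEFAULT);

SELECT * FROM my_table WHERE a = 2;
-- expected result: [[2, my_value, 5]]
```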
diff --git a/docs/static/rest-catalog-open-api.yaml
b/docs/static/rest-catalog-open-api.yaml
index 4246da1f8e..402a4553b3 100644
--- a/docs/static/rest-catalog-open-api.yaml
+++ b/docs/static/rest-catalog-open-api.yaml
@@ -2233,6 +2233,7 @@ components:
- $ref: '#/components/schemas/RenameColumn'
- $ref: '#/components/schemas/DropColumn'
- $ref: '#/components/schemas/UpdateColumnComment'
+ - $ref: '#/components/schemas/UpdateColumnDefaultValue'
- $ref: '#/components/schemas/UpdateColumnType'
- $ref: '#/components/schemas/UpdateColumnPosition'
- $ref: '#/components/schemas/UpdateColumnNullability'
@@ -2247,6 +2248,7 @@ components:
renameColumn: '#/components/schemas/RenameColumn'
dropColumn: '#/components/schemas/DropColumn'
updateColumnComment: '#/components/schemas/UpdateColumnComment'
+          updateColumnDefaultValue: '#/components/schemas/UpdateColumnDefaultValue'
updateColumnType: '#/components/schemas/UpdateColumnType'
updateColumnPosition: '#/components/schemas/UpdateColumnPosition'
          updateColumnNullability: '#/components/schemas/UpdateColumnNullability'
@@ -2339,6 +2341,19 @@ components:
type: string
newComment:
type: string
+ UpdateColumnDefaultValue:
+ allOf:
+ - $ref: '#/components/schemas/BaseSchemaChange'
+ properties:
+ action:
+ type: string
+ const: "updateColumnDefaultValue"
+ fieldNames:
+ type: array
+ items:
+ type: string
+ newDefaultValue:
+ type: string
UpdateColumnType:
allOf:
- $ref: '#/components/schemas/BaseSchemaChange'
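
Per the `UpdateColumnDefaultValue` schema above, a schema-change payload would look along these lines (a hand-written sketch matching the spec, not output captured from a REST catalog):

```json
{
  "action": "updateColumnDefaultValue",
  "fieldNames": ["b"],
  "newDefaultValue": "5"
}
```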
diff --git a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
index 4c446dc4e5..3ff05d9a7c 100644
--- a/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -66,8 +66,6 @@ import static
org.apache.paimon.utils.Preconditions.checkArgument;
/** Core options for paimon. */
public class CoreOptions implements Serializable {
- public static final String DEFAULT_VALUE_SUFFIX = "default-value";
-
public static final String FIELDS_PREFIX = "fields";
public static final String FIELDS_SEPARATOR = ",";
@@ -2589,20 +2587,6 @@ public class CoreOptions implements Serializable {
return options.get(COMMIT_FORCE_CREATE_SNAPSHOT);
}
- public Map<String, String> getFieldDefaultValues() {
- Map<String, String> defaultValues = new HashMap<>();
- String fieldPrefix = FIELDS_PREFIX + ".";
- String defaultValueSuffix = "." + DEFAULT_VALUE_SUFFIX;
- for (Map.Entry<String, String> option : options.toMap().entrySet()) {
- String key = option.getKey();
- if (key != null && key.startsWith(fieldPrefix) &&
key.endsWith(defaultValueSuffix)) {
- String fieldName = key.replace(fieldPrefix,
"").replace(defaultValueSuffix, "");
- defaultValues.put(fieldName, option.getValue());
- }
- }
- return defaultValues;
- }
-
public Map<String, String> commitCallbacks() {
return callbacks(COMMIT_CALLBACKS, COMMIT_CALLBACK_PARAM);
}
diff --git a/paimon-api/src/main/java/org/apache/paimon/schema/Schema.java
b/paimon-api/src/main/java/org/apache/paimon/schema/Schema.java
index 8a679665fb..d4dbbb0117 100644
--- a/paimon-api/src/main/java/org/apache/paimon/schema/Schema.java
+++ b/paimon-api/src/main/java/org/apache/paimon/schema/Schema.java
@@ -175,7 +175,8 @@ public class Schema {
field.id(),
field.name(),
field.type().copy(false),
- field.description()));
+ field.description(),
+ field.defaultValue()));
} else {
newFields.add(field);
}
@@ -302,12 +303,28 @@ public class Schema {
* @param description description of the column
*/
public Builder column(String columnName, DataType dataType, @Nullable
String description) {
+ return column(columnName, dataType, description, null);
+ }
+
+ /**
+ * Declares a column that is appended to this schema.
+ *
+ * @param columnName column name
+ * @param dataType data type of the column
+ * @param description description of the column
+ * @param defaultValue default value of the column
+ */
+ public Builder column(
+ String columnName,
+ DataType dataType,
+ @Nullable String description,
+ @Nullable String defaultValue) {
Preconditions.checkNotNull(columnName, "Column name must not be
null.");
Preconditions.checkNotNull(dataType, "Data type must not be
null.");
int id = highestFieldId.incrementAndGet();
DataType reassignDataType = ReassignFieldId.reassign(dataType,
highestFieldId);
- columns.add(new DataField(id, columnName, reassignDataType,
description));
+ columns.add(new DataField(id, columnName, reassignDataType,
description, defaultValue));
return this;
}
diff --git
a/paimon-api/src/main/java/org/apache/paimon/schema/SchemaChange.java
b/paimon-api/src/main/java/org/apache/paimon/schema/SchemaChange.java
index 90bc9abc0a..4b68ad105a 100644
--- a/paimon-api/src/main/java/org/apache/paimon/schema/SchemaChange.java
+++ b/paimon-api/src/main/java/org/apache/paimon/schema/SchemaChange.java
@@ -72,6 +72,9 @@ import java.util.Objects;
@JsonSubTypes.Type(
value = SchemaChange.UpdateColumnComment.class,
name = SchemaChange.Actions.UPDATE_COLUMN_COMMENT_ACTION),
+ @JsonSubTypes.Type(
+ value = SchemaChange.UpdateColumnDefaultValue.class,
+ name = SchemaChange.Actions.UPDATE_COLUMN_DEFAULT_VALUE_ACTION),
@JsonSubTypes.Type(
value = SchemaChange.UpdateColumnPosition.class,
name = SchemaChange.Actions.UPDATE_COLUMN_POSITION_ACTION),
@@ -153,6 +156,10 @@ public interface SchemaChange extends Serializable {
return new UpdateColumnComment(fieldNames, comment);
}
+ static SchemaChange updateColumnDefaultValue(String[] fieldNames, String
defaultValue) {
+ return new UpdateColumnDefaultValue(fieldNames, defaultValue);
+ }
+
static SchemaChange updateColumnPosition(Move move) {
return new UpdateColumnPosition(move);
}
@@ -751,6 +758,58 @@ public interface SchemaChange extends Serializable {
}
}
+ /** A SchemaChange to update the default value. */
+ final class UpdateColumnDefaultValue implements SchemaChange {
+
+ private static final long serialVersionUID = 1L;
+        private static final String FIELD_FIELD_NAMES = "fieldNames";
+ private static final String FIELD_NEW_DEFAULT_VALUE =
"newDefaultValue";
+
+        @JsonProperty(FIELD_FIELD_NAMES)
+ private final String[] fieldNames;
+
+ @JsonProperty(FIELD_NEW_DEFAULT_VALUE)
+ private final String newDefaultValue;
+
+ @JsonCreator
+ private UpdateColumnDefaultValue(
+                @JsonProperty(FIELD_FIELD_NAMES) String[] fieldNames,
+ @JsonProperty(FIELD_NEW_DEFAULT_VALUE) String newDefaultValue)
{
+ this.fieldNames = fieldNames;
+ this.newDefaultValue = newDefaultValue;
+ }
+
+        @JsonGetter(FIELD_FIELD_NAMES)
+ public String[] fieldNames() {
+ return fieldNames;
+ }
+
+ @JsonGetter(FIELD_NEW_DEFAULT_VALUE)
+ public String newDefaultValue() {
+ return newDefaultValue;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ UpdateColumnDefaultValue that = (UpdateColumnDefaultValue) o;
+ return Arrays.equals(fieldNames, that.fieldNames)
+ && newDefaultValue.equals(that.newDefaultValue);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = Objects.hash(newDefaultValue);
+ result = 31 * result + Objects.hashCode(fieldNames);
+ return result;
+ }
+ }
+
/** Actions for schema changes: identify for schema change. */
class Actions {
public static final String FIELD_ACTION = "action";
@@ -763,6 +822,7 @@ public interface SchemaChange extends Serializable {
public static final String UPDATE_COLUMN_TYPE_ACTION =
"updateColumnType";
public static final String UPDATE_COLUMN_NULLABILITY_ACTION =
"updateColumnNullability";
public static final String UPDATE_COLUMN_COMMENT_ACTION =
"updateColumnComment";
+ public static final String UPDATE_COLUMN_DEFAULT_VALUE_ACTION =
"updateColumnDefaultValue";
public static final String UPDATE_COLUMN_POSITION_ACTION =
"updateColumnPosition";
private Actions() {}
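
For context, a minimal sketch of how the new builder is meant to be applied (`schemaManager` stands in for an existing `SchemaManager`; its handling of this change is shown further down in this patch):

```java
import java.util.Collections;

import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;

class UpdateDefaultValueSketch {
    // Sets column b's default value to "5" through the schema manager,
    // mirroring what AlterColumnDefaultValueProcedure does via the catalog.
    static void setDefault(SchemaManager schemaManager) throws Exception {
        SchemaChange change = SchemaChange.updateColumnDefaultValue(new String[] {"b"}, "5");
        schemaManager.commitChanges(Collections.singletonList(change));
    }
}
```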
diff --git a/paimon-api/src/main/java/org/apache/paimon/types/DataField.java
b/paimon-api/src/main/java/org/apache/paimon/types/DataField.java
index 209118023b..dcca909d52 100644
--- a/paimon-api/src/main/java/org/apache/paimon/types/DataField.java
+++ b/paimon-api/src/main/java/org/apache/paimon/types/DataField.java
@@ -19,6 +19,7 @@
package org.apache.paimon.types;
import org.apache.paimon.annotation.Public;
+import org.apache.paimon.utils.StringUtils;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -41,27 +42,31 @@ public final class DataField implements Serializable {
private static final long serialVersionUID = 1L;
- public static final String FIELD_FORMAT_WITH_DESCRIPTION = "%s %s '%s'";
-
- public static final String FIELD_FORMAT_NO_DESCRIPTION = "%s %s";
-
private final int id;
-
private final String name;
-
private final DataType type;
-
private final @Nullable String description;
+ private final @Nullable String defaultValue;
public DataField(int id, String name, DataType dataType) {
- this(id, name, dataType, null);
+ this(id, name, dataType, null, null);
}
-    public DataField(int id, String name, DataType type, @Nullable String description) {
+    public DataField(int id, String name, DataType dataType, @Nullable String description) {
+ this(id, name, dataType, description, null);
+ }
+
+ public DataField(
+ int id,
+ String name,
+ DataType type,
+ @Nullable String description,
+ @Nullable String defaultValue) {
this.id = id;
this.name = name;
this.type = type;
this.description = description;
+ this.defaultValue = defaultValue;
}
public int id() {
@@ -76,20 +81,24 @@ public final class DataField implements Serializable {
return type;
}
- public DataField newId(int newid) {
- return new DataField(newid, name, type, description);
+ public DataField newId(int newId) {
+ return new DataField(newId, name, type, description, defaultValue);
}
public DataField newName(String newName) {
- return new DataField(id, newName, type, description);
+ return new DataField(id, newName, type, description, defaultValue);
}
public DataField newType(DataType newType) {
- return new DataField(id, name, newType, description);
+ return new DataField(id, name, newType, description, defaultValue);
}
public DataField newDescription(String newDescription) {
- return new DataField(id, name, type, newDescription);
+        return new DataField(id, name, type, newDescription, defaultValue);
+    }
+
+    public DataField newDefaultValue(String newDefaultValue) {
+        return new DataField(id, name, type, description, newDefaultValue);
}
@Nullable
@@ -97,28 +106,29 @@ public final class DataField implements Serializable {
return description;
}
+ @Nullable
+ public String defaultValue() {
+ return defaultValue;
+ }
+
public DataField copy() {
- return new DataField(id, name, type.copy(), description);
+ return new DataField(id, name, type.copy(), description, defaultValue);
}
public DataField copy(boolean isNullable) {
- return new DataField(id, name, type.copy(isNullable), description);
+        return new DataField(id, name, type.copy(isNullable), description, defaultValue);
}
public String asSQLString() {
- return formatString(type.asSQLString());
- }
-
- private String formatString(String typeString) {
- if (description == null) {
-            return String.format(FIELD_FORMAT_NO_DESCRIPTION, escapeIdentifier(name), typeString);
- } else {
- return String.format(
- FIELD_FORMAT_WITH_DESCRIPTION,
- escapeIdentifier(name),
- typeString,
- escapeSingleQuotes(description));
+        StringBuilder sb = new StringBuilder();
+        sb.append(escapeIdentifier(name)).append(" ").append(type.asSQLString());
+        if (StringUtils.isNotEmpty(description)) {
+            sb.append(" COMMENT '").append(escapeSingleQuotes(description)).append("'");
+        }
+ if (defaultValue != null) {
+ sb.append(" DEFAULT ").append(defaultValue);
+ }
+ return sb.toString();
}
public void serializeJson(JsonGenerator generator) throws IOException {
@@ -130,6 +140,9 @@ public final class DataField implements Serializable {
if (description() != null) {
generator.writeStringField("description", description());
}
+ if (defaultValue() != null) {
+ generator.writeStringField("defaultValue", defaultValue());
+ }
generator.writeEndObject();
}
@@ -145,7 +158,8 @@ public final class DataField implements Serializable {
return Objects.equals(id, field.id)
&& Objects.equals(name, field.name)
&& Objects.equals(type, field.type)
- && Objects.equals(description, field.description);
+ && Objects.equals(description, field.description)
+ && Objects.equals(defaultValue, field.defaultValue);
}
public boolean equalsIgnoreFieldId(DataField other) {
@@ -157,7 +171,8 @@ public final class DataField implements Serializable {
}
return Objects.equals(name, other.name)
&& type.equalsIgnoreFieldId(other.type)
- && Objects.equals(description, other.description);
+ && Objects.equals(description, other.description)
+ && Objects.equals(defaultValue, other.defaultValue);
}
public boolean isPrunedFrom(DataField other) {
@@ -170,12 +185,13 @@ public final class DataField implements Serializable {
return Objects.equals(id, other.id)
&& Objects.equals(name, other.name)
&& type.isPrunedFrom(other.type)
- && Objects.equals(description, other.description);
+ && Objects.equals(description, other.description)
+ && Objects.equals(defaultValue, other.defaultValue);
}
@Override
public int hashCode() {
- return Objects.hash(id, name, type, description);
+ return Objects.hash(id, name, type, description, defaultValue);
}
@Override
@@ -193,7 +209,8 @@ public final class DataField implements Serializable {
} else if (dataField1 != null && dataField2 != null) {
return Objects.equals(dataField1.name(), dataField2.name())
&& Objects.equals(dataField1.type(), dataField2.type())
- && Objects.equals(dataField1.description(),
dataField2.description());
+ && Objects.equals(dataField1.description(),
dataField2.description())
+ && Objects.equals(dataField1.defaultValue(),
dataField2.defaultValue());
} else {
return false;
}
diff --git
a/paimon-api/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java
b/paimon-api/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java
index 40790f06fb..d8e6063dc1 100644
--- a/paimon-api/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java
+++ b/paimon-api/src/main/java/org/apache/paimon/types/DataTypeJsonParser.java
@@ -58,7 +58,12 @@ public final class DataTypeJsonParser {
if (descriptionNode != null) {
description = descriptionNode.asText();
}
- return new DataField(id, name, type, description);
+ JsonNode defaultValueNode = json.get("defaultValue");
+ String defaultValue = null;
+ if (defaultValueNode != null) {
+ defaultValue = defaultValueNode.asText();
+ }
+ return new DataField(id, name, type, description, defaultValue);
}
public static DataType parseDataType(JsonNode json) {
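
With this change, a serialized field can carry the extra key, e.g. `{"id":0,"name":"f0","type":"INT NOT NULL","description":"my_comment","defaultValue":"55"}` (the shape exercised by DataTypeJsonParserTest below).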
diff --git
a/paimon-api/src/main/java/org/apache/paimon/types/ReassignFieldId.java
b/paimon-api/src/main/java/org/apache/paimon/types/ReassignFieldId.java
index 36e4d86070..2aacfeaf88 100644
--- a/paimon-api/src/main/java/org/apache/paimon/types/ReassignFieldId.java
+++ b/paimon-api/src/main/java/org/apache/paimon/types/ReassignFieldId.java
@@ -56,7 +56,13 @@ public class ReassignFieldId extends
DataTypeDefaultVisitor<DataType> {
public DataType visit(RowType rowType) {
RowType.Builder builder = RowType.builder(rowType.isNullable(),
fieldId);
rowType.getFields()
- .forEach(f -> builder.field(f.name(), f.type().accept(this),
f.description()));
+ .forEach(
+ f ->
+ builder.field(
+ f.name(),
+ f.type().accept(this),
+ f.description(),
+ f.defaultValue()));
return builder.build();
}
diff --git a/paimon-api/src/main/java/org/apache/paimon/types/RowType.java
b/paimon-api/src/main/java/org/apache/paimon/types/RowType.java
index 5aa58fb375..e3fbf0bd8b 100644
--- a/paimon-api/src/main/java/org/apache/paimon/types/RowType.java
+++ b/paimon-api/src/main/java/org/apache/paimon/types/RowType.java
@@ -27,6 +27,8 @@ import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCre
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
import
org.apache.paimon.shade.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -387,11 +389,22 @@ public final class RowType extends DataType {
return this;
}
- public Builder field(String name, DataType type, String description) {
+ public Builder field(String name, DataType type, @Nullable String
description) {
fields.add(new DataField(fieldId.incrementAndGet(), name, type,
description));
return this;
}
+ public Builder field(
+ String name,
+ DataType type,
+ @Nullable String description,
+ @Nullable String defaultValue) {
+ fields.add(
+ new DataField(
+ fieldId.incrementAndGet(), name, type,
description, defaultValue));
+ return this;
+ }
+
public Builder fields(List<DataType> types) {
for (int i = 0; i < types.size(); i++) {
field("f" + i, types.get(i));
diff --git
a/paimon-common/src/main/java/org/apache/paimon/casting/DefaultValueRow.java
b/paimon-common/src/main/java/org/apache/paimon/casting/DefaultValueRow.java
index 99199c0a79..4e62bf47ed 100644
--- a/paimon-common/src/main/java/org/apache/paimon/casting/DefaultValueRow.java
+++ b/paimon-common/src/main/java/org/apache/paimon/casting/DefaultValueRow.java
@@ -20,12 +20,20 @@ package org.apache.paimon.casting;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Decimal;
+import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.Timestamp;
import org.apache.paimon.data.variant.Variant;
+import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarCharType;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
/**
* An implementation of {@link InternalRow} which provides a default value for
the underlying {@link
@@ -193,4 +201,42 @@ public class DefaultValueRow implements InternalRow {
public static DefaultValueRow from(InternalRow defaultValueRow) {
return new DefaultValueRow(defaultValueRow);
}
+
+ @Nullable
+ public static DefaultValueRow create(RowType rowType) {
+ List<DataField> fields = rowType.getFields();
+ GenericRow row = new GenericRow(fields.size());
+ boolean containsDefaultValue = false;
+ for (int i = 0; i < fields.size(); i++) {
+ DataField dataField = fields.get(i);
+ String defaultValueStr = dataField.defaultValue();
+ if (defaultValueStr == null) {
+ continue;
+ }
+
+ containsDefaultValue = true;
+ @SuppressWarnings("unchecked")
+ CastExecutor<Object, Object> resolve =
+ (CastExecutor<Object, Object>)
+                            CastExecutors.resolve(VarCharType.STRING_TYPE, dataField.type());
+
+ if (resolve == null) {
+ throw new RuntimeException(
+ "Default value do not support the type of " +
dataField.type());
+ }
+
+ if (defaultValueStr.startsWith("'") &&
defaultValueStr.endsWith("'")) {
+ defaultValueStr = defaultValueStr.substring(1,
defaultValueStr.length() - 1);
+ }
+
+            Object defaultValue = resolve.cast(BinaryString.fromString(defaultValueStr));
+ row.setField(i, defaultValue);
+ }
+
+ if (!containsDefaultValue) {
+ return null;
+ }
+
+ return DefaultValueRow.from(row);
+ }
}
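
To make the new `create` / `replaceRow` contract concrete, a minimal sketch (the field ids and the `GenericRow` usage are illustrative assumptions; `create` returns null when no field declares a default):

```java
import java.util.Arrays;

import org.apache.paimon.casting.DefaultValueRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;

class DefaultValueRowSketch {
    static void demo() {
        // Column b declares the default value "5"; column a declares none.
        RowType rowType =
                new RowType(
                        Arrays.asList(
                                new DataField(0, "a", DataTypes.INT()),
                                new DataField(1, "b", DataTypes.INT(), null, "5")));

        // Non-null here because at least one field declares a default;
        // create(...) returns null when no field does.
        DefaultValueRow defaults = DefaultValueRow.create(rowType);

        // b is null in the incoming row, so reads through the wrapper see 5.
        InternalRow row = defaults.replaceRow(GenericRow.of(1, null));
        int b = row.getInt(1); // 5
    }
}
```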
diff --git
a/paimon-common/src/test/java/org/apache/paimon/types/DataTypesTest.java
b/paimon-common/src/test/java/org/apache/paimon/types/DataTypesTest.java
index 9da6289d29..df4cf0679b 100644
--- a/paimon-common/src/test/java/org/apache/paimon/types/DataTypesTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/types/DataTypesTest.java
@@ -178,7 +178,7 @@ public class DataTypesTest {
new DataField(1, "b`", new
TimestampType()))))
.satisfies(
baseAssertions(
- "ROW<`a` VARCHAR(1) 'Someone''s desc.', `b```
TIMESTAMP(6)>",
+ "ROW<`a` VARCHAR(1) COMMENT 'Someone''s
desc.', `b``` TIMESTAMP(6)>",
new RowType(
Arrays.asList(
new DataField(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssigner.java
b/paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssigner.java
index ccb65eb9da..416237b60b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssigner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/DefaultValueAssigner.java
@@ -18,7 +18,6 @@
package org.apache.paimon.operation;
-import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.casting.CastExecutor;
import org.apache.paimon.casting.CastExecutors;
@@ -38,16 +37,24 @@ import org.apache.paimon.types.VarCharType;
import javax.annotation.Nullable;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
+
/**
* The field Default value assigner. note that invoke of assigning should be
after merge and schema
* evolution.
+ *
+ * @deprecated assigning default values at read time is not recommended
*/
+@Deprecated
public class DefaultValueAssigner {
+ public static final String DEFAULT_VALUE_SUFFIX = "default-value";
+
private final RowType rowType;
private final Map<String, String> defaultValues;
@@ -69,10 +76,6 @@ public class DefaultValueAssigner {
return this;
}
- public boolean needToAssign() {
- return needToAssign;
- }
-
/** assign default value for column which value is null. */
public RecordReader<InternalRow>
assignFieldsDefaultValue(RecordReader<InternalRow> reader) {
if (!needToAssign) {
@@ -152,8 +155,21 @@ public class DefaultValueAssigner {
}
public static DefaultValueAssigner create(TableSchema schema) {
- CoreOptions coreOptions = new CoreOptions(schema.options());
- Map<String, String> defaultValues =
coreOptions.getFieldDefaultValues();
+ Map<String, String> defaultValues =
getFieldDefaultValues(schema.options());
return new DefaultValueAssigner(defaultValues,
schema.logicalRowType());
}
+
+ private static Map<String, String> getFieldDefaultValues(Map<String,
String> options) {
+ Map<String, String> defaultValues = new HashMap<>();
+ String fieldPrefix = FIELDS_PREFIX + ".";
+ String defaultValueSuffix = "." + DEFAULT_VALUE_SUFFIX;
+ for (Map.Entry<String, String> option : options.entrySet()) {
+ String key = option.getKey();
+ if (key != null && key.startsWith(fieldPrefix) &&
key.endsWith(defaultValueSuffix)) {
+ String fieldName = key.replace(fieldPrefix,
"").replace(defaultValueSuffix, "");
+ defaultValues.put(fieldName, option.getValue());
+ }
+ }
+ return defaultValues;
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 36a18e26a8..a33d4e4f1f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -31,6 +31,7 @@ import org.apache.paimon.schema.SchemaChange.RemoveOption;
import org.apache.paimon.schema.SchemaChange.RenameColumn;
import org.apache.paimon.schema.SchemaChange.SetOption;
import org.apache.paimon.schema.SchemaChange.UpdateColumnComment;
+import org.apache.paimon.schema.SchemaChange.UpdateColumnDefaultValue;
import org.apache.paimon.schema.SchemaChange.UpdateColumnNullability;
import org.apache.paimon.schema.SchemaChange.UpdateColumnPosition;
import org.apache.paimon.schema.SchemaChange.UpdateColumnType;
@@ -404,7 +405,8 @@ public class SchemaManager implements Serializable {
field.id(),
rename.newName(),
field.type(),
- field.description());
+ field.description(),
+ field.defaultValue());
newFields.set(i, newField);
return;
}
@@ -463,7 +465,8 @@ public class SchemaManager implements Serializable {
targetRootType,
depth,
update.fieldNames().length),
- field.description());
+ field.description(),
+ field.defaultValue());
});
} else if (change instanceof UpdateColumnNullability) {
UpdateColumnNullability update = (UpdateColumnNullability)
change;
@@ -494,7 +497,8 @@ public class SchemaManager implements Serializable {
sourceRootType,
depth,
update.fieldNames().length),
- field.description());
+ field.description(),
+ field.defaultValue());
});
} else if (change instanceof UpdateColumnComment) {
UpdateColumnComment update = (UpdateColumnComment) change;
@@ -506,11 +510,24 @@ public class SchemaManager implements Serializable {
field.id(),
field.name(),
field.type(),
- update.newDescription()));
+ update.newDescription(),
+ field.defaultValue()));
} else if (change instanceof UpdateColumnPosition) {
UpdateColumnPosition update = (UpdateColumnPosition) change;
SchemaChange.Move move = update.move();
applyMove(newFields, move);
+ } else if (change instanceof UpdateColumnDefaultValue) {
+ UpdateColumnDefaultValue update = (UpdateColumnDefaultValue)
change;
+ updateNestedColumn(
+ newFields,
+ update.fieldNames(),
+ (field, depth) ->
+ new DataField(
+ field.id(),
+ field.name(),
+ field.type(),
+ field.description(),
+ update.newDefaultValue()));
} else {
throw new UnsupportedOperationException("Unsupported change: "
+ change.getClass());
}
@@ -793,7 +810,8 @@ public class SchemaManager implements Serializable {
field.id(),
field.name(),
wrapNewRowType(field.type(), nestedFields),
- field.description()));
+ field.description(),
+ field.defaultValue()));
return;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaMergingUtils.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaMergingUtils.java
index 0fff27dce4..82c61adf6c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaMergingUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaMergingUtils.java
@@ -119,7 +119,8 @@ public class SchemaMergingUtils {
baseField.id(),
baseField.name(),
updatedDataType,
- baseField.description());
+ baseField.description(),
+ baseField.defaultValue());
} else {
return baseField;
}
@@ -226,6 +227,10 @@ public class SchemaMergingUtils {
private static DataField assignIdForNewField(DataField field,
AtomicInteger highestFieldId) {
DataType dataType = ReassignFieldId.reassign(field.type(),
highestFieldId);
return new DataField(
- highestFieldId.incrementAndGet(), field.name(), dataType,
field.description());
+ highestFieldId.incrementAndGet(),
+ field.name(),
+ dataType,
+ field.description(),
+ field.defaultValue());
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 66b648f890..cf6b919499 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -21,9 +21,6 @@ package org.apache.paimon.schema;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.ChangelogProducer;
import org.apache.paimon.CoreOptions.MergeEngine;
-import org.apache.paimon.casting.CastExecutor;
-import org.apache.paimon.casting.CastExecutors;
-import org.apache.paimon.data.BinaryString;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.Options;
@@ -38,7 +35,6 @@ import org.apache.paimon.types.MapType;
import org.apache.paimon.types.MultisetType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TimestampType;
-import org.apache.paimon.types.VarCharType;
import org.apache.paimon.utils.StringUtils;
import java.util.ArrayList;
@@ -105,8 +101,6 @@ public class SchemaValidation {
validateBucket(schema, options);
- validateDefaultValues(schema);
-
validateStartupMode(options);
validateFieldsPrefix(schema, options);
@@ -486,64 +480,6 @@ public class SchemaValidation {
}
}
- private static void validateDefaultValues(TableSchema schema) {
- CoreOptions coreOptions = new CoreOptions(schema.options());
- Map<String, String> defaultValues =
coreOptions.getFieldDefaultValues();
-
- if (!defaultValues.isEmpty()) {
-
- List<String> partitionKeys = schema.partitionKeys();
- for (String partitionKey : partitionKeys) {
- if (defaultValues.containsKey(partitionKey)) {
- throw new IllegalArgumentException(
- String.format(
- "Partition key %s should not be assign
default column.",
- partitionKey));
- }
- }
-
- List<String> primaryKeys = schema.primaryKeys();
- for (String primaryKey : primaryKeys) {
- if (defaultValues.containsKey(primaryKey)) {
- throw new IllegalArgumentException(
- String.format(
- "Primary key %s should not be assign
default column.",
- primaryKey));
- }
- }
-
- List<DataField> fields = schema.fields();
-
- for (DataField field : fields) {
- String defaultValueStr = defaultValues.get(field.name());
- if (defaultValueStr == null) {
- continue;
- }
-
- @SuppressWarnings("unchecked")
- CastExecutor<Object, Object> resolve =
- (CastExecutor<Object, Object>)
- CastExecutors.resolve(VarCharType.STRING_TYPE,
field.type());
- if (resolve == null) {
- throw new IllegalArgumentException(
- String.format(
- "The column %s with datatype %s is
currently not supported for default value.",
- field.name(), field.type().asSQLString()));
- }
-
- try {
- resolve.cast(BinaryString.fromString(defaultValueStr));
- } catch (Exception e) {
- throw new IllegalArgumentException(
- String.format(
- "The default value %s of the column %s can
not be cast to datatype: %s",
- defaultValueStr, field.name(),
field.type()),
- e);
- }
- }
- }
- }
-
private static void validateForDeletionVectors(CoreOptions options) {
checkArgument(
options.changelogProducer() == ChangelogProducer.NONE
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
index f72cd7151f..cf3f8d6657 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableWriteImpl.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table.sink;
import org.apache.paimon.FileStore;
import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.casting.DefaultValueRow;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
@@ -63,6 +64,7 @@ public class TableWriteImpl<T> implements InnerTableWrite,
Restorable<List<State
private BucketMode bucketMode;
private final int[] notNullFieldIndex;
+ private final @Nullable DefaultValueRow defaultValueRow;
public TableWriteImpl(
RowType rowType,
@@ -84,6 +86,7 @@ public class TableWriteImpl<T> implements InnerTableWrite,
Restorable<List<State
.map(DataField::name)
.collect(Collectors.toList());
this.notNullFieldIndex = rowType.getFieldIndices(notNullColumnNames);
+ this.defaultValueRow = DefaultValueRow.create(rowType);
}
@Override
@@ -162,6 +165,7 @@ public class TableWriteImpl<T> implements InnerTableWrite,
Restorable<List<State
@Nullable
public SinkRecord writeAndReturn(InternalRow row) throws Exception {
checkNullability(row);
+ row = wrapDefaultValue(row);
RowKind rowKind = RowKindGenerator.getRowKind(rowKindGenerator, row);
if (ignoreDelete && rowKind.isRetract()) {
return null;
@@ -193,6 +197,10 @@ public class TableWriteImpl<T> implements InnerTableWrite,
Restorable<List<State
}
}
+ private InternalRow wrapDefaultValue(InternalRow row) {
+ return defaultValueRow == null ? row : defaultValueRow.replaceRow(row);
+ }
+
private SinkRecord toSinkRecord(InternalRow row) {
keyAndBucketExtractor.setRecord(row);
return new SinkRecord(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/operation/DefaultValueAssignerTest.java
b/paimon-core/src/test/java/org/apache/paimon/operation/DefaultValueAssignerTest.java
index 0554cc0134..7cb31fba27 100644
---
a/paimon-core/src/test/java/org/apache/paimon/operation/DefaultValueAssignerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/operation/DefaultValueAssignerTest.java
@@ -57,12 +57,16 @@ class DefaultValueAssignerTest {
options.put(
String.format(
"%s.%s.%s",
- CoreOptions.FIELDS_PREFIX, "col4",
CoreOptions.DEFAULT_VALUE_SUFFIX),
+ CoreOptions.FIELDS_PREFIX,
+ "col4",
+ DefaultValueAssigner.DEFAULT_VALUE_SUFFIX),
"0");
options.put(
String.format(
"%s.%s.%s",
- CoreOptions.FIELDS_PREFIX, "col5",
CoreOptions.DEFAULT_VALUE_SUFFIX),
+ CoreOptions.FIELDS_PREFIX,
+ "col5",
+ DefaultValueAssigner.DEFAULT_VALUE_SUFFIX),
"1");
Schema schema =
new Schema(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/schema/DataTypeJsonParserTest.java
b/paimon-core/src/test/java/org/apache/paimon/schema/DataTypeJsonParserTest.java
index 2397af83aa..808ce70aee 100644
---
a/paimon-core/src/test/java/org/apache/paimon/schema/DataTypeJsonParserTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/schema/DataTypeJsonParserTest.java
@@ -168,6 +168,17 @@ public class DataTypeJsonParserTest {
"f1",
new BooleanType(),
"This as well.")))),
+ TestSpec.forString(
+                            "{\"type\":\"ROW\",\"fields\":[{\"id\":0,\"name\":\"f0\",\"type\":\"INT NOT NULL\",\"description\":\"my_comment\",\"defaultValue\":\"55\"}]}")
+ .expectType(
+ new RowType(
+ Collections.singletonList(
+ new DataField(
+ 0,
+ "f0",
+ new IntType(false),
+ "my_comment",
+ "55")))),
// error message testing
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
index 35928e2335..693c378a21 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
@@ -36,6 +36,7 @@ import org.apache.paimon.manifest.FileKind;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.operation.AbstractFileStoreWrite;
+import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.operation.FileStoreScan;
import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
@@ -1291,7 +1292,7 @@ public class PrimaryKeySimpleTableTest extends
SimpleTableTestBase {
"%s.%s.%s",
CoreOptions.FIELDS_PREFIX,
"b",
- CoreOptions.DEFAULT_VALUE_SUFFIX),
+
DefaultValueAssigner.DEFAULT_VALUE_SUFFIX),
"0");
});
StreamTableWrite write = table.newWrite(commitUser);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
index 76a63eb06e..5d20e46ec2 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/SchemaEvolutionTest.java
@@ -56,7 +56,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;
@@ -84,126 +83,6 @@ public class SchemaEvolutionTest {
commitUser = UUID.randomUUID().toString();
}
- @Test
- public void testDefaultValue() throws Exception {
- {
- Map<String, String> option = new HashMap<>();
- option.put(
- String.format(
- "%s.%s.%s",
- CoreOptions.FIELDS_PREFIX, "a",
CoreOptions.DEFAULT_VALUE_SUFFIX),
- "1");
- Schema schema =
- new Schema(
- RowType.of(
- new DataType[] {
- DataTypes.MAP(DataTypes.INT(),
DataTypes.STRING()),
- DataTypes.BIGINT()
- },
- new String[] {"a", "b"})
- .getFields(),
- Collections.emptyList(),
- Collections.emptyList(),
- option,
- "");
-
- assertThatThrownBy(() -> schemaManager.createTable(schema))
- .isInstanceOf(IllegalArgumentException.class)
- .hasMessageContaining(
- "The column %s with datatype %s is currently not
supported for default value.",
- "a", DataTypes.MAP(DataTypes.INT(),
DataTypes.STRING()).asSQLString());
- }
-
- {
- Map<String, String> option = new HashMap<>();
- option.put(
- String.format(
- "%s.%s.%s",
- CoreOptions.FIELDS_PREFIX, "a",
CoreOptions.DEFAULT_VALUE_SUFFIX),
- "abcxxxx");
- Schema schema =
- new Schema(
- RowType.of(
- new DataType[]
{DataTypes.BIGINT(), DataTypes.BIGINT()},
- new String[] {"a", "b"})
- .getFields(),
- Collections.emptyList(),
- Collections.emptyList(),
- option,
- "");
- assertThatThrownBy(() -> schemaManager.createTable(schema))
- .isInstanceOf(IllegalArgumentException.class)
- .hasMessageContaining(
- "The default value %s of the column a can not be
cast to datatype: %s",
- "abcxxxx", DataTypes.BIGINT().asSQLString());
- }
-
- {
- Schema schema =
- new Schema(
- RowType.of(
- new DataType[] {
- DataTypes.BIGINT(),
- DataTypes.BIGINT(),
- DataTypes.BIGINT()
- },
- new String[] {"a", "b", "c"})
- .getFields(),
- Lists.newArrayList("c"),
- Lists.newArrayList("a", "c"),
- new HashMap<>(),
- "");
-
- schemaManager.createTable(schema);
-
- assertThatThrownBy(
- () ->
- schemaManager.commitChanges(
- Collections.singletonList(
- SchemaChange.setOption(
- String.format(
- "%s.%s.%s",
-
CoreOptions.FIELDS_PREFIX,
- "b",
- CoreOptions
-
.DEFAULT_VALUE_SUFFIX),
- "abcxxxx"))))
- .hasCauseInstanceOf(IllegalArgumentException.class)
- .hasMessageContaining(
- "The default value %s of the column b can not be
cast to datatype: %s",
- "abcxxxx", DataTypes.BIGINT().asSQLString());
- assertThatThrownBy(
- () ->
- schemaManager.commitChanges(
- Collections.singletonList(
- SchemaChange.setOption(
- String.format(
- "%s.%s.%s",
-
CoreOptions.FIELDS_PREFIX,
- "a",
- CoreOptions
-
.DEFAULT_VALUE_SUFFIX),
- "abc"))))
- .hasCauseInstanceOf(IllegalArgumentException.class)
- .hasMessageContaining("Primary key a should not be assign
default column.");
-
- assertThatThrownBy(
- () ->
- schemaManager.commitChanges(
- Collections.singletonList(
- SchemaChange.setOption(
- String.format(
- "%s.%s.%s",
-
CoreOptions.FIELDS_PREFIX,
- "c",
- CoreOptions
-
.DEFAULT_VALUE_SUFFIX),
- "abc"))))
- .hasCauseInstanceOf(IllegalArgumentException.class)
- .hasMessageContaining("Partition key c should not be
assign default column.");
- }
- }
-
@Test
public void testAddField() throws Exception {
Schema schema =
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DefaultValueScannerTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DefaultValueScannerTest.java
index a99d6b234e..ec253143a7 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DefaultValueScannerTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/DefaultValueScannerTest.java
@@ -19,6 +19,7 @@
package org.apache.paimon.table.source.snapshot;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
@@ -84,7 +85,7 @@ public class DefaultValueScannerTest extends ScannerTestBase {
options.set(
String.format(
"%s.%s.%s",
- CoreOptions.FIELDS_PREFIX, "b",
CoreOptions.DEFAULT_VALUE_SUFFIX),
+ CoreOptions.FIELDS_PREFIX, "b",
DefaultValueAssigner.DEFAULT_VALUE_SUFFIX),
"100");
return createFileStoreTable(options);
}
diff --git
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
index 6f16609372..171b489d42 100644
---
a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
+++
b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.java
@@ -750,8 +750,8 @@ public class MySqlSyncTableActionITCase extends
MySqlActionITCaseBase {
anyCauseMatches(
IllegalArgumentException.class,
"Column v1 have different types when merging
schemas.\n"
- + "Current table
'{paimon_sync_table.incompatible_field_1}' field: `v1` TIMESTAMP(0) ''\n"
- + "To be merged table
'paimon_sync_table.incompatible_field_2' field: `v1` INT ''"));
+ + "Current table
'{paimon_sync_table.incompatible_field_1}' field: `v1` TIMESTAMP(0)\n"
+ + "To be merged table
'paimon_sync_table.incompatible_field_2' field: `v1` INT"));
}
@Test
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/AlterColumnDefaultValueProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/AlterColumnDefaultValueProcedure.java
new file mode 100644
index 0000000000..b85dfd28e5
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/AlterColumnDefaultValueProcedure.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/**
+ * Alter column default value procedure. Usage:
+ *
+ * <pre><code>
+ * CALL sys.alter_column_default_value('table_identifier', 'column', 'default_value')
+ * </code></pre>
+ */
+public class AlterColumnDefaultValueProcedure extends ProcedureBase {
+
+ @Override
+ public String identifier() {
+ return "alter_column_default_value";
+ }
+
+ @ProcedureHint(
+ argument = {
+ @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
+ @ArgumentHint(name = "column", type = @DataTypeHint("STRING")),
+ @ArgumentHint(name = "default_value", type =
@DataTypeHint("STRING"))
+ })
+ public String[] call(
+ ProcedureContext procedureContext, String table, String column,
String defaultValue)
+ throws Catalog.ColumnAlreadyExistException,
Catalog.TableNotExistException,
+ Catalog.ColumnNotExistException {
+ Identifier identifier = Identifier.fromString(table);
+ String[] fieldNames = StringUtils.split(column, ".");
+ SchemaChange schemaChange =
SchemaChange.updateColumnDefaultValue(fieldNames, defaultValue);
+ catalog.alterTable(identifier, ImmutableList.of(schemaChange), false);
+ return new String[] {"Success"};
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index df615478c5..df8b368c0a 100644
---
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -95,3 +95,4 @@ org.apache.paimon.flink.procedure.AlterViewDialectProcedure
org.apache.paimon.flink.procedure.CreateFunctionProcedure
org.apache.paimon.flink.procedure.DropFunctionProcedure
org.apache.paimon.flink.procedure.AlterFunctionProcedure
+org.apache.paimon.flink.procedure.AlterColumnDefaultValueProcedure
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
index 57a6746fea..41313e0e9c 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BranchSqlITCase.java
@@ -40,6 +40,15 @@ import static
org.assertj.core.api.Assertions.assertThatThrownBy;
/** IT cases for table with branches using SQL. */
public class BranchSqlITCase extends CatalogITCaseBase {
+ @Test
+ public void testDefaultValue() throws Exception {
+ sql("CREATE TABLE T (a INT, b INT)");
+ sql("CALL sys.alter_column_default_value('default.T', 'b', '5')");
+ sql("INSERT INTO T (a) VALUES (1), (2)");
+ assertThat(collectResult("SELECT * FROM T"))
+ .containsExactlyInAnyOrder("+I[1, 5]", "+I[2, 5]");
+ }
+
@Test
public void testAlterBranchTable() throws Exception {
sql(
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
index 285b245c4a..676836d15c 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
@@ -23,6 +23,7 @@ import org.apache.paimon.flink.sink.FlinkTableSink;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.operation.DefaultValueAssigner;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
@@ -1589,7 +1590,8 @@ public class ReadWriteTableITCase extends
AbstractTestBase {
public void testDefaultValueWithoutPrimaryKey() throws Exception {
Map<String, String> options = new HashMap<>();
options.put(
- CoreOptions.FIELDS_PREFIX + ".rate." +
CoreOptions.DEFAULT_VALUE_SUFFIX, "1000");
+ CoreOptions.FIELDS_PREFIX + ".rate." +
DefaultValueAssigner.DEFAULT_VALUE_SUFFIX,
+ "1000");
String table =
createTable(
@@ -1625,7 +1627,8 @@ public class ReadWriteTableITCase extends
AbstractTestBase {
throws Exception {
Map<String, String> options = new HashMap<>();
options.put(
- CoreOptions.FIELDS_PREFIX + ".rate." +
CoreOptions.DEFAULT_VALUE_SUFFIX, "1000");
+ CoreOptions.FIELDS_PREFIX + ".rate." +
DefaultValueAssigner.DEFAULT_VALUE_SUFFIX,
+ "1000");
options.put(MERGE_ENGINE.key(), mergeEngine.toString());
if (mergeEngine == FIRST_ROW) {
options.put(CHANGELOG_PRODUCER.key(), LOOKUP.toString());
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
index be257aef4b..1cff47d7d0 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcFileFormat.java
@@ -203,7 +203,8 @@ public class OrcFileFormat extends FileFormat {
f.id(),
f.name(),
refineDataType(f.type()),
- f.description()))
+ f.description(),
+ f.defaultValue()))
.collect(Collectors.toList()));
default:
return type;
diff --git
a/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java
new file mode 100644
index 0000000000..ce375ff56d
--- /dev/null
+++
b/paimon-spark/paimon-spark-3.2/src/main/scala/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+/**
+ * Capabilities that can be provided by a {@link TableCatalog} implementation.
+ */
+public enum TableCatalogCapability {
+
+ /**
+ * Signals that the TableCatalog supports defining generated columns upon
table creation in SQL.
+ * <p>
+ * Without this capability, any create/replace table statements with a
generated column defined
+ * in the table schema will throw an exception during analysis.
+ * <p>
+ * A generated column is defined with syntax: {@code colName colType GENERATED ALWAYS AS (expr)}
+ * <p>
+ * Generation expressions are included in the column definition for APIs like
+ * {@link TableCatalog#createTable}.
+ */
+ SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS,
+
+ /**
+ * Signals that the TableCatalog supports defining column default value as
expression in
+ * CREATE/REPLACE/ALTER TABLE.
+ * <p>
+ * Without this capability, any CREATE/REPLACE/ALTER TABLE statement with a
column default value
+ * defined in the table schema will throw an exception during analysis.
+ * <p>
+ * A column default value is defined with syntax: {@code colName colType
DEFAULT expr}
+ * <p>
+ * Column default value expression is included in the column definition for
APIs like
+ * {@link TableCatalog#createTable}.
+ */
+ SUPPORT_COLUMN_DEFAULT_VALUE
+}
diff --git a/paimon-spark/paimon-spark-3.3/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java b/paimon-spark/paimon-spark-3.3/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java
new file mode 100644
index 0000000000..94fde80128
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.3/src/main/java/org/apache/spark/sql/connector/catalog/TableCatalogCapability.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog;
+
+/** Capabilities that can be provided by a {@link TableCatalog} implementation. */
+public enum TableCatalogCapability {
+
+    /**
+     * Signals that the TableCatalog supports defining generated columns upon table creation in
+     * SQL.
+     *
+     * <p>Without this capability, any create/replace table statements with a generated column
+     * defined in the table schema will throw an exception during analysis.
+     *
+     * <p>A generated column is defined with syntax: {@code colName colType GENERATED ALWAYS AS
+     * (expr)}
+     *
+     * <p>Generation expressions are included in the column definition for APIs like {@link
+     * TableCatalog#createTable}.
+     */
+    SUPPORTS_CREATE_TABLE_WITH_GENERATED_COLUMNS,
+
+    /**
+     * Signals that the TableCatalog supports defining a column default value as an expression in
+     * CREATE/REPLACE/ALTER TABLE.
+     *
+     * <p>Without this capability, any CREATE/REPLACE/ALTER TABLE statement with a column default
+     * value defined in the table schema will throw an exception during analysis.
+     *
+     * <p>A column default value is defined with syntax: {@code colName colType DEFAULT expr}
+     *
+     * <p>Column default value expressions are included in the column definition for APIs like
+     * {@link TableCatalog#createTable}.
+     */
+    SUPPORT_COLUMN_DEFAULT_VALUE
+}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 695f03d9ca..d910ade8fa 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -36,6 +36,7 @@ import org.apache.paimon.spark.utils.CatalogUtils;
import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.FormatTableOptions;
import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
import org.apache.paimon.utils.TypeUtils;
import org.apache.spark.sql.PaimonSparkSession$;
@@ -50,6 +51,7 @@ import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.catalog.NamespaceChange;
import org.apache.spark.sql.connector.catalog.SupportsNamespaces;
import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.catalog.TableCatalogCapability;
import org.apache.spark.sql.connector.catalog.TableChange;
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction;
import org.apache.spark.sql.connector.expressions.FieldReference;
@@ -77,18 +79,24 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.FILE_FORMAT;
import static org.apache.paimon.CoreOptions.TYPE;
import static org.apache.paimon.TableType.FORMAT_TABLE;
import static org.apache.paimon.spark.SparkCatalogOptions.DEFAULT_DATABASE;
+import static org.apache.paimon.spark.SparkTypeUtils.CURRENT_DEFAULT_COLUMN_METADATA_KEY;
import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType;
import static org.apache.paimon.spark.util.OptionUtils.checkRequiredConfigurations;
import static org.apache.paimon.spark.util.OptionUtils.copyWithSQLConf;
import static org.apache.paimon.spark.utils.CatalogUtils.checkNamespace;
+import static org.apache.paimon.spark.utils.CatalogUtils.checkNoDefaultValue;
+import static org.apache.paimon.spark.utils.CatalogUtils.isUpdateColumnDefaultValue;
import static org.apache.paimon.spark.utils.CatalogUtils.removeCatalogName;
import static org.apache.paimon.spark.utils.CatalogUtils.toIdentifier;
+import static org.apache.paimon.spark.utils.CatalogUtils.toUpdateColumnDefaultValue;
+import static org.apache.spark.sql.connector.catalog.TableCatalogCapability.SUPPORT_COLUMN_DEFAULT_VALUE;
/** Spark {@link TableCatalog} for paimon. */
public class SparkCatalog extends SparkBaseCatalog
@@ -103,6 +111,10 @@ public class SparkCatalog extends SparkBaseCatalog
private String defaultDatabase;
+ public Set<TableCatalogCapability> capabilities() {
+ return Collections.singleton(SUPPORT_COLUMN_DEFAULT_VALUE);
+ }
+
@Override
public void initialize(String name, CaseInsensitiveStringMap options) {
checkRequiredConfigurations();
@@ -353,6 +365,7 @@ public class SparkCatalog extends SparkBaseCatalog
} else if (change instanceof TableChange.AddColumn) {
TableChange.AddColumn add = (TableChange.AddColumn) change;
SchemaChange.Move move = getMove(add.position(), add.fieldNames());
+ checkNoDefaultValue(add);
return SchemaChange.addColumn(
add.fieldNames(),
toPaimonType(add.dataType()).copy(add.isNullable()),
@@ -379,6 +392,8 @@ public class SparkCatalog extends SparkBaseCatalog
TableChange.UpdateColumnPosition update =
(TableChange.UpdateColumnPosition) change;
            SchemaChange.Move move = getMove(update.position(), update.fieldNames());
return SchemaChange.updateColumnPosition(move);
+ } else if (isUpdateColumnDefaultValue(change)) {
+ return toUpdateColumnDefaultValue(change);
} else {
throw new UnsupportedOperationException(
"Change is not supported: " + change.getClass());
@@ -428,10 +443,16 @@ public class SparkCatalog extends SparkBaseCatalog
.comment(properties.getOrDefault(TableCatalog.PROP_COMMENT, null));
for (StructField field : schema.fields()) {
- schemaBuilder.column(
- field.name(),
- toPaimonType(field.dataType()).copy(field.nullable()),
- field.getComment().getOrElse(() -> null));
+            String name = field.name();
+            DataType type = toPaimonType(field.dataType()).copy(field.nullable());
+            String comment = field.getComment().getOrElse(() -> null);
+            if (field.metadata().contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) {
+                String defaultValue =
+                        field.metadata().getString(CURRENT_DEFAULT_COLUMN_METADATA_KEY);
+                schemaBuilder.column(name, type, comment, defaultValue);
+            } else {
+                schemaBuilder.column(name, type, comment);
+            }
}
return schemaBuilder.build();
}
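
A sketch outside the diff: the extracted default lands in the four-argument Schema.Builder#column overload this commit adds to org.apache.paimon.schema.Schema. The column names and the default literal "2" are illustrative:

    import org.apache.paimon.schema.Schema;
    import org.apache.paimon.types.DataTypes;

    public class SchemaDefaultValueSketch {
        public static void main(String[] args) {
            Schema schema =
                    Schema.newBuilder()
                            .column("a", DataTypes.INT(), null) // no default value
                            .column("b", DataTypes.INT(), null, "2") // default value "2"
                            .build();
            System.out.println(schema.fields());
        }
    }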
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/CatalogUtils.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/CatalogUtils.java
index ddda188d18..25882f5e86 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/CatalogUtils.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/utils/CatalogUtils.java
@@ -18,6 +18,7 @@
package org.apache.paimon.spark.utils;
+import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DecimalType;
@@ -27,7 +28,9 @@ import org.apache.paimon.types.MultisetType;
import org.apache.spark.sql.catalyst.util.ArrayBasedMapData;
import org.apache.spark.sql.catalyst.util.GenericArrayData;
+import org.apache.spark.sql.connector.catalog.ColumnDefaultValue;
import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableChange;
import org.apache.spark.sql.types.BooleanType;
import org.apache.spark.sql.types.ByteType;
import org.apache.spark.sql.types.DataTypes;
@@ -248,4 +251,31 @@ public class CatalogUtils {
        throw new IllegalArgumentException("Unsupported Spark data type: " + sparkType);
}
+
+ public static void checkNoDefaultValue(TableChange.AddColumn addColumn) {
+ try {
+ ColumnDefaultValue defaultValue = addColumn.defaultValue();
+ if (defaultValue != null) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Cannot add column %s with default value %s.",
+                        Arrays.toString(addColumn.fieldNames()), defaultValue));
+ }
+ } catch (NoClassDefFoundError | NoSuchMethodError ignored) {
+ }
+ }
+
+ public static boolean isUpdateColumnDefaultValue(TableChange tableChange) {
+ try {
+ return tableChange instanceof TableChange.UpdateColumnDefaultValue;
+ } catch (NoClassDefFoundError ignored) {
+ return false;
+ }
+ }
+
+    public static SchemaChange toUpdateColumnDefaultValue(TableChange tableChange) {
+        TableChange.UpdateColumnDefaultValue update =
+                (TableChange.UpdateColumnDefaultValue) tableChange;
+        return SchemaChange.updateColumnDefaultValue(
+                update.fieldNames(), update.newDefaultValue());
+ }
}
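
A note beyond the diff: the try/catch over NoClassDefFoundError and NoSuchMethodError above is a linkage guard. ColumnDefaultValue and TableChange.UpdateColumnDefaultValue only exist in newer Spark releases, and the JVM resolves such references lazily at their first use inside a method, so the guarded helpers degrade gracefully on older runtimes. A stripped-down sketch of the pattern, with a hypothetical helper name:

    import org.apache.spark.sql.connector.catalog.TableChange;

    public class LinkageGuardSketch {
        // Hypothetical helper illustrating the guard; returns false when the
        // referenced class is absent from the Spark version on the classpath.
        static boolean hasUpdateDefaultValueSupport(TableChange change) {
            try {
                return change instanceof TableChange.UpdateColumnDefaultValue;
            } catch (NoClassDefFoundError absentOnOldSpark) {
                return false;
            }
        }
    }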
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java
index ae95881621..de14ef4316 100644
--- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/SparkTypeUtils.java
@@ -48,6 +48,7 @@ import org.apache.spark.sql.paimon.shims.SparkShimLoader;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.LongType;
+import org.apache.spark.sql.types.MetadataBuilder;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.UserDefinedType;
@@ -62,6 +63,11 @@ public class SparkTypeUtils {
private SparkTypeUtils() {}
+    /** Copied here from Spark's ResolveDefaultColumnsUtils for old Spark versions. */
+    public static final String CURRENT_DEFAULT_COLUMN_METADATA_KEY = "CURRENT_DEFAULT";
+
public static RowType toPartitionType(Table table) {
        int[] projections = table.rowType().getFieldIndices(table.partitionKeys());
List<DataField> partitionTypes = new ArrayList<>();
@@ -252,9 +258,17 @@ public class SparkTypeUtils {
public DataType visit(RowType rowType) {
            List<StructField> fields = new ArrayList<>(rowType.getFieldCount());
            for (DataField field : rowType.getFields()) {
+                MetadataBuilder metadataBuilder = new MetadataBuilder();
+                if (field.defaultValue() != null) {
+                    metadataBuilder.putString(
+                            CURRENT_DEFAULT_COLUMN_METADATA_KEY, field.defaultValue());
+                }
                StructField structField =
                        DataTypes.createStructField(
-                                field.name(), field.type().accept(this), field.type().isNullable());
+                                field.name(),
+                                field.type().accept(this),
+                                field.type().isNullable(),
+                                metadataBuilder.build());
structField =
Optional.ofNullable(field.description())
.map(structField::withComment)
@@ -329,9 +343,14 @@ public class SparkTypeUtils {
org.apache.paimon.types.DataType fieldType =
fieldResults.get(i).copy(field.nullable());
String comment = field.getComment().getOrElse(() -> null);
+ String defaultValue = null;
+            if (field.metadata().contains(CURRENT_DEFAULT_COLUMN_METADATA_KEY)) {
+                defaultValue =
+                        field.metadata().getString(CURRENT_DEFAULT_COLUMN_METADATA_KEY);
+            }
            newFields.add(
                    new DataField(
-                            atomicInteger.incrementAndGet(), field.name(), fieldType, comment));
+                            atomicInteger.incrementAndGet(),
+                            field.name(),
+                            fieldType,
+                            comment,
+                            defaultValue));
}
return new RowType(newFields);
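
Outside the commit itself, a compact round trip of the metadata mechanism used above, relying only on Spark's public types API; the field name and the default literal "2" are illustrative:

    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.Metadata;
    import org.apache.spark.sql.types.MetadataBuilder;
    import org.apache.spark.sql.types.StructField;

    public class MetadataRoundTripSketch {
        public static void main(String[] args) {
            // Write side: store the default as a string under the CURRENT_DEFAULT key.
            Metadata metadata = new MetadataBuilder().putString("CURRENT_DEFAULT", "2").build();
            StructField field =
                    DataTypes.createStructField("b", DataTypes.IntegerType, true, metadata);
            // Read side: reading a missing key is an error, so guard with contains()
            // exactly as the converter above does.
            String defaultValue =
                    field.metadata().contains("CURRENT_DEFAULT")
                            ? field.metadata().getString("CURRENT_DEFAULT")
                            : null;
            System.out.println(defaultValue); // prints: 2
        }
    }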
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
index fff94ce037..b39e44fc6f 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkWriteITCase.java
@@ -43,6 +43,7 @@ import java.util.List;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** ITCase for spark writer. */
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@@ -78,7 +79,49 @@ public class SparkWriteITCase {
@AfterEach
public void afterEach() {
- spark.sql("DROP TABLE T");
+ spark.sql("DROP TABLE IF EXISTS T");
+ }
+
+    @Test
+    public void testWriteWithDefaultValue() {
+        spark.sql(
+                "CREATE TABLE T (a INT, b INT DEFAULT 2, c STRING DEFAULT 'my_value') TBLPROPERTIES"
+                        + " ('file.format'='avro')");
+
+        // test show create table
+        List<Row> show = spark.sql("SHOW CREATE TABLE T").collectAsList();
+        assertThat(show.toString())
+                .contains("a INT,\n" + " b INT DEFAULT 2,\n" + " c STRING DEFAULT 'my_value'");
+
+        // test partial write
+        spark.sql("INSERT INTO T (a) VALUES (1), (2)").collectAsList();
+        List<Row> rows = spark.sql("SELECT * FROM T").collectAsList();
+        assertThat(rows.toString()).isEqualTo("[[1,2,my_value], [2,2,my_value]]");
+
+        // test write with explicit DEFAULT
+        spark.sql("INSERT INTO T VALUES (3, DEFAULT, DEFAULT)").collectAsList();
+        rows = spark.sql("SELECT * FROM T").collectAsList();
+        assertThat(rows.toString()).isEqualTo("[[1,2,my_value], [2,2,my_value], [3,2,my_value]]");
+
+        // test that adding a column with DEFAULT is not supported
+        assertThatThrownBy(() -> spark.sql("ALTER TABLE T ADD COLUMN d INT DEFAULT 5"))
+                .hasMessageContaining(
+                        "Unsupported table change: Cannot add column [d] with default value");
+
+        // test altering the type of a column that has a default value
+        spark.sql("ALTER TABLE T ALTER COLUMN b TYPE STRING").collectAsList();
+        spark.sql("INSERT INTO T (a) VALUES (4)").collectAsList();
+        rows = spark.sql("SELECT * FROM T").collectAsList();
+        assertThat(rows.toString())
+                .isEqualTo("[[1,2,my_value], [2,2,my_value], [3,2,my_value], [4,2,my_value]]");
+
+        // test altering a column's default value
+        spark.sql("ALTER TABLE T ALTER COLUMN b SET DEFAULT '3'");
+        spark.sql("INSERT INTO T (a) VALUES (5)").collectAsList();
+        rows = spark.sql("SELECT * FROM T").collectAsList();
+        assertThat(rows.toString())
+                .isEqualTo(
+                        "[[1,2,my_value], [2,2,my_value], [3,2,my_value], [4,2,my_value], [5,3,my_value]]");
}
@Test