This is an automated email from the ASF dual-hosted git repository.
yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new c93a1f34a [lake/paimon] Support Altering Lake Table Properties (#2799)
c93a1f34a is described below
commit c93a1f34a99895e472291c480101c4d86875d087
Author: Liebing <[email protected]>
AuthorDate: Mon Mar 9 14:54:59 2026 +0800
[lake/paimon] Support Altering Lake Table Properties (#2799)
---
.../fluss/flink/catalog/FlinkCatalogITCase.java | 10 +---
.../fluss/lake/paimon/utils/PaimonConversions.java | 21 +++++---
.../lake/paimon/LakeEnabledTableCreateITCase.java | 56 ++++++++++++++++++----
.../fluss/server/coordinator/MetadataManager.java | 5 +-
.../server/utils/TableDescriptorValidation.java | 15 +-----
5 files changed, 65 insertions(+), 42 deletions(-)
diff --git
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
index f4ffd61de..af1e2eaaa 100644
---
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
+++
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
@@ -286,16 +286,8 @@ abstract class FlinkCatalogITCase {
.hasMessage("The option 'bootstrap.servers' is not supported
to alter yet.");
String unSupportedDml6 =
- "alter table test_alter_table_append_only set
('paimon.file.format' = 'orc')";
- assertThatThrownBy(() -> tEnv.executeSql(unSupportedDml6))
- .rootCause()
- .isInstanceOf(InvalidConfigException.class)
- .hasMessage(
- "Property 'paimon.file.format' is not supported to
alter which is for datalake table.");
-
- String unSupportedDml7 =
"alter table test_alter_table_append_only set
('auto-increment.fields' = 'b')";
- assertThatThrownBy(() -> tEnv.executeSql(unSupportedDml7))
+ assertThatThrownBy(() -> tEnv.executeSql(unSupportedDml6))
.rootCause()
.isInstanceOf(CatalogException.class)
.hasMessage("The option 'auto-increment.fields' is not
supported to alter yet.");
diff --git
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
index 3021e35fe..94166e2d0 100644
---
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
+++
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
@@ -122,15 +122,14 @@ public class PaimonConversions {
for (TableChange tableChange : tableChanges) {
if (tableChange instanceof TableChange.SetOption) {
TableChange.SetOption setOption = (TableChange.SetOption)
tableChange;
- schemaChanges.add(
- SchemaChange.setOption(
-
convertFlussPropertyKeyToPaimon(setOption.getKey()),
- setOption.getValue()));
+ String key =
convertFlussPropertyKeyToPaimon(setOption.getKey());
+ validateAlterPaimonOptions(key);
+ schemaChanges.add(SchemaChange.setOption(key,
setOption.getValue()));
} else if (tableChange instanceof TableChange.ResetOption) {
TableChange.ResetOption resetOption =
(TableChange.ResetOption) tableChange;
- schemaChanges.add(
- SchemaChange.removeOption(
-
convertFlussPropertyKeyToPaimon(resetOption.getKey())));
+ String key =
convertFlussPropertyKeyToPaimon(resetOption.getKey());
+ validateAlterPaimonOptions(key);
+ schemaChanges.add(SchemaChange.removeOption(key));
} else if (tableChange instanceof TableChange.AddColumn) {
TableChange.AddColumn addColumn = (TableChange.AddColumn)
tableChange;
@@ -275,6 +274,14 @@ public class PaimonConversions {
});
}
+ private static void validateAlterPaimonOptions(String key) {
+ if (PAIMON_UNSETTABLE_OPTIONS.contains(key)
+ || CoreOptions.IMMUTABLE_OPTIONS.contains(key)) {
+ throw new InvalidConfigException(
+ String.format("The Paimon option %s cannot be changed.",
key));
+ }
+ }
+
private static void setPaimonDefaultProperties(Options options) {
// set partition.legacy-name to false, otherwise paimon will use
toString for all types,
// which will cause inconsistent partition value for the same binary
value
diff --git
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
index 34bc43dd7..a77037158 100644
---
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
+++
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
@@ -65,9 +65,11 @@ import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.stream.Stream;
import static
org.apache.fluss.lake.paimon.utils.PaimonConversions.PAIMON_UNSETTABLE_OPTIONS;
@@ -777,14 +779,52 @@ class LakeEnabledTableCreateITCase {
"c1,c2",
BUCKET_NUM);
- // test alter paimon properties, should throw exception
- tableChanges =
Collections.singletonList(TableChange.set("paimon.bucket", "10"));
- List<TableChange> finalTableChanges = tableChanges;
- assertThatThrownBy(() -> admin.alterTable(tablePath,
finalTableChanges, false).get())
- .cause()
- .isInstanceOf(InvalidConfigException.class)
- .hasMessage(
- "Property 'paimon.bucket' is not supported to alter
which is for datalake table.");
+ // test alter unchangeable paimon properties, should throw exception
+ Set<String> unchangeableProperties = new HashSet<>();
+ unchangeableProperties.addAll(PAIMON_UNSETTABLE_OPTIONS);
+ unchangeableProperties.addAll(CoreOptions.IMMUTABLE_OPTIONS);
+ for (String property : unchangeableProperties) {
+ tableChanges =
+ Collections.singletonList(TableChange.set("paimon." +
property, "value"));
+ List<TableChange> finalTableChanges = tableChanges;
+ assertThatThrownBy(() -> admin.alterTable(tablePath,
finalTableChanges, false).get())
+ .cause()
+ .isInstanceOf(InvalidConfigException.class)
+ .hasMessage(String.format("The Paimon option %s cannot be
changed.", property));
+ }
+
+ // test alter changeable paimon properties, should be ok
+ tableChanges =
+ Arrays.asList(
+
TableChange.set("paimon.partition.timestamp-formatter", "yyyyMMdd"),
+ TableChange.set("paimon.partition.timestamp-pattern",
"$ds"));
+ admin.alterTable(tablePath, tableChanges, false).get();
+ paimonTable = paimonCatalog.getTable(Identifier.create(DATABASE,
tablePath.getTableName()));
+ customProperties.put("paimon.partition.timestamp-formatter",
"yyyyMMdd");
+ customProperties.put("paimon.partition.timestamp-pattern", "$ds");
+ tableDescriptor =
+
tableDescriptor.withProperties(tableDescriptor.getProperties(),
customProperties);
+ verifyPaimonTable(
+ paimonTable,
+ tableDescriptor,
+ RowType.of(
+ new DataType[] {
+ org.apache.paimon.types.DataTypes.INT(),
+ org.apache.paimon.types.DataTypes.STRING(),
+ // for __bucket, __offset, __timestamp
+ org.apache.paimon.types.DataTypes.INT(),
+ org.apache.paimon.types.DataTypes.BIGINT(),
+
org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
+ },
+ new String[] {
+ "c1",
+ "c2",
+ BUCKET_COLUMN_NAME,
+ OFFSET_COLUMN_NAME,
+ TIMESTAMP_COLUMN_NAME
+ }),
+ "c1,c2",
+ BUCKET_NUM);
// test alter table if lake table not exists
paimonCatalog.dropTable(Identifier.create(DATABASE,
tablePath.getTableName()), true);
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
index c9537136f..cc3808d87 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
@@ -421,10 +421,7 @@ public class MetadataManager {
TableInfo tableInfo = tableReg.toTableInfo(tablePath, schemaInfo);
// validate the changes
- validateAlterTableProperties(
- tableInfo,
- tablePropertyChanges.tableKeysToChange(),
- tablePropertyChanges.customKeysToChange());
+ validateAlterTableProperties(tableInfo,
tablePropertyChanges.tableKeysToChange());
TableDescriptor tableDescriptor = tableInfo.toTableDescriptor();
TableDescriptor newDescriptor =
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
index 0c8e9ef35..d3bd94cd7 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
@@ -121,7 +121,7 @@ public class TableDescriptorValidation {
}
public static void validateAlterTableProperties(
- TableInfo currentTable, Set<String> tableKeysToChange, Set<String>
customKeysToChange) {
+ TableInfo currentTable, Set<String> tableKeysToChange) {
TableConfig currentConfig = currentTable.getTableConfig();
tableKeysToChange.forEach(
k -> {
@@ -139,19 +139,6 @@ public class TableDescriptorValidation {
ConfigOptions.TABLE_DATALAKE_ENABLED.key()));
}
});
-
- if (currentConfig.isDataLakeEnabled() &&
currentConfig.getDataLakeFormat().isPresent()) {
- String format = currentConfig.getDataLakeFormat().get().toString();
- customKeysToChange.forEach(
- k -> {
- if (k.startsWith(format + ".")) {
- throw new InvalidConfigException(
- String.format(
- "Property '%s' is not supported to
alter which is for datalake table.",
- k));
- }
- });
- }
}
private static void checkSystemColumns(RowType schema) {