This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new c93a1f34a [lake/paimon] Support Altering Lake Table Properties (#2799)
c93a1f34a is described below

commit c93a1f34a99895e472291c480101c4d86875d087
Author: Liebing <[email protected]>
AuthorDate: Mon Mar 9 14:54:59 2026 +0800

    [lake/paimon] Support Altering Lake Table Properties (#2799)
---
 .../fluss/flink/catalog/FlinkCatalogITCase.java    | 10 +---
 .../fluss/lake/paimon/utils/PaimonConversions.java | 21 +++++---
 .../lake/paimon/LakeEnabledTableCreateITCase.java  | 56 ++++++++++++++++++----
 .../fluss/server/coordinator/MetadataManager.java  |  5 +-
 .../server/utils/TableDescriptorValidation.java    | 15 +-----
 5 files changed, 65 insertions(+), 42 deletions(-)

diff --git 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
index f4ffd61de..af1e2eaaa 100644
--- 
a/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
+++ 
b/fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/catalog/FlinkCatalogITCase.java
@@ -286,16 +286,8 @@ abstract class FlinkCatalogITCase {
                 .hasMessage("The option 'bootstrap.servers' is not supported 
to alter yet.");
 
         String unSupportedDml6 =
-                "alter table test_alter_table_append_only set 
('paimon.file.format' = 'orc')";
-        assertThatThrownBy(() -> tEnv.executeSql(unSupportedDml6))
-                .rootCause()
-                .isInstanceOf(InvalidConfigException.class)
-                .hasMessage(
-                        "Property 'paimon.file.format' is not supported to 
alter which is for datalake table.");
-
-        String unSupportedDml7 =
                 "alter table test_alter_table_append_only set 
('auto-increment.fields' = 'b')";
-        assertThatThrownBy(() -> tEnv.executeSql(unSupportedDml7))
+        assertThatThrownBy(() -> tEnv.executeSql(unSupportedDml6))
                 .rootCause()
                 .isInstanceOf(CatalogException.class)
                 .hasMessage("The option 'auto-increment.fields' is not 
supported to alter yet.");
diff --git 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
index 3021e35fe..94166e2d0 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java
@@ -122,15 +122,14 @@ public class PaimonConversions {
         for (TableChange tableChange : tableChanges) {
             if (tableChange instanceof TableChange.SetOption) {
                 TableChange.SetOption setOption = (TableChange.SetOption) 
tableChange;
-                schemaChanges.add(
-                        SchemaChange.setOption(
-                                
convertFlussPropertyKeyToPaimon(setOption.getKey()),
-                                setOption.getValue()));
+                String key = 
convertFlussPropertyKeyToPaimon(setOption.getKey());
+                validateAlterPaimonOptions(key);
+                schemaChanges.add(SchemaChange.setOption(key, 
setOption.getValue()));
             } else if (tableChange instanceof TableChange.ResetOption) {
                 TableChange.ResetOption resetOption = 
(TableChange.ResetOption) tableChange;
-                schemaChanges.add(
-                        SchemaChange.removeOption(
-                                
convertFlussPropertyKeyToPaimon(resetOption.getKey())));
+                String key = 
convertFlussPropertyKeyToPaimon(resetOption.getKey());
+                validateAlterPaimonOptions(key);
+                schemaChanges.add(SchemaChange.removeOption(key));
             } else if (tableChange instanceof TableChange.AddColumn) {
                 TableChange.AddColumn addColumn = (TableChange.AddColumn) 
tableChange;
 
@@ -275,6 +274,14 @@ public class PaimonConversions {
                 });
     }
 
+    private static void validateAlterPaimonOptions(String key) {
+        if (PAIMON_UNSETTABLE_OPTIONS.contains(key)
+                || CoreOptions.IMMUTABLE_OPTIONS.contains(key)) {
+            throw new InvalidConfigException(
+                    String.format("The Paimon option %s cannot be changed.", 
key));
+        }
+    }
+
     private static void setPaimonDefaultProperties(Options options) {
         // set partition.legacy-name to false, otherwise paimon will use 
toString for all types,
         // which will cause inconsistent partition value for the same binary 
value
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
index 34bc43dd7..a77037158 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
@@ -65,9 +65,11 @@ import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.stream.Stream;
 
 import static 
org.apache.fluss.lake.paimon.utils.PaimonConversions.PAIMON_UNSETTABLE_OPTIONS;
@@ -777,14 +779,52 @@ class LakeEnabledTableCreateITCase {
                 "c1,c2",
                 BUCKET_NUM);
 
-        // test alter paimon properties, should throw exception
-        tableChanges = 
Collections.singletonList(TableChange.set("paimon.bucket", "10"));
-        List<TableChange> finalTableChanges = tableChanges;
-        assertThatThrownBy(() -> admin.alterTable(tablePath, 
finalTableChanges, false).get())
-                .cause()
-                .isInstanceOf(InvalidConfigException.class)
-                .hasMessage(
-                        "Property 'paimon.bucket' is not supported to alter 
which is for datalake table.");
+        // test alter unchangeable paimon properties, should throw exception
+        Set<String> unchangeableProperties = new HashSet<>();
+        unchangeableProperties.addAll(PAIMON_UNSETTABLE_OPTIONS);
+        unchangeableProperties.addAll(CoreOptions.IMMUTABLE_OPTIONS);
+        for (String property : unchangeableProperties) {
+            tableChanges =
+                    Collections.singletonList(TableChange.set("paimon." + 
property, "value"));
+            List<TableChange> finalTableChanges = tableChanges;
+            assertThatThrownBy(() -> admin.alterTable(tablePath, 
finalTableChanges, false).get())
+                    .cause()
+                    .isInstanceOf(InvalidConfigException.class)
+                    .hasMessage(String.format("The Paimon option %s cannot be 
changed.", property));
+        }
+
+        // test alter changeable paimon properties, should be ok
+        tableChanges =
+                Arrays.asList(
+                        
TableChange.set("paimon.partition.timestamp-formatter", "yyyyMMdd"),
+                        TableChange.set("paimon.partition.timestamp-pattern", 
"$ds"));
+        admin.alterTable(tablePath, tableChanges, false).get();
+        paimonTable = paimonCatalog.getTable(Identifier.create(DATABASE, 
tablePath.getTableName()));
+        customProperties.put("paimon.partition.timestamp-formatter", 
"yyyyMMdd");
+        customProperties.put("paimon.partition.timestamp-pattern", "$ds");
+        tableDescriptor =
+                
tableDescriptor.withProperties(tableDescriptor.getProperties(), 
customProperties);
+        verifyPaimonTable(
+                paimonTable,
+                tableDescriptor,
+                RowType.of(
+                        new DataType[] {
+                            org.apache.paimon.types.DataTypes.INT(),
+                            org.apache.paimon.types.DataTypes.STRING(),
+                            // for __bucket, __offset, __timestamp
+                            org.apache.paimon.types.DataTypes.INT(),
+                            org.apache.paimon.types.DataTypes.BIGINT(),
+                            
org.apache.paimon.types.DataTypes.TIMESTAMP_LTZ_MILLIS()
+                        },
+                        new String[] {
+                            "c1",
+                            "c2",
+                            BUCKET_COLUMN_NAME,
+                            OFFSET_COLUMN_NAME,
+                            TIMESTAMP_COLUMN_NAME
+                        }),
+                "c1,c2",
+                BUCKET_NUM);
 
         // test alter table if lake table not exists
         paimonCatalog.dropTable(Identifier.create(DATABASE, 
tablePath.getTableName()), true);
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
index c9537136f..cc3808d87 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
@@ -421,10 +421,7 @@ public class MetadataManager {
             TableInfo tableInfo = tableReg.toTableInfo(tablePath, schemaInfo);
 
             // validate the changes
-            validateAlterTableProperties(
-                    tableInfo,
-                    tablePropertyChanges.tableKeysToChange(),
-                    tablePropertyChanges.customKeysToChange());
+            validateAlterTableProperties(tableInfo, 
tablePropertyChanges.tableKeysToChange());
 
             TableDescriptor tableDescriptor = tableInfo.toTableDescriptor();
             TableDescriptor newDescriptor =
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
index 0c8e9ef35..d3bd94cd7 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
@@ -121,7 +121,7 @@ public class TableDescriptorValidation {
     }
 
     public static void validateAlterTableProperties(
-            TableInfo currentTable, Set<String> tableKeysToChange, Set<String> 
customKeysToChange) {
+            TableInfo currentTable, Set<String> tableKeysToChange) {
         TableConfig currentConfig = currentTable.getTableConfig();
         tableKeysToChange.forEach(
                 k -> {
@@ -139,19 +139,6 @@ public class TableDescriptorValidation {
                                         
ConfigOptions.TABLE_DATALAKE_ENABLED.key()));
                     }
                 });
-
-        if (currentConfig.isDataLakeEnabled() && 
currentConfig.getDataLakeFormat().isPresent()) {
-            String format = currentConfig.getDataLakeFormat().get().toString();
-            customKeysToChange.forEach(
-                    k -> {
-                        if (k.startsWith(format + ".")) {
-                            throw new InvalidConfigException(
-                                    String.format(
-                                            "Property '%s' is not supported to 
alter which is for datalake table.",
-                                            k));
-                        }
-                    });
-        }
     }
 
     private static void checkSystemColumns(RowType schema) {

Reply via email to