This is an automated email from the ASF dual-hosted git repository.

mehulbatra pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new d10252d8a [iceberg] support altering table properties (#2043)
d10252d8a is described below

commit d10252d8a3050e4c884640fdd1c480340b136f3d
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Sat Dec 13 11:06:33 2025 +0100

    [iceberg] support altering table properties (#2043)
    
    * [iceberg] support altering table properties
    
    * prevent altering reserved table properties
---
 .../fluss/lake/iceberg/IcebergLakeCatalog.java     |  60 +++++++-
 .../fluss/lake/iceberg/IcebergLakeCatalogTest.java | 169 +++++++++++++++++++++
 2 files changed, 224 insertions(+), 5 deletions(-)

diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
index 4e90cb700..03afa011c 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
@@ -30,14 +30,19 @@ import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.utils.IOUtils;
 
 import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowLevelOperationMode;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.UpdateProperties;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.SupportsNamespaces;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 
@@ -52,10 +57,17 @@ import java.util.Set;
 import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
 import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
 import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
+import static org.apache.fluss.utils.Preconditions.checkArgument;
 import static org.apache.iceberg.types.Type.TypeID.STRING;
 
 /** An Iceberg implementation of {@link LakeCatalog}. */
 public class IcebergLakeCatalog implements LakeCatalog {
+    @VisibleForTesting
+    static final Set<String> RESERVED_PROPERTIES =
+            Set.of(
+                    TableProperties.MERGE_MODE,
+                    TableProperties.UPDATE_MODE,
+                    TableProperties.DELETE_MODE);
 
     public static final LinkedHashMap<String, Type> SYSTEM_COLUMNS = new LinkedHashMap<>();
 
@@ -119,8 +131,35 @@ public class IcebergLakeCatalog implements LakeCatalog {
     @Override
     public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Context context)
             throws TableNotExistException {
-        throw new UnsupportedOperationException(
-                "Alter table is not supported for Iceberg at the moment");
+        try {
+            Table table = icebergCatalog.loadTable(toIcebergTableIdentifier(tablePath));
+            UpdateProperties updateProperties = table.updateProperties();
+            for (TableChange tableChange : tableChanges) {
+                if (tableChange instanceof TableChange.SetOption) {
+                    TableChange.SetOption option = (TableChange.SetOption) tableChange;
+                    checkArgument(
+                            !RESERVED_PROPERTIES.contains(option.getKey()),
+                            "Cannot set table property '%s'",
+                            option.getKey());
+                    updateProperties.set(
+                            convertFlussPropertyKeyToIceberg(option.getKey()), option.getValue());
+                } else if (tableChange instanceof TableChange.ResetOption) {
+                    TableChange.ResetOption option = (TableChange.ResetOption) tableChange;
+                    checkArgument(
+                            !RESERVED_PROPERTIES.contains(option.getKey()),
+                            "Cannot reset table property '%s'",
+                            option.getKey());
+                    updateProperties.remove(convertFlussPropertyKeyToIceberg(option.getKey()));
+                } else {
+                    throw new UnsupportedOperationException(
+                            "Unsupported table change: " + tableChange.getClass());
+                }
+            }
+
+            updateProperties.commit();
+        } catch (NoSuchTableException e) {
+            throw new TableNotExistException("Table " + tablePath + " does not exist.");
+        }
     }
 
     private TableIdentifier toIcebergTableIdentifier(TablePath tablePath) {
@@ -249,6 +288,14 @@ public class IcebergLakeCatalog implements LakeCatalog {
         }
     }
 
+    private static String convertFlussPropertyKeyToIceberg(String key) {
+        if (key.startsWith(ICEBERG_CONF_PREFIX)) {
+            return key.substring(ICEBERG_CONF_PREFIX.length());
+        } else {
+            return FLUSS_CONF_PREFIX + key;
+        }
+    }
+
     private void createDatabase(String databaseName) {
         Namespace namespace = Namespace.of(databaseName);
         if (icebergCatalog instanceof SupportsNamespaces) {
@@ -275,9 +322,12 @@ public class IcebergLakeCatalog implements LakeCatalog {
 
         if (isPkTable) {
             // MOR table properties for streaming workloads
-            icebergProperties.put("write.delete.mode", "merge-on-read");
-            icebergProperties.put("write.update.mode", "merge-on-read");
-            icebergProperties.put("write.merge.mode", "merge-on-read");
+            icebergProperties.put(
+                    TableProperties.DELETE_MODE, RowLevelOperationMode.MERGE_ON_READ.modeName());
+            icebergProperties.put(
+                    TableProperties.UPDATE_MODE, RowLevelOperationMode.MERGE_ON_READ.modeName());
+            icebergProperties.put(
+                    TableProperties.MERGE_MODE, RowLevelOperationMode.MERGE_ON_READ.modeName());
         }
 
         tableDescriptor
diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalogTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalogTest.java
index 37544d971..0625f1506 100644
--- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalogTest.java
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalogTest.java
@@ -20,16 +20,20 @@ package org.apache.fluss.lake.iceberg;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.InvalidTableException;
+import org.apache.fluss.exception.TableNotExistException;
 import org.apache.fluss.lake.lakestorage.TestingLakeCatalogContext;
 import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.types.DataTypes;
 
 import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.RowLevelOperationMode;
 import org.apache.iceberg.SortDirection;
 import org.apache.iceberg.SortField;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.types.Types;
 import org.assertj.core.api.Assertions;
@@ -455,4 +459,169 @@ class IcebergLakeCatalogTest {
                 .hasMessage(
                         "Partition key only support string type for iceberg currently. Column `c1` is not string type.");
     }
+
+    @Test
+    void alterTableProperties() {
+        String database = "test_alter_table_db";
+        String tableName = "test_alter_table";
+
+        Schema flussSchema = Schema.newBuilder().column("id", DataTypes.BIGINT()).build();
+
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(flussSchema)
+                        .distributedBy(3)
+                        .property("iceberg.commit.retry.num-retries", "5")
+                        .property("table.datalake.freshness", "30s")
+                        .build();
+
+        TablePath tablePath = TablePath.of(database, tableName);
+        TestingLakeCatalogContext context = new TestingLakeCatalogContext();
+        flussIcebergCatalog.createTable(tablePath, tableDescriptor, context);
+
+        Catalog catalog = flussIcebergCatalog.getIcebergCatalog();
+        assertThat(catalog.loadTable(TableIdentifier.of(database, tableName)).properties())
+                .containsEntry("commit.retry.num-retries", "5")
+                .containsEntry("fluss.table.datalake.freshness", "30s")
+                .doesNotContainKeys("iceberg.commit.retry.num-retries", "table.datalake.freshness");
+
+        // set new iceberg property
+        flussIcebergCatalog.alterTable(
+                tablePath,
+                List.of(TableChange.set("iceberg.commit.retry.min-wait-ms", "1000")),
+                context);
+        assertThat(catalog.loadTable(TableIdentifier.of(database, tableName)).properties())
+                .containsEntry("commit.retry.min-wait-ms", "1000")
+                .containsEntry("commit.retry.num-retries", "5")
+                .containsEntry("fluss.table.datalake.freshness", "30s")
+                .doesNotContainKeys(
+                        "iceberg.commit.retry.min-wait-ms",
+                        "iceberg.commit.retry.num-retries",
+                        "table.datalake.freshness");
+
+        // update existing properties
+        flussIcebergCatalog.alterTable(
+                tablePath,
+                List.of(
+                        TableChange.set("iceberg.commit.retry.num-retries", "10"),
+                        TableChange.set("table.datalake.freshness", "23s")),
+                context);
+        assertThat(catalog.loadTable(TableIdentifier.of(database, tableName)).properties())
+                .containsEntry("commit.retry.min-wait-ms", "1000")
+                .containsEntry("commit.retry.num-retries", "10")
+                .containsEntry("fluss.table.datalake.freshness", "23s")
+                .doesNotContainKeys(
+                        "iceberg.commit.retry.min-wait-ms",
+                        "iceberg.commit.retry.num-retries",
+                        "table.datalake.freshness");
+
+        // remove existing properties
+        flussIcebergCatalog.alterTable(
+                tablePath,
+                List.of(
+                        TableChange.reset("iceberg.commit.retry.min-wait-ms"),
+                        TableChange.reset("table.datalake.freshness")),
+                context);
+        assertThat(catalog.loadTable(TableIdentifier.of(database, tableName)).properties())
+                .containsEntry("commit.retry.num-retries", "10")
+                .doesNotContainKeys(
+                        "commit.retry.min-wait-ms",
+                        "iceberg.commit.retry.min-wait-ms",
+                        "table.datalake.freshness",
+                        "fluss.table.datalake.freshness");
+
+        // remove non-existing property
+        flussIcebergCatalog.alterTable(
+                tablePath, List.of(TableChange.reset("iceberg.non-existing.property")), context);
+        assertThat(catalog.loadTable(TableIdentifier.of(database, tableName)).properties())
+                .containsEntry("commit.retry.num-retries", "10")
+                .doesNotContainKeys(
+                        "non-existing.property",
+                        "iceberg.non-existing.property",
+                        "commit.retry.min-wait-ms",
+                        "iceberg.commit.retry.min-wait-ms",
+                        "table.datalake.freshness",
+                        "fluss.table.datalake.freshness");
+    }
+
+    @Test
+    void alterTablePropertiesWithNonExistingTable() {
+        TestingLakeCatalogContext context = new TestingLakeCatalogContext();
+        // db & table don't exist
+        assertThatThrownBy(
+                        () ->
+                                flussIcebergCatalog.alterTable(
+                                        TablePath.of("non_existing_db", "non_existing_table"),
+                                        List.of(
+                                                TableChange.set(
+                                                        "iceberg.commit.retry.min-wait-ms",
+                                                        "1000")),
+                                        context))
+                .isInstanceOf(TableNotExistException.class)
+                .hasMessage("Table non_existing_db.non_existing_table does not exist.");
+
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(Schema.newBuilder().column("id", DataTypes.BIGINT()).build())
+                        .distributedBy(3)
+                        .property("iceberg.commit.retry.num-retries", "5")
+                        .property("table.datalake.freshness", "30s")
+                        .build();
+
+        String database = "test_db";
+        TablePath tablePath = TablePath.of(database, "test_table");
+        flussIcebergCatalog.createTable(tablePath, tableDescriptor, context);
+
+        // database exists but table doesn't exist
+        assertThatThrownBy(
+                        () ->
+                                flussIcebergCatalog.alterTable(
+                                        TablePath.of(database, "non_existing_table"),
+                                        List.of(
+                                                TableChange.set(
+                                                        "iceberg.commit.retry.min-wait-ms",
+                                                        "1000")),
+                                        context))
+                .isInstanceOf(TableNotExistException.class)
+                .hasMessage("Table test_db.non_existing_table does not exist.");
+    }
+
+    @Test
+    void alterReservedTableProperties() {
+        String database = "test_alter_table_with_reserved_properties_db";
+        String tableName = "test_alter_table_with_reserved_properties";
+
+        Schema flussSchema = Schema.newBuilder().column("id", DataTypes.BIGINT()).build();
+
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder().schema(flussSchema).distributedBy(3).build();
+
+        TablePath tablePath = TablePath.of(database, tableName);
+        TestingLakeCatalogContext context = new TestingLakeCatalogContext();
+        flussIcebergCatalog.createTable(tablePath, tableDescriptor, context);
+
+        for (String property : IcebergLakeCatalog.RESERVED_PROPERTIES) {
+            assertThatThrownBy(
+                            () ->
+                                    flussIcebergCatalog.alterTable(
+                                            tablePath,
+                                            List.of(
+                                                    TableChange.set(
+                                                            property,
+                                                            RowLevelOperationMode.COPY_ON_WRITE
+                                                                    .modeName())),
+                                            context))
+                    .isInstanceOf(IllegalArgumentException.class)
+                    .hasMessage("Cannot set table property '%s'", property);
+
+            assertThatThrownBy(
+                            () ->
+                                    flussIcebergCatalog.alterTable(
+                                            tablePath,
+                                            List.of(TableChange.reset(property)),
+                                            context))
+                    .isInstanceOf(IllegalArgumentException.class)
+                    .hasMessage("Cannot reset table property '%s'", property);
+        }
+    }
 }

Reply via email to