This is an automated email from the ASF dual-hosted git repository.
mehulbatra pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new d10252d8a [iceberg] support altering table properties (#2043)
d10252d8a is described below
commit d10252d8a3050e4c884640fdd1c480340b136f3d
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Sat Dec 13 11:06:33 2025 +0100
[iceberg] support altering table properties (#2043)
* [iceberg] support altering table properties
* prevent altering reserved table properties
---
.../fluss/lake/iceberg/IcebergLakeCatalog.java | 60 +++++++-
.../fluss/lake/iceberg/IcebergLakeCatalogTest.java | 169 +++++++++++++++++++++
2 files changed, 224 insertions(+), 5 deletions(-)
diff --git
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
index 4e90cb700..03afa011c 100644
---
a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
+++
b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
@@ -30,14 +30,19 @@ import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.utils.IOUtils;
import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.UpdateProperties;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
@@ -52,10 +57,17 @@ import java.util.Set;
import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
+import static org.apache.fluss.utils.Preconditions.checkArgument;
import static org.apache.iceberg.types.Type.TypeID.STRING;
/** An Iceberg implementation of {@link LakeCatalog}. */
public class IcebergLakeCatalog implements LakeCatalog {
+ @VisibleForTesting
+ static final Set<String> RESERVED_PROPERTIES =
+ Set.of(
+ TableProperties.MERGE_MODE,
+ TableProperties.UPDATE_MODE,
+ TableProperties.DELETE_MODE);
public static final LinkedHashMap<String, Type> SYSTEM_COLUMNS = new
LinkedHashMap<>();
@@ -119,8 +131,35 @@ public class IcebergLakeCatalog implements LakeCatalog {
@Override
public void alterTable(TablePath tablePath, List<TableChange>
tableChanges, Context context)
throws TableNotExistException {
- throw new UnsupportedOperationException(
- "Alter table is not supported for Iceberg at the moment");
+ try {
+ Table table =
icebergCatalog.loadTable(toIcebergTableIdentifier(tablePath));
+ UpdateProperties updateProperties = table.updateProperties();
+ for (TableChange tableChange : tableChanges) {
+ if (tableChange instanceof TableChange.SetOption) {
+ TableChange.SetOption option = (TableChange.SetOption)
tableChange;
+ checkArgument(
+ !RESERVED_PROPERTIES.contains(option.getKey()),
+ "Cannot set table property '%s'",
+ option.getKey());
+ updateProperties.set(
+ convertFlussPropertyKeyToIceberg(option.getKey()),
option.getValue());
+ } else if (tableChange instanceof TableChange.ResetOption) {
+ TableChange.ResetOption option = (TableChange.ResetOption)
tableChange;
+ checkArgument(
+ !RESERVED_PROPERTIES.contains(option.getKey()),
+ "Cannot reset table property '%s'",
+ option.getKey());
+
updateProperties.remove(convertFlussPropertyKeyToIceberg(option.getKey()));
+ } else {
+ throw new UnsupportedOperationException(
+ "Unsupported table change: " +
tableChange.getClass());
+ }
+ }
+
+ updateProperties.commit();
+ } catch (NoSuchTableException e) {
+ throw new TableNotExistException("Table " + tablePath + " does not
exist.");
+ }
}
private TableIdentifier toIcebergTableIdentifier(TablePath tablePath) {
@@ -249,6 +288,14 @@ public class IcebergLakeCatalog implements LakeCatalog {
}
}
+ private static String convertFlussPropertyKeyToIceberg(String key) {
+ if (key.startsWith(ICEBERG_CONF_PREFIX)) {
+ return key.substring(ICEBERG_CONF_PREFIX.length());
+ } else {
+ return FLUSS_CONF_PREFIX + key;
+ }
+ }
+
private void createDatabase(String databaseName) {
Namespace namespace = Namespace.of(databaseName);
if (icebergCatalog instanceof SupportsNamespaces) {
@@ -275,9 +322,12 @@ public class IcebergLakeCatalog implements LakeCatalog {
if (isPkTable) {
// MOR table properties for streaming workloads
- icebergProperties.put("write.delete.mode", "merge-on-read");
- icebergProperties.put("write.update.mode", "merge-on-read");
- icebergProperties.put("write.merge.mode", "merge-on-read");
+ icebergProperties.put(
+ TableProperties.DELETE_MODE,
RowLevelOperationMode.MERGE_ON_READ.modeName());
+ icebergProperties.put(
+ TableProperties.UPDATE_MODE,
RowLevelOperationMode.MERGE_ON_READ.modeName());
+ icebergProperties.put(
+ TableProperties.MERGE_MODE,
RowLevelOperationMode.MERGE_ON_READ.modeName());
}
tableDescriptor
diff --git
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalogTest.java
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalogTest.java
index 37544d971..0625f1506 100644
---
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalogTest.java
+++
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalogTest.java
@@ -20,16 +20,20 @@ package org.apache.fluss.lake.iceberg;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.InvalidTableException;
+import org.apache.fluss.exception.TableNotExistException;
import org.apache.fluss.lake.lakestorage.TestingLakeCatalogContext;
import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableChange;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.types.DataTypes;
import org.apache.iceberg.PartitionField;
+import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.SortDirection;
import org.apache.iceberg.SortField;
import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Types;
import org.assertj.core.api.Assertions;
@@ -455,4 +459,169 @@ class IcebergLakeCatalogTest {
.hasMessage(
"Partition key only support string type for iceberg
currently. Column `c1` is not string type.");
}
+
+ @Test
+ void alterTableProperties() {
+ String database = "test_alter_table_db";
+ String tableName = "test_alter_table";
+
+ Schema flussSchema = Schema.newBuilder().column("id",
DataTypes.BIGINT()).build();
+
+ TableDescriptor tableDescriptor =
+ TableDescriptor.builder()
+ .schema(flussSchema)
+ .distributedBy(3)
+ .property("iceberg.commit.retry.num-retries", "5")
+ .property("table.datalake.freshness", "30s")
+ .build();
+
+ TablePath tablePath = TablePath.of(database, tableName);
+ TestingLakeCatalogContext context = new TestingLakeCatalogContext();
+ flussIcebergCatalog.createTable(tablePath, tableDescriptor, context);
+
+ Catalog catalog = flussIcebergCatalog.getIcebergCatalog();
+ assertThat(catalog.loadTable(TableIdentifier.of(database,
tableName)).properties())
+ .containsEntry("commit.retry.num-retries", "5")
+ .containsEntry("fluss.table.datalake.freshness", "30s")
+ .doesNotContainKeys("iceberg.commit.retry.num-retries",
"table.datalake.freshness");
+
+ // set new iceberg property
+ flussIcebergCatalog.alterTable(
+ tablePath,
+ List.of(TableChange.set("iceberg.commit.retry.min-wait-ms",
"1000")),
+ context);
+ assertThat(catalog.loadTable(TableIdentifier.of(database,
tableName)).properties())
+ .containsEntry("commit.retry.min-wait-ms", "1000")
+ .containsEntry("commit.retry.num-retries", "5")
+ .containsEntry("fluss.table.datalake.freshness", "30s")
+ .doesNotContainKeys(
+ "iceberg.commit.retry.min-wait-ms",
+ "iceberg.commit.retry.num-retries",
+ "table.datalake.freshness");
+
+ // update existing properties
+ flussIcebergCatalog.alterTable(
+ tablePath,
+ List.of(
+ TableChange.set("iceberg.commit.retry.num-retries",
"10"),
+ TableChange.set("table.datalake.freshness", "23s")),
+ context);
+ assertThat(catalog.loadTable(TableIdentifier.of(database,
tableName)).properties())
+ .containsEntry("commit.retry.min-wait-ms", "1000")
+ .containsEntry("commit.retry.num-retries", "10")
+ .containsEntry("fluss.table.datalake.freshness", "23s")
+ .doesNotContainKeys(
+ "iceberg.commit.retry.min-wait-ms",
+ "iceberg.commit.retry.num-retries",
+ "table.datalake.freshness");
+
+ // remove existing properties
+ flussIcebergCatalog.alterTable(
+ tablePath,
+ List.of(
+ TableChange.reset("iceberg.commit.retry.min-wait-ms"),
+ TableChange.reset("table.datalake.freshness")),
+ context);
+ assertThat(catalog.loadTable(TableIdentifier.of(database,
tableName)).properties())
+ .containsEntry("commit.retry.num-retries", "10")
+ .doesNotContainKeys(
+ "commit.retry.min-wait-ms",
+ "iceberg.commit.retry.min-wait-ms",
+ "table.datalake.freshness",
+ "fluss.table.datalake.freshness");
+
+ // remove non-existing property
+ flussIcebergCatalog.alterTable(
+ tablePath,
List.of(TableChange.reset("iceberg.non-existing.property")), context);
+ assertThat(catalog.loadTable(TableIdentifier.of(database,
tableName)).properties())
+ .containsEntry("commit.retry.num-retries", "10")
+ .doesNotContainKeys(
+ "non-existing.property",
+ "iceberg.non-existing.property",
+ "commit.retry.min-wait-ms",
+ "iceberg.commit.retry.min-wait-ms",
+ "table.datalake.freshness",
+ "fluss.table.datalake.freshness");
+ }
+
+ @Test
+ void alterTablePropertiesWithNonExistingTable() {
+ TestingLakeCatalogContext context = new TestingLakeCatalogContext();
+ // db & table don't exist
+ assertThatThrownBy(
+ () ->
+ flussIcebergCatalog.alterTable(
+ TablePath.of("non_existing_db",
"non_existing_table"),
+ List.of(
+ TableChange.set(
+
"iceberg.commit.retry.min-wait-ms",
+ "1000")),
+ context))
+ .isInstanceOf(TableNotExistException.class)
+ .hasMessage("Table non_existing_db.non_existing_table does not
exist.");
+
+ TableDescriptor tableDescriptor =
+ TableDescriptor.builder()
+ .schema(Schema.newBuilder().column("id",
DataTypes.BIGINT()).build())
+ .distributedBy(3)
+ .property("iceberg.commit.retry.num-retries", "5")
+ .property("table.datalake.freshness", "30s")
+ .build();
+
+ String database = "test_db";
+ TablePath tablePath = TablePath.of(database, "test_table");
+ flussIcebergCatalog.createTable(tablePath, tableDescriptor, context);
+
+ // database exists but table doesn't exist
+ assertThatThrownBy(
+ () ->
+ flussIcebergCatalog.alterTable(
+ TablePath.of(database,
"non_existing_table"),
+ List.of(
+ TableChange.set(
+
"iceberg.commit.retry.min-wait-ms",
+ "1000")),
+ context))
+ .isInstanceOf(TableNotExistException.class)
+ .hasMessage("Table test_db.non_existing_table does not
exist.");
+ }
+
+ @Test
+ void alterReservedTableProperties() {
+ String database = "test_alter_table_with_reserved_properties_db";
+ String tableName = "test_alter_table_with_reserved_properties";
+
+ Schema flussSchema = Schema.newBuilder().column("id",
DataTypes.BIGINT()).build();
+
+ TableDescriptor tableDescriptor =
+
TableDescriptor.builder().schema(flussSchema).distributedBy(3).build();
+
+ TablePath tablePath = TablePath.of(database, tableName);
+ TestingLakeCatalogContext context = new TestingLakeCatalogContext();
+ flussIcebergCatalog.createTable(tablePath, tableDescriptor, context);
+
+ for (String property : IcebergLakeCatalog.RESERVED_PROPERTIES) {
+ assertThatThrownBy(
+ () ->
+ flussIcebergCatalog.alterTable(
+ tablePath,
+ List.of(
+ TableChange.set(
+ property,
+
RowLevelOperationMode.COPY_ON_WRITE
+
.modeName())),
+ context))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Cannot set table property '%s'", property);
+
+ assertThatThrownBy(
+ () ->
+ flussIcebergCatalog.alterTable(
+ tablePath,
+
List.of(TableChange.reset(property)),
+ context))
+ .isInstanceOf(IllegalArgumentException.class)
+ .hasMessage("Cannot reset table property '%s'", property);
+ }
+ }
}