This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 079b78a3e [lake] Allow enabling lakehouse for log tables without 
bucket key created before cluster-level lakehouse is enabled (#2973)
079b78a3e is described below

commit 079b78a3e6176e048abe77b70bc1a2d69178c1a8
Author: yuxia Luo <[email protected]>
AuthorDate: Thu Apr 2 09:50:38 2026 +0800

    [lake] Allow enabling lakehouse for log tables without bucket key created 
before cluster-level lakehouse is enabled (#2973)
---
 .../fluss/client/table/LakeEnableTableITCase.java  | 59 ++++++++++++++++++++++
 .../fluss/server/coordinator/MetadataManager.java  | 21 ++++++++
 .../server/utils/TableDescriptorValidation.java    | 16 ++++++
 3 files changed, 96 insertions(+)

diff --git 
a/fluss-client/src/test/java/org/apache/fluss/client/table/LakeEnableTableITCase.java
 
b/fluss-client/src/test/java/org/apache/fluss/client/table/LakeEnableTableITCase.java
index f4488774e..1b85a441f 100644
--- 
a/fluss-client/src/test/java/org/apache/fluss/client/table/LakeEnableTableITCase.java
+++ 
b/fluss-client/src/test/java/org/apache/fluss/client/table/LakeEnableTableITCase.java
@@ -111,6 +111,65 @@ class LakeEnableTableITCase extends 
ClientToServerITCaseBase {
                         "The following options cannot be altered for tables 
that were created before the Fluss cluster enabled datalake: 
'table.datalake.enabled'.");
     }
 
+    @Test
+    void 
testCanEnableDatalakeForLogTableWithoutBucketKeyCreatedBeforeClusterEnabledDatalake()
+            throws Exception {
+        String databaseName = "test_db";
+        String tableName =
+                
"test_log_table_without_bucket_key_created_before_cluster_enabled_datalake";
+        TablePath tablePath = TablePath.of(databaseName, tableName);
+
+        admin.alterClusterConfigs(
+                        Collections.singletonList(
+                                new AlterConfig(
+                                        DATALAKE_FORMAT.key(), null, 
AlterConfigOpType.SET)))
+                .get();
+        assertThat(
+                        FLUSS_CLUSTER_EXTENSION
+                                .getCoordinatorServer()
+                                .getCoordinatorService()
+                                .getDataLakeFormat())
+                .isEqualTo(null);
+
+        admin.createDatabase(databaseName, DatabaseDescriptor.EMPTY, 
true).get();
+
+        TableDescriptor tableDescriptor =
+                TableDescriptor.builder()
+                        .schema(
+                                Schema.newBuilder()
+                                        .column("c1", DataTypes.INT())
+                                        .column("c2", DataTypes.STRING())
+                                        .build())
+                        .distributedBy(3)
+                        .build();
+        admin.createTable(tablePath, tableDescriptor, false).get();
+
+        // enable datalake at the cluster level
+        admin.alterClusterConfigs(
+                        Collections.singletonList(
+                                new AlterConfig(
+                                        DATALAKE_FORMAT.key(),
+                                        DataLakeFormat.PAIMON.toString(),
+                                        AlterConfigOpType.SET)))
+                .get();
+        assertThat(
+                        FLUSS_CLUSTER_EXTENSION
+                                .getCoordinatorServer()
+                                .getCoordinatorService()
+                                .getDataLakeFormat())
+                .isEqualTo(DataLakeFormat.PAIMON);
+
+        List<TableChange> enableDatalakeChange =
+                
Collections.singletonList(TableChange.set(TABLE_DATALAKE_ENABLED.key(), 
"true"));
+        // alter table to enable datalake
+        admin.alterTable(tablePath, enableDatalakeChange, false).get();
+
+        TableInfo updatedTableInfo = admin.getTableInfo(tablePath).get();
+        
assertThat(updatedTableInfo.getTableConfig().isDataLakeEnabled()).isTrue();
+        assertThat(updatedTableInfo.getTableConfig().getDataLakeFormat())
+                .contains(DataLakeFormat.PAIMON);
+    }
+
     @Test
     void testTableWithExplicitDatalakeFormatCanEnableDatalake() throws 
Exception {
         String databaseName = "test_db";
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
index cc2bb5b70..b1efa0232 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
@@ -35,6 +35,7 @@ import 
org.apache.fluss.exception.TableNotPartitionedException;
 import org.apache.fluss.exception.TooManyBucketsException;
 import org.apache.fluss.exception.TooManyPartitionsException;
 import org.apache.fluss.lake.lakestorage.LakeCatalog;
+import org.apache.fluss.metadata.DataLakeFormat;
 import org.apache.fluss.metadata.DatabaseDescriptor;
 import org.apache.fluss.metadata.DatabaseInfo;
 import org.apache.fluss.metadata.DatabaseSummary;
@@ -513,6 +514,26 @@ public class MetadataManager {
                     getUpdatedTableDescriptor(tableDescriptor, 
tablePropertyChanges);
 
             if (newDescriptor != null) {
+                // this alteration turns datalake on for the table
+                if (isDataLakeEnabled(newDescriptor) && 
!isDataLakeEnabled(tableDescriptor)) {
+                    // The table was created before cluster-level datalake was 
enabled.
+                    // Backfill `table.datalake.format` before enabling 
datalake on the table
+                    // so the updated table metadata stays consistent with the 
cluster setting.
+                    if 
(!tableInfo.getTableConfig().getDataLakeFormat().isPresent()) {
+                        DataLakeFormat dataLakeFormat =
+                                lakeCatalogDynamicLoader
+                                        .getLakeCatalogContainer()
+                                        .getDataLakeFormat();
+                        if (dataLakeFormat == null) {
+                            throw new InvalidAlterTableException(
+                                    "Cannot alter table "
+                                            + tablePath
+                                            + " in data lake, because the 
Fluss cluster doesn't enable datalake tables.");
+                        }
+                        newDescriptor = 
newDescriptor.withDataLakeFormat(dataLakeFormat);
+                    }
+                }
+
                 // reuse the same validate logic with the createTable() method
                 validateTableDescriptor(newDescriptor, maxBucketNum);
 
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
index 14e4330d7..5f0d6416c 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/utils/TableDescriptorValidation.java
@@ -143,6 +143,22 @@ public class TableDescriptorValidation {
                             .filter(k -> k.startsWith("table.datalake."))
                             .collect(Collectors.toList());
             if (!datalakeKeys.isEmpty()) {
+                // Allow log tables without bucket keys to enable datalake 
even when
+                // `table.datalake.format` was not recorded at creation time, 
because bucket
+                // distribution does not need to stay aligned with the lake 
format in this case.
+                boolean alterLegacyLogTableWithoutBucketKey =
+                        !currentTable.hasPrimaryKey()
+                                && !currentTable.hasBucketKey()
+                                && datalakeKeys.stream()
+                                        .allMatch(
+                                                k ->
+                                                        k.equals(
+                                                                
ConfigOptions.TABLE_DATALAKE_ENABLED
+                                                                        
.key()));
+                if (alterLegacyLogTableWithoutBucketKey) {
+                    return;
+                }
+
                 throw new InvalidAlterTableException(
                         String.format(
                                 "The following options cannot be altered for 
tables that were"

Reply via email to