This is an automated email from the ASF dual-hosted git repository.

yuxia pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 44f029bfb [lake] LakeCatalog supports multi-tenancy (#1901)
44f029bfb is described below

commit 44f029bfb99ef5b2300e3be6fcac4309ce60ea2b
Author: Liebing <[email protected]>
AuthorDate: Thu Oct 30 20:30:57 2025 +0800

    [lake] LakeCatalog supports multi-tenancy (#1901)
---
 .../apache/fluss/lake/lakestorage/LakeCatalog.java | 24 +++++++++++++--
 .../lake/lakestorage/PluginLakeStorageWrapper.java |  9 +++---
 .../fluss/lake/lakestorage/LakeStorageTest.java    |  5 ++--
 .../lakestorage/TestingLakeCatalogContext.java     | 29 ++++++++++++++++++
 .../fluss/lake/iceberg/IcebergLakeCatalog.java     |  4 +--
 .../fluss/lake/iceberg/IcebergLakeCatalogTest.java | 34 +++++++++++++++++-----
 .../apache/fluss/lake/lance/LanceLakeCatalog.java  |  4 +--
 .../fluss/lake/paimon/PaimonLakeCatalog.java       |  4 +--
 .../fluss/lake/paimon/PaimonLakeCatalogTest.java   | 13 +++++++--
 .../server/coordinator/CoordinatorService.java     | 24 +++++++++++++--
 .../fluss/server/coordinator/MetadataManager.java  | 13 +++++----
 .../lakehouse/TestingPaimonStoragePlugin.java      |  5 ++--
 12 files changed, 134 insertions(+), 34 deletions(-)

diff --git a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java
index 2dce05471..4048ca007 100644
--- a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java
+++ b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/LakeCatalog.java
@@ -23,6 +23,7 @@ import org.apache.fluss.exception.TableNotExistException;
 import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.security.acl.FlussPrincipal;
 
 import java.util.List;
 
@@ -39,9 +40,10 @@ public interface LakeCatalog extends AutoCloseable {
      *
      * @param tablePath path of the table to be created
      * @param tableDescriptor The descriptor of the table to be created
+     * @param context contextual information needed for create table
      * @throws TableAlreadyExistException if the table already exists
      */
-    void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
+    void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Context context)
             throws TableAlreadyExistException;
 
     /**
@@ -49,13 +51,31 @@ public interface LakeCatalog extends AutoCloseable {
      *
      * @param tablePath path of the table to be altered
      * @param tableChanges The changes to be applied to the table
+     * @param context contextual information needed for alter table
      * @throws TableNotExistException if the table not exists
      */
-    void alterTable(TablePath tablePath, List<TableChange> tableChanges)
+    void alterTable(TablePath tablePath, List<TableChange> tableChanges, Context context)
             throws TableNotExistException;
 
     @Override
     default void close() throws Exception {
         // default do nothing
     }
+
+    /**
+     * Contextual information for lake catalog methods that modify metadata in an external data
+     * lake. It can be used to:
+     *
+     * <ul>
+     *   <li>Access the fluss principal currently accessing the catalog.
+     * </ul>
+     *
+     * @since 0.9
+     */
+    @PublicEvolving
+    interface Context {
+
+        /** Get the fluss principal currently accessing the catalog. */
+        FlussPrincipal getFlussPrincipal();
+    }
 }
diff --git a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/PluginLakeStorageWrapper.java b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/PluginLakeStorageWrapper.java
index 9c75d3609..37cd17ad7 100644
--- a/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/PluginLakeStorageWrapper.java
+++ b/fluss-common/src/main/java/org/apache/fluss/lake/lakestorage/PluginLakeStorageWrapper.java
@@ -75,18 +75,19 @@ public class PluginLakeStorageWrapper implements LakeStoragePlugin {
         }
 
         @Override
-        public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
+        public void createTable(
+                TablePath tablePath, TableDescriptor tableDescriptor, Context context)
                 throws TableAlreadyExistException {
             try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(loader)) {
-                inner.createTable(tablePath, tableDescriptor);
+                inner.createTable(tablePath, tableDescriptor, context);
             }
         }
 
         @Override
-        public void alterTable(TablePath tablePath, List<TableChange> tableChanges)
+        public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Context context)
                 throws TableNotExistException {
             try (TemporaryClassLoaderContext ignored = TemporaryClassLoaderContext.of(loader)) {
-                inner.alterTable(tablePath, tableChanges);
+                inner.alterTable(tablePath, tableChanges, context);
             }
         }
 
diff --git a/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/LakeStorageTest.java b/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/LakeStorageTest.java
index 5812cc3ca..178ec37af 100644
--- a/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/LakeStorageTest.java
+++ b/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/LakeStorageTest.java
@@ -146,11 +146,12 @@ class LakeStorageTest {
     private static class TestPaimonLakeCatalog implements LakeCatalog {
 
         @Override
-        public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
+        public void createTable(
+                TablePath tablePath, TableDescriptor tableDescriptor, Context context)
                 throws TableAlreadyExistException {}
 
         @Override
-        public void alterTable(TablePath tablePath, List<TableChange> tableChanges)
+        public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Context context)
                 throws TableNotExistException {}
     }
 }
diff --git a/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/TestingLakeCatalogContext.java b/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/TestingLakeCatalogContext.java
new file mode 100644
index 000000000..b57ff94cf
--- /dev/null
+++ b/fluss-common/src/test/java/org/apache/fluss/lake/lakestorage/TestingLakeCatalogContext.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.lakestorage;
+
+import org.apache.fluss.security.acl.FlussPrincipal;
+
+/** A testing implementation of {@link LakeCatalog.Context}. */
+public class TestingLakeCatalogContext implements LakeCatalog.Context {
+
+    @Override
+    public FlussPrincipal getFlussPrincipal() {
+        return null;
+    }
+}
diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
index 3b1e4bd0d..4e90cb700 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalog.java
@@ -84,7 +84,7 @@ public class IcebergLakeCatalog implements LakeCatalog {
     }
 
     @Override
-    public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
+    public void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Context context)
             throws TableAlreadyExistException {
         // convert Fluss table path to iceberg table
         boolean isPkTable = tableDescriptor.hasPrimaryKey();
@@ -117,7 +117,7 @@ public class IcebergLakeCatalog implements LakeCatalog {
     }
 
     @Override
-    public void alterTable(TablePath tablePath, List<TableChange> tableChanges)
+    public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Context context)
             throws TableNotExistException {
         throw new UnsupportedOperationException(
                 "Alter table is not supported for Iceberg at the moment");
diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalogTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalogTest.java
index 725fa4fe5..37544d971 100644
--- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalogTest.java
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/IcebergLakeCatalogTest.java
@@ -20,6 +20,7 @@ package org.apache.fluss.lake.iceberg;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.exception.InvalidTableException;
+import org.apache.fluss.lake.lakestorage.TestingLakeCatalogContext;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableDescriptor;
 import org.apache.fluss.metadata.TablePath;
@@ -86,7 +87,8 @@ class IcebergLakeCatalogTest {
                         .build();
 
         TablePath tablePath = TablePath.of(database, tableName);
-        flussIcebergCatalog.createTable(tablePath, tableDescriptor);
+        flussIcebergCatalog.createTable(
+                tablePath, tableDescriptor, new TestingLakeCatalogContext());
 
         Table created =
                 flussIcebergCatalog
@@ -118,7 +120,8 @@ class IcebergLakeCatalogTest {
 
         TablePath tablePath = TablePath.of(database, tableName);
 
-        flussIcebergCatalog.createTable(tablePath, tableDescriptor);
+        flussIcebergCatalog.createTable(
+                tablePath, tableDescriptor, new TestingLakeCatalogContext());
 
         TableIdentifier tableId = TableIdentifier.of(database, tableName);
         Table createdTable = flussIcebergCatalog.getIcebergCatalog().loadTable(tableId);
@@ -175,7 +178,8 @@ class IcebergLakeCatalogTest {
 
         TablePath tablePath = TablePath.of(database, tableName);
 
-        flussIcebergCatalog.createTable(tablePath, tableDescriptor);
+        flussIcebergCatalog.createTable(
+                tablePath, tableDescriptor, new TestingLakeCatalogContext());
 
         TableIdentifier tableId = TableIdentifier.of(database, tableName);
         Table createdTable = flussIcebergCatalog.getIcebergCatalog().loadTable(tableId);
@@ -254,7 +258,12 @@ class IcebergLakeCatalogTest {
 
         TablePath tablePath = TablePath.of(database, tableName);
 
-        assertThatThrownBy(() -> flussIcebergCatalog.createTable(tablePath, tableDescriptor))
+        assertThatThrownBy(
+                        () ->
+                                flussIcebergCatalog.createTable(
+                                        tablePath,
+                                        tableDescriptor,
+                                        new TestingLakeCatalogContext()))
                 .isInstanceOf(UnsupportedOperationException.class)
                 .hasMessageContaining("Only one bucket key is supported for Iceberg");
     }
@@ -279,7 +288,7 @@ class IcebergLakeCatalogTest {
                         .build();
 
         TablePath tablePath = TablePath.of(database, tableName);
-        flussIcebergCatalog.createTable(tablePath, td);
+        flussIcebergCatalog.createTable(tablePath, td, new TestingLakeCatalogContext());
 
         TableIdentifier tableId = TableIdentifier.of(database, tableName);
         Table createdTable = flussIcebergCatalog.getIcebergCatalog().loadTable(tableId);
@@ -336,7 +345,7 @@ class IcebergLakeCatalogTest {
                         .build();
 
         TablePath path = TablePath.of(database, tableName);
-        flussIcebergCatalog.createTable(path, td);
+        flussIcebergCatalog.createTable(path, td, new TestingLakeCatalogContext());
 
         Table createdTable =
                 flussIcebergCatalog
@@ -401,7 +410,12 @@ class IcebergLakeCatalogTest {
         TablePath tablePath = TablePath.of(database, tableName);
 
         // Do not allow multiple bucket keys for log table
-        assertThatThrownBy(() -> flussIcebergCatalog.createTable(tablePath, tableDescriptor))
+        assertThatThrownBy(
+                        () ->
+                                flussIcebergCatalog.createTable(
+                                        tablePath,
+                                        tableDescriptor,
+                                        new TestingLakeCatalogContext()))
                 .isInstanceOf(UnsupportedOperationException.class)
                 .hasMessageContaining("Only one bucket key is supported for Iceberg");
     }
@@ -432,7 +446,11 @@ class IcebergLakeCatalogTest {
         tableDescriptor.partitionedBy(partitionKeys);
 
         Assertions.assertThatThrownBy(
-                        () -> flussIcebergCatalog.createTable(t1, tableDescriptor.build()))
+                        () ->
+                                flussIcebergCatalog.createTable(
+                                        t1,
+                                        tableDescriptor.build(),
+                                        new TestingLakeCatalogContext()))
                 .isInstanceOf(InvalidTableException.class)
                 .hasMessage(
                         "Partition key only support string type for iceberg currently. Column `c1` is not string type.");
diff --git a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/LanceLakeCatalog.java b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/LanceLakeCatalog.java
index 2a55fc46a..600dcbd0d 100644
--- a/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/LanceLakeCatalog.java
+++ b/fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/LanceLakeCatalog.java
@@ -43,7 +43,7 @@ public class LanceLakeCatalog implements LakeCatalog {
     }
 
     @Override
-    public void createTable(TablePath tablePath, TableDescriptor tableDescriptor) {
+    public void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Context context) {
         // currently, we don't support primary key table for lance
         if (tableDescriptor.hasPrimaryKey()) {
             throw new InvalidTableException(
@@ -71,7 +71,7 @@ public class LanceLakeCatalog implements LakeCatalog {
     }
 
     @Override
-    public void alterTable(TablePath tablePath, List<TableChange> tableChanges)
+    public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Context context)
             throws TableNotExistException {
         throw new UnsupportedOperationException(
                 "Alter table is not supported for Lance at the moment");
diff --git a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
index b11d5adf2..22a189208 100644
--- a/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
+++ b/fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java
@@ -77,7 +77,7 @@ public class PaimonLakeCatalog implements LakeCatalog {
     }
 
     @Override
-    public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
+    public void createTable(TablePath tablePath, TableDescriptor tableDescriptor, Context context)
             throws TableAlreadyExistException {
         // then, create the table
         Identifier paimonPath = toPaimon(tablePath);
@@ -102,7 +102,7 @@ public class PaimonLakeCatalog implements LakeCatalog {
     }
 
     @Override
-    public void alterTable(TablePath tablePath, List<TableChange> tableChanges)
+    public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Context context)
             throws TableNotExistException {
         try {
             Identifier paimonPath = toPaimon(tablePath);
diff --git a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java
index c2275fe21..a9959d821 100644
--- a/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java
+++ b/fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/PaimonLakeCatalogTest.java
@@ -18,6 +18,7 @@
 package org.apache.fluss.lake.paimon;
 
 import org.apache.fluss.config.Configuration;
+import org.apache.fluss.lake.lakestorage.TestingLakeCatalogContext;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableChange;
 import org.apache.fluss.metadata.TableDescriptor;
@@ -62,14 +63,20 @@ class PaimonLakeCatalogTest {
         assertThat(table.options().get("key")).isEqualTo(null);
 
         // set the value for key
-        flussPaimonCatalog.alterTable(tablePath, Arrays.asList(TableChange.set("key", "value")));
+        flussPaimonCatalog.alterTable(
+                tablePath,
+                Arrays.asList(TableChange.set("key", "value")),
+                new TestingLakeCatalogContext());
 
         table = flussPaimonCatalog.getPaimonCatalog().getTable(identifier);
         // we have set the value for key
         assertThat(table.options().get("fluss.key")).isEqualTo("value");
 
         // reset the value for key
-        flussPaimonCatalog.alterTable(tablePath, Arrays.asList(TableChange.reset("key")));
+        flussPaimonCatalog.alterTable(
+                tablePath,
+                Arrays.asList(TableChange.reset("key")),
+                new TestingLakeCatalogContext());
 
         table = flussPaimonCatalog.getPaimonCatalog().getTable(identifier);
         // we have reset the value for key
@@ -93,6 +100,6 @@ class PaimonLakeCatalogTest {
 
         TablePath tablePath = TablePath.of(database, tableName);
 
-        flussPaimonCatalog.createTable(tablePath, td);
+        flussPaimonCatalog.createTable(tablePath, td, new TestingLakeCatalogContext());
     }
 }
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
index 8fe9285bb..d45cc0231 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java
@@ -33,6 +33,7 @@ import org.apache.fluss.exception.SecurityDisabledException;
 import org.apache.fluss.exception.TableAlreadyExistException;
 import org.apache.fluss.exception.TableNotPartitionedException;
 import org.apache.fluss.fs.FileSystem;
+import org.apache.fluss.lake.lakestorage.LakeCatalog;
 import org.apache.fluss.metadata.DataLakeFormat;
 import org.apache.fluss.metadata.DatabaseDescriptor;
 import org.apache.fluss.metadata.DeleteBehavior;
@@ -84,6 +85,7 @@ import org.apache.fluss.rpc.netty.server.Session;
 import org.apache.fluss.rpc.protocol.ApiError;
 import org.apache.fluss.security.acl.AclBinding;
 import org.apache.fluss.security.acl.AclBindingFilter;
+import org.apache.fluss.security.acl.FlussPrincipal;
 import org.apache.fluss.security.acl.OperationType;
 import org.apache.fluss.security.acl.Resource;
 import org.apache.fluss.server.DynamicConfigManager;
@@ -287,7 +289,10 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
         if (isDataLakeEnabled(tableDescriptor)) {
             try {
                 checkNotNull(lakeCatalogContainer.getLakeCatalog())
-                        .createTable(tablePath, tableDescriptor);
+                        .createTable(
+                                tablePath,
+                                tableDescriptor,
+                                new DefaultLakeCatalogContext(currentSession().getPrincipal()));
             } catch (TableAlreadyExistException e) {
                 throw new LakeTableAlreadyExistException(
                         String.format(
@@ -326,7 +331,8 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
                 request.isIgnoreIfNotExists(),
                 lakeCatalogContainer.getLakeCatalog(),
                 lakeCatalogContainer.getDataLakeFormat(),
-                lakeTableTieringManager);
+                lakeTableTieringManager,
+                new DefaultLakeCatalogContext(currentSession().getPrincipal()));
 
         return CompletableFuture.completedFuture(new AlterTableResponse());
     }
@@ -757,4 +763,18 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina
             }
         }
     }
+
+    static class DefaultLakeCatalogContext implements LakeCatalog.Context {
+
+        private final FlussPrincipal flussPrincipal;
+
+        public DefaultLakeCatalogContext(FlussPrincipal flussPrincipal) {
+            this.flussPrincipal = flussPrincipal;
+        }
+
+        @Override
+        public FlussPrincipal getFlussPrincipal() {
+            return flussPrincipal;
+        }
+    }
 }
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
index 5b3b85f62..2001fa789 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/MetadataManager.java
@@ -318,7 +318,8 @@ public class MetadataManager {
             boolean ignoreIfNotExists,
             @Nullable LakeCatalog lakeCatalog,
             @Nullable DataLakeFormat dataLakeFormat,
-            LakeTableTieringManager lakeTableTieringManager) {
+            LakeTableTieringManager lakeTableTieringManager,
+            LakeCatalog.Context lakeCatalogContext) {
         try {
             // it throws TableNotExistException if the table or database not exists
             TableRegistration tableReg = getTableRegistration(tablePath);
@@ -349,7 +350,8 @@ public class MetadataManager {
                         newDescriptor,
                         tableChanges,
                         lakeCatalog,
-                        dataLakeFormat);
+                        dataLakeFormat,
+                        lakeCatalogContext);
                 // update the table to zk
                 TableRegistration updatedTableRegistration =
                         tableReg.newProperties(
@@ -388,7 +390,8 @@ public class MetadataManager {
             TableDescriptor newDescriptor,
             List<TableChange> tableChanges,
             LakeCatalog lakeCatalog,
-            DataLakeFormat dataLakeFormat) {
+            DataLakeFormat dataLakeFormat,
+            LakeCatalog.Context lakeCatalogContext) {
         if (isDataLakeEnabled(newDescriptor)) {
             if (lakeCatalog == null) {
                 throw new InvalidAlterTableException(
@@ -402,7 +405,7 @@ public class MetadataManager {
             if (!isDataLakeEnabled(tableDescriptor)) {
                 // before create table in fluss, we may create in lake
                 try {
-                    lakeCatalog.createTable(tablePath, newDescriptor);
+                    lakeCatalog.createTable(tablePath, newDescriptor, lakeCatalogContext);
                     // no need to alter lake table if it is newly created
                     isLakeTableNewlyCreated = true;
                 } catch (TableAlreadyExistException e) {
@@ -421,7 +424,7 @@ public class MetadataManager {
             if (!isLakeTableNewlyCreated) {
                 {
                     try {
-                        lakeCatalog.alterTable(tablePath, tableChanges);
+                        lakeCatalog.alterTable(tablePath, tableChanges, lakeCatalogContext);
                     } catch (TableNotExistException e) {
                         throw new FlussRuntimeException(
                                 "Lake table doesn't exists for lake-enabled table "
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java b/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java
index deb2cddb0..c726e22e3 100644
--- a/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java
+++ b/fluss-server/src/test/java/org/apache/fluss/server/lakehouse/TestingPaimonStoragePlugin.java
@@ -74,7 +74,8 @@ public class TestingPaimonStoragePlugin implements LakeStoragePlugin {
         private final Map<TablePath, TableDescriptor> tableByPath = new HashMap<>();
 
         @Override
-        public void createTable(TablePath tablePath, TableDescriptor tableDescriptor)
+        public void createTable(
+                TablePath tablePath, TableDescriptor tableDescriptor, Context context)
                 throws TableAlreadyExistException {
             if (tableByPath.containsKey(tablePath)) {
                 throw new TableAlreadyExistException("Table " + tablePath + " already exists");
@@ -83,7 +84,7 @@ public class TestingPaimonStoragePlugin implements LakeStoragePlugin {
         }
 
         @Override
-        public void alterTable(TablePath tablePath, List<TableChange> tableChanges)
+        public void alterTable(TablePath tablePath, List<TableChange> tableChanges, Context context)
                 throws TableNotExistException {
             // do nothing
         }

Reply via email to