This is an automated email from the ASF dual-hosted git repository.

hongshun pushed a commit to branch dlf-2.5
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit dfe2176291ecbbebc613cf5e73b3bbc19a080bb8
Author: luoyuxia <[email protected]>
AuthorDate: Mon Aug 18 17:45:44 2025 +0800

    support enable lake dynamically
---
 .../com/alibaba/fluss/client/table/FlussTable.java | 18 +++++
 .../java/com/alibaba/fluss/client/table/Table.java | 19 ++++++
 .../fluss/client/utils/ClientRpcMessageUtils.java  | 22 ++++++
 .../com/alibaba/fluss/config/FlussConfigUtils.java |  8 +++
 .../exception/InvalidAlterTableException.java      | 26 ++++++++
 .../alibaba/fluss/metadata/UpdateProperties.java   | 74 ++++++++++++++++++++
 .../lake/paimon/LakeEnabledTableCreateITCase.java  | 32 +++++++++
 .../alibaba/fluss/rpc/gateway/AdminGateway.java    | 10 +++
 .../com/alibaba/fluss/rpc/protocol/ApiKeys.java    |  3 +-
 .../com/alibaba/fluss/rpc/protocol/Errors.java     |  5 +-
 fluss-rpc/src/main/proto/FlussApi.proto            | 13 ++++
 .../server/coordinator/CoordinatorService.java     | 11 +++
 .../fluss/server/coordinator/MetadataManager.java  | 78 ++++++++++++++++++++++
 .../fluss/server/entity/AlterTableData.java        | 42 ++++++++++++
 .../fluss/server/utils/ServerRpcMessageUtils.java  | 17 +++++
 .../fluss/server/zk/data/TableRegistration.java    | 12 ++++
 .../server/coordinator/TestCoordinatorGateway.java |  7 ++
 17 files changed, 395 insertions(+), 2 deletions(-)

diff --git 
a/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java 
b/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java
index aa894d55c..a86f82045 100644
--- a/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java
+++ b/fluss-client/src/main/java/com/alibaba/fluss/client/table/FlussTable.java
@@ -29,7 +29,13 @@ import com.alibaba.fluss.client.table.writer.TableUpsert;
 import com.alibaba.fluss.client.table.writer.Upsert;
 import com.alibaba.fluss.metadata.TableInfo;
 import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.metadata.UpdateProperties;
+import com.alibaba.fluss.rpc.GatewayClientProxy;
+import com.alibaba.fluss.rpc.gateway.AdminGateway;
 
+import java.util.concurrent.CompletableFuture;
+
+import static 
com.alibaba.fluss.client.utils.ClientRpcMessageUtils.makeAlterTableRequest;
 import static com.alibaba.fluss.utils.Preconditions.checkState;
 
 /**
@@ -86,6 +92,18 @@ public class FlussTable implements Table {
         return new TableUpsert(tablePath, tableInfo, 
conn.getOrCreateWriterClient());
     }
 
+    @Override
+    public CompletableFuture<Void> updateProperties(UpdateProperties updateProperties) {
+        // Build an admin gateway proxy that targets the coordinator server reported by the
+        // connection's metadata updater.
+        AdminGateway coordinatorGateway =
+                GatewayClientProxy.createGatewayProxy(
+                        () -> conn.getMetadataUpdater().getCoordinatorServer(),
+                        conn.getRpcClient(),
+                        AdminGateway.class);
+        // The response itself is discarded; callers only observe completion or failure.
+        return coordinatorGateway
+                .alterTable(makeAlterTableRequest(tablePath, updateProperties))
+                .thenApply(ignored -> null);
+    }
+
     @Override
     public void close() throws Exception {
         // do nothing
diff --git 
a/fluss-client/src/main/java/com/alibaba/fluss/client/table/Table.java 
b/fluss-client/src/main/java/com/alibaba/fluss/client/table/Table.java
index 573a6580d..e1505cb7a 100644
--- a/fluss-client/src/main/java/com/alibaba/fluss/client/table/Table.java
+++ b/fluss-client/src/main/java/com/alibaba/fluss/client/table/Table.java
@@ -26,7 +26,12 @@ import com.alibaba.fluss.client.table.writer.Append;
 import com.alibaba.fluss.client.table.writer.AppendWriter;
 import com.alibaba.fluss.client.table.writer.Upsert;
 import com.alibaba.fluss.client.table.writer.UpsertWriter;
+import com.alibaba.fluss.exception.InvalidAlterTableException;
+import com.alibaba.fluss.exception.TableNotExistException;
 import com.alibaba.fluss.metadata.TableInfo;
+import com.alibaba.fluss.metadata.UpdateProperties;
+
+import java.util.concurrent.CompletableFuture;
 
 /**
  * Used to communicate with a single Fluss table. Obtain an instance from a 
{@link Connection}.
@@ -72,4 +77,18 @@ public interface Table extends AutoCloseable {
      * this table (requires to be a Primary Key Table).
      */
     Upsert newUpsert();
+
+    /**
+     * Updates the properties of this table.
+     *
+     * <p>The following exceptions can be anticipated when calling {@code get()} on the returned
+     * future:
+     *
+     * <ul>
+     *   <li>{@link TableNotExistException} if the table does not exist.
+     *   <li>{@link InvalidAlterTableException} if the requested property update is invalid.
+     * </ul>
+     *
+     * @param updateProperties the property changes to apply to this table.
+     * @return a future that completes when the update request has been answered.
+     */
+    CompletableFuture<Void> updateProperties(UpdateProperties 
updateProperties);
 }
diff --git 
a/fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientRpcMessageUtils.java
 
b/fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientRpcMessageUtils.java
index bfc36f6fd..76f81cf78 100644
--- 
a/fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientRpcMessageUtils.java
+++ 
b/fluss-client/src/main/java/com/alibaba/fluss/client/utils/ClientRpcMessageUtils.java
@@ -33,6 +33,8 @@ import com.alibaba.fluss.metadata.PartitionSpec;
 import com.alibaba.fluss.metadata.PhysicalTablePath;
 import com.alibaba.fluss.metadata.TableBucket;
 import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.metadata.UpdateProperties;
+import com.alibaba.fluss.rpc.messages.AlterTableRequest;
 import com.alibaba.fluss.rpc.messages.CreatePartitionRequest;
 import com.alibaba.fluss.rpc.messages.DropPartitionRequest;
 import com.alibaba.fluss.rpc.messages.GetFileSystemSecurityTokenResponse;
@@ -52,6 +54,7 @@ import 
com.alibaba.fluss.rpc.messages.PbPrefixLookupReqForBucket;
 import com.alibaba.fluss.rpc.messages.PbProduceLogReqForBucket;
 import com.alibaba.fluss.rpc.messages.PbPutKvReqForBucket;
 import com.alibaba.fluss.rpc.messages.PbRemotePathAndLocalFile;
+import com.alibaba.fluss.rpc.messages.PbUpdateProperties;
 import com.alibaba.fluss.rpc.messages.PrefixLookupRequest;
 import com.alibaba.fluss.rpc.messages.ProduceLogRequest;
 import com.alibaba.fluss.rpc.messages.PutKvRequest;
@@ -344,4 +347,23 @@ public class ClientRpcMessageUtils {
                 (key, value) -> pbKeyValues.add(new 
PbKeyValue().setKey(key).setValue(value)));
         return new PbPartitionSpec().addAllPartitionKeyValues(pbKeyValues);
     }
+
+    /**
+     * Builds an {@link AlterTableRequest} for the given table from the given property updates.
+     *
+     * <p>NOTE(review): only the properties to set are serialized; the reset properties of {@code
+     * updateProperties} are not written into the request.
+     */
+    public static AlterTableRequest makeAlterTableRequest(
+            TablePath tablePath, UpdateProperties updateProperties) {
+        // Translate every "set" entry into its protobuf key/value representation.
+        PbUpdateProperties pbProps = new PbUpdateProperties();
+        pbProps.addAllSetProperties(
+                updateProperties.getSetProperties().entrySet().stream()
+                        .map(kv -> new PbKeyValue().setKey(kv.getKey()).setValue(kv.getValue()))
+                        .collect(Collectors.toList()));
+
+        AlterTableRequest alterRequest = new AlterTableRequest();
+        alterRequest
+                .setTablePath()
+                .setDatabaseName(tablePath.getDatabaseName())
+                .setTableName(tablePath.getTableName());
+        alterRequest.setUpdateProperties(pbProps);
+        return alterRequest;
+    }
 }
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/config/FlussConfigUtils.java 
b/fluss-common/src/main/java/com/alibaba/fluss/config/FlussConfigUtils.java
index 779a0c7f4..cca591968 100644
--- a/fluss-common/src/main/java/com/alibaba/fluss/config/FlussConfigUtils.java
+++ b/fluss-common/src/main/java/com/alibaba/fluss/config/FlussConfigUtils.java
@@ -21,9 +21,13 @@ import com.alibaba.fluss.annotation.Internal;
 import com.alibaba.fluss.annotation.VisibleForTesting;
 
 import java.lang.reflect.Field;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
+import static com.alibaba.fluss.config.ConfigOptions.TABLE_DATALAKE_ENABLED;
+
 /** Utilities of Fluss {@link ConfigOptions}. */
 @Internal
 public class FlussConfigUtils {
@@ -38,6 +42,10 @@ public class FlussConfigUtils {
         CLIENT_OPTIONS = extractConfigOptions("client.");
     }
 
+    /** Keys of the table properties that are allowed to be changed by altering a table. */
+    public static final List<String> TABLE_OPTIONS_SUPPORT_ALTER =
+            Collections.singletonList(TABLE_DATALAKE_ENABLED.key());
+
     @VisibleForTesting
     static Map<String, ConfigOption<?>> extractConfigOptions(String prefix) {
         Map<String, ConfigOption<?>> options = new HashMap<>();
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/exception/InvalidAlterTableException.java
 
b/fluss-common/src/main/java/com/alibaba/fluss/exception/InvalidAlterTableException.java
new file mode 100644
index 000000000..6a562b8f8
--- /dev/null
+++ 
b/fluss-common/src/main/java/com/alibaba/fluss/exception/InvalidAlterTableException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.exception;
+
+/** Exception indicating that an alter table request is invalid. */
+public class InvalidAlterTableException extends ApiException {
+    /** Creates the exception with a message describing why the alter table is invalid. */
+    public InvalidAlterTableException(String message) {
+        super(message);
+    }
+}
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/metadata/UpdateProperties.java 
b/fluss-common/src/main/java/com/alibaba/fluss/metadata/UpdateProperties.java
new file mode 100644
index 000000000..05d1230d6
--- /dev/null
+++ 
b/fluss-common/src/main/java/com/alibaba/fluss/metadata/UpdateProperties.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.metadata;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Describes a set of changes to properties: entries to set (key to new value) and keys to reset.
+ *
+ * <p>Instances are immutable and are created through {@link #builder()}.
+ */
+public class UpdateProperties {
+
+    private final List<String> resetProperties;
+    private final Map<String, String> setProperties;
+
+    private UpdateProperties(List<String> resetProperties, Map<String, String> setProperties) {
+        // Snapshot the builder state: later builder calls (or callers of the getters) must not
+        // be able to change an already built instance.
+        this.resetProperties =
+                java.util.Collections.unmodifiableList(new ArrayList<>(resetProperties));
+        this.setProperties = java.util.Collections.unmodifiableMap(new HashMap<>(setProperties));
+    }
+
+    /** Creates a builder for building update property data. */
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    /** Returns an unmodifiable view of the property keys to reset. */
+    public List<String> getResetProperties() {
+        return resetProperties;
+    }
+
+    /** Returns an unmodifiable view of the properties to set, keyed by property name. */
+    public Map<String, String> getSetProperties() {
+        return setProperties;
+    }
+
+    /** Builder for {@link UpdateProperties}. */
+    public static class Builder {
+        private final List<String> resetProperties;
+        private final Map<String, String> setProperties;
+
+        protected Builder() {
+            this.resetProperties = new ArrayList<>();
+            this.setProperties = new HashMap<>();
+        }
+
+        /** Registers {@code property} as a key to reset. */
+        public Builder resetProperty(String property) {
+            this.resetProperties.add(property);
+            return this;
+        }
+
+        /** Registers {@code property} to be set to {@code value}; a later call for the same key wins. */
+        public Builder setProperty(String property, String value) {
+            this.setProperties.put(property, value);
+            return this;
+        }
+
+        /** Builds an immutable {@link UpdateProperties} from the current state of this builder. */
+        public UpdateProperties build() {
+            return new UpdateProperties(resetProperties, setProperties);
+        }
+    }
+}
diff --git 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
index 7123c9120..76b208b69 100644
--- 
a/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
+++ 
b/fluss-lake/fluss-lake-paimon/src/test/java/com/alibaba/fluss/lake/paimon/LakeEnabledTableCreateITCase.java
@@ -27,6 +27,7 @@ import com.alibaba.fluss.exception.InvalidTableException;
 import com.alibaba.fluss.metadata.Schema;
 import com.alibaba.fluss.metadata.TableDescriptor;
 import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.metadata.UpdateProperties;
 import com.alibaba.fluss.server.testutils.FlussClusterExtension;
 import com.alibaba.fluss.types.DataTypes;
 
@@ -117,6 +118,37 @@ class LakeEnabledTableCreateITCase {
         return conf;
     }
 
+    @Test
+    void testDynamicEnableLake() throws Exception {
+        // create a plain log table without datalake enabled
+        TableDescriptor logTable =
+                TableDescriptor.builder()
+                        .schema(Schema.newBuilder().column("log_c1", DataTypes.INT()).build())
+                        .build();
+        TablePath logTablePath = TablePath.of(DATABASE, "log_table");
+        admin.createTable(logTablePath, logTable, false).get();
+
+        // enable datalake on the existing table
+        com.alibaba.fluss.client.table.Table table = conn.getTable(logTablePath);
+        table.updateProperties(
+                        UpdateProperties.builder()
+                                .setProperty(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")
+                                .build())
+                .get();
+        assertThat(conn.getTable(logTablePath).getTableInfo().getTableConfig().isDataLakeEnabled())
+                .isTrue();
+
+        // enabling datalake must have created the table in the paimon catalog
+        Table paimonLogTable =
+                paimonCatalog.getTable(Identifier.create(DATABASE, logTablePath.getTableName()));
+        assertThat(paimonLogTable).isNotNull();
+
+        // disable datalake again
+        table.updateProperties(
+                        UpdateProperties.builder()
+                                .setProperty(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "false")
+                                .build())
+                .get();
+        assertThat(conn.getTable(logTablePath).getTableInfo().getTableConfig().isDataLakeEnabled())
+                .isFalse();
+    }
+
     @Test
     void testCreateLakeEnabledTable() throws Exception {
         Map<String, String> customProperties = new HashMap<>();
diff --git 
a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/AdminGateway.java 
b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/AdminGateway.java
index 7012441d1..53e1f720d 100644
--- a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/AdminGateway.java
+++ b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/AdminGateway.java
@@ -19,6 +19,8 @@ package com.alibaba.fluss.rpc.gateway;
 
 import com.alibaba.fluss.rpc.messages.AlterConfigsRequest;
 import com.alibaba.fluss.rpc.messages.AlterConfigsResponse;
+import com.alibaba.fluss.rpc.messages.AlterTableRequest;
+import com.alibaba.fluss.rpc.messages.AlterTableResponse;
 import com.alibaba.fluss.rpc.messages.CreateAclsRequest;
 import com.alibaba.fluss.rpc.messages.CreateAclsResponse;
 import com.alibaba.fluss.rpc.messages.CreateDatabaseRequest;
@@ -109,6 +111,14 @@ public interface AdminGateway extends AdminReadOnlyGateway 
{
     @RPC(api = ApiKeys.ALTER_CONFIGS)
     CompletableFuture<AlterConfigsResponse> alterConfigs(AlterConfigsRequest 
request);
 
+    /**
+     * Alters a table, such as updating its table properties.
+     *
+     * @param request the alter table request.
+     * @return a future holding the alter table response.
+     */
+    @RPC(api = ApiKeys.ALTER_TABLE)
+    CompletableFuture<AlterTableResponse> alterTable(AlterTableRequest 
request);
+
     // todo: rename table & alter table
 
 }
diff --git 
a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/ApiKeys.java 
b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/ApiKeys.java
index 1f43442c0..31481f233 100644
--- a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/ApiKeys.java
+++ b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/ApiKeys.java
@@ -72,7 +72,8 @@ public enum ApiKeys {
     DROP_ACLS(1041, 0, 0, PUBLIC),
     LAKE_TIERING_HEARTBEAT(1042, 0, 0, PRIVATE),
     DESCRIBE_CONFIGS(1043, 0, 0, PUBLIC),
-    ALTER_CONFIGS(1044, 0, 0, PUBLIC);
+    ALTER_CONFIGS(1044, 0, 0, PUBLIC),
+    ALTER_TABLE(1045, 0, 0, PUBLIC);
 
     private static final Map<Integer, ApiKeys> ID_TO_TYPE =
             Arrays.stream(ApiKeys.values())
diff --git a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java 
b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java
index 63a46a693..5fca604bf 100644
--- a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java
+++ b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java
@@ -28,6 +28,7 @@ import com.alibaba.fluss.exception.DatabaseNotExistException;
 import com.alibaba.fluss.exception.DuplicateSequenceException;
 import com.alibaba.fluss.exception.FencedLeaderEpochException;
 import com.alibaba.fluss.exception.FencedTieringEpochException;
+import com.alibaba.fluss.exception.InvalidAlterTableException;
 import com.alibaba.fluss.exception.InvalidColumnProjectionException;
 import com.alibaba.fluss.exception.InvalidConfigException;
 import com.alibaba.fluss.exception.InvalidCoordinatorException;
@@ -219,7 +220,9 @@ public enum Errors {
     INVALID_REQUEST(
             54,
             "This most likely occurs because of a request being malformed by 
the client library or the message was sent to an incompatible server. See the 
server logs for more details.",
-            InvalidRequestException::new);
+            InvalidRequestException::new),
+    INVALID_ALTER_TABLE_EXCEPTION(
+            40, "The alter table is invalid.", 
InvalidAlterTableException::new);
 
     private static final Logger LOG = LoggerFactory.getLogger(Errors.class);
 
diff --git a/fluss-rpc/src/main/proto/FlussApi.proto 
b/fluss-rpc/src/main/proto/FlussApi.proto
index 1eef39f10..df44f80cd 100644
--- a/fluss-rpc/src/main/proto/FlussApi.proto
+++ b/fluss-rpc/src/main/proto/FlussApi.proto
@@ -561,6 +561,19 @@ message AlterConfigsResponse{
 }
 
 
+// Request to alter the table identified by table_path.
+message AlterTableRequest {
+  required PbTablePath table_path = 1;
+  optional PbUpdateProperties update_properties = 2;
+}
+
+// Response of AlterTableRequest; carries no fields.
+message AlterTableResponse {
+}
+
+// Property changes carried by an AlterTableRequest.
+message PbUpdateProperties {
+  // NOTE(review): presumably the keys of the properties to remove/reset; confirm, since
+  // this field is neither populated nor read by the code added in this change.
+  repeated string remove_properties = 1;
+  // Properties to set, as key/value pairs.
+  repeated PbKeyValue set_properties = 2;
+}
+
 
 // --------------- Inner classes ----------------
 message PbApiVersion {
diff --git 
a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java
 
b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java
index 472c11853..622b844cc 100644
--- 
a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java
+++ 
b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java
@@ -45,6 +45,8 @@ import com.alibaba.fluss.rpc.messages.AdjustIsrRequest;
 import com.alibaba.fluss.rpc.messages.AdjustIsrResponse;
 import com.alibaba.fluss.rpc.messages.AlterConfigsRequest;
 import com.alibaba.fluss.rpc.messages.AlterConfigsResponse;
+import com.alibaba.fluss.rpc.messages.AlterTableRequest;
+import com.alibaba.fluss.rpc.messages.AlterTableResponse;
 import com.alibaba.fluss.rpc.messages.CommitKvSnapshotRequest;
 import com.alibaba.fluss.rpc.messages.CommitKvSnapshotResponse;
 import com.alibaba.fluss.rpc.messages.CommitLakeTableSnapshotRequest;
@@ -124,6 +126,7 @@ import static 
com.alibaba.fluss.rpc.util.CommonRpcMessageUtils.toAclBindingFilte
 import static com.alibaba.fluss.rpc.util.CommonRpcMessageUtils.toAclBindings;
 import static 
com.alibaba.fluss.server.utils.ServerRpcMessageUtils.fromTablePath;
 import static 
com.alibaba.fluss.server.utils.ServerRpcMessageUtils.getAdjustIsrData;
+import static 
com.alibaba.fluss.server.utils.ServerRpcMessageUtils.getAlterTableData;
 import static 
com.alibaba.fluss.server.utils.ServerRpcMessageUtils.getCommitLakeTableSnapshotData;
 import static 
com.alibaba.fluss.server.utils.ServerRpcMessageUtils.getCommitRemoteLogManifestData;
 import static 
com.alibaba.fluss.server.utils.ServerRpcMessageUtils.getPartitionSpec;
@@ -698,6 +701,14 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         return future;
     }
 
+    @Override
+    public CompletableFuture<AlterTableResponse> alterTable(AlterTableRequest request) {
+        // The alteration is applied synchronously; a failure is thrown from this method rather
+        // than being reported through the returned future.
+        metadataManager.alterTable(
+                getAlterTableData(request), lakeCatalogDynamicLoader, lakeTableTieringManager);
+        return CompletableFuture.completedFuture(new AlterTableResponse());
+    }
+
     @VisibleForTesting
     public DataLakeFormat getDataLakeFormat() {
         return lakeCatalogDynamicLoader.getDataLakeFormat();
diff --git 
a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/MetadataManager.java
 
b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/MetadataManager.java
index d6192fc01..921b839f0 100644
--- 
a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/MetadataManager.java
+++ 
b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/MetadataManager.java
@@ -23,6 +23,7 @@ import 
com.alibaba.fluss.exception.DatabaseAlreadyExistException;
 import com.alibaba.fluss.exception.DatabaseNotEmptyException;
 import com.alibaba.fluss.exception.DatabaseNotExistException;
 import com.alibaba.fluss.exception.FlussRuntimeException;
+import com.alibaba.fluss.exception.InvalidAlterTableException;
 import com.alibaba.fluss.exception.InvalidPartitionException;
 import com.alibaba.fluss.exception.PartitionAlreadyExistsException;
 import com.alibaba.fluss.exception.PartitionNotExistException;
@@ -40,6 +41,8 @@ import com.alibaba.fluss.metadata.TableDescriptor;
 import com.alibaba.fluss.metadata.TableInfo;
 import com.alibaba.fluss.metadata.TablePartition;
 import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.metadata.UpdateProperties;
+import com.alibaba.fluss.server.entity.AlterTableData;
 import com.alibaba.fluss.server.utils.LakeStorageUtils;
 import com.alibaba.fluss.server.zk.ZooKeeperClient;
 import com.alibaba.fluss.server.zk.data.DatabaseRegistration;
@@ -55,13 +58,17 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Callable;
 
+import static com.alibaba.fluss.config.ConfigOptions.TABLE_DATALAKE_ENABLED;
+import static 
com.alibaba.fluss.config.FlussConfigUtils.TABLE_OPTIONS_SUPPORT_ALTER;
 import static 
com.alibaba.fluss.server.utils.TableDescriptorValidation.validateTableDescriptor;
+import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
 
 /** A manager for metadata. */
 public class MetadataManager {
@@ -164,6 +171,77 @@ public class MetadataManager {
         return listPartitions(tablePath, null);
     }
 
+    /**
+     * Alters the properties of an existing table and keeps the lake catalog and the lake tiering
+     * manager in sync when the datalake switch of the table changes.
+     *
+     * @param alterTableData the path of the table and the property updates to apply.
+     * @param lakeCatalogDynamicLoader provides the lake catalog in which the table is created
+     *     when datalake gets enabled.
+     * @param lakeTableTieringManager the tiering manager the table is added to or removed from.
+     * @throws TableNotExistException if the table does not exist.
+     * @throws InvalidAlterTableException if a property that does not support altering is set.
+     * @throws TableAlreadyExistException if datalake gets enabled but the table already exists
+     *     in the lake catalog.
+     */
+    public void alterTable(
+            AlterTableData alterTableData,
+            LakeCatalogDynamicLoader lakeCatalogDynamicLoader,
+            LakeTableTieringManager lakeTableTieringManager) {
+        TablePath tablePath = alterTableData.getTablePath();
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException("Table " + tablePath + " does not exist.");
+        }
+
+        TableRegistration oldTableReg = getTableRegistration(tablePath);
+
+        // validate and apply update properties.
+        TableRegistration newTableReg =
+                validateAndApplyUpdateProperties(
+                        oldTableReg, alterTableData.getUpdatePropertiesData());
+
+        // Compare the effective properties before and after the update. Looking only at the
+        // properties being set would treat a request that does not mention the datalake switch
+        // as "disable datalake".
+        boolean wasDataLakeEnabled =
+                Configuration.fromMap(oldTableReg.properties).get(TABLE_DATALAKE_ENABLED);
+        boolean isDataLakeEnabled =
+                Configuration.fromMap(newTableReg.properties).get(TABLE_DATALAKE_ENABLED);
+        boolean enablingDataLake = !wasDataLakeEnabled && isDataLakeEnabled;
+        boolean disablingDataLake = wasDataLakeEnabled && !isDataLakeEnabled;
+
+        if (enablingDataLake) {
+            // create the table in the lake catalog before persisting the new properties, so a
+            // conflict in the lake catalog rejects the alteration as a whole.
+            try {
+                TableDescriptor newTableDescriptor =
+                        getTable(tablePath)
+                                .toTableDescriptor()
+                                .withProperties(newTableReg.properties);
+                checkNotNull(
+                                lakeCatalogDynamicLoader.getLakeCatalog(),
+                                "Cannot enable datalake for table "
+                                        + tablePath
+                                        + " because no lake catalog is available.")
+                        .createTable(tablePath, newTableDescriptor);
+            } catch (TableAlreadyExistException e) {
+                throw new TableAlreadyExistException(
+                        String.format(
+                                "The table %s already exists in %s catalog, please "
+                                        + "first drop the table in %s catalog or use a new table name.",
+                                tablePath,
+                                lakeCatalogDynamicLoader.getDataLakeFormat(),
+                                lakeCatalogDynamicLoader.getDataLakeFormat()));
+            }
+        }
+
+        uncheck(
+                () -> zookeeperClient.updateTable(tablePath, newTableReg),
+                "Fail to alter table: " + tablePath);
+
+        // Only touch the tiering manager once the new registration is persisted, so a failed
+        // update does not leave tiering out of sync with the stored table properties.
+        if (enablingDataLake) {
+            lakeTableTieringManager.addNewLakeTable(getTable(tablePath));
+        } else if (disablingDataLake) {
+            lakeTableTieringManager.removeLakeTable(oldTableReg.tableId);
+        }
+    }
+
+    /**
+     * Validates the properties to set and returns a copy of {@code oldTableReg} whose properties
+     * are the old properties overridden by the properties to set.
+     */
+    private TableRegistration validateAndApplyUpdateProperties(
+            TableRegistration oldTableReg, UpdateProperties updatePropertiesData) {
+        Map<String, String> mergedProperties = new HashMap<>(oldTableReg.properties);
+        // each requested key is validated right before it overrides the existing value.
+        updatePropertiesData
+                .getSetProperties()
+                .forEach(
+                        (key, value) -> {
+                            validateSetTableProperty(key);
+                            mergedProperties.put(key, value);
+                        });
+        return oldTableReg.copy(mergedProperties);
+    }
+
+    /** Rejects any property key that is not listed in {@code TABLE_OPTIONS_SUPPORT_ALTER}. */
+    private void validateSetTableProperty(String setKey) {
+        if (TABLE_OPTIONS_SUPPORT_ALTER.contains(setKey)) {
+            return;
+        }
+        throw new InvalidAlterTableException(
+                "Update table property: '" + setKey + "' is not supported yet.");
+    }
+
     /**
      * List the partitions of the given table and partitionSpec.
      *
diff --git 
a/fluss-server/src/main/java/com/alibaba/fluss/server/entity/AlterTableData.java
 
b/fluss-server/src/main/java/com/alibaba/fluss/server/entity/AlterTableData.java
new file mode 100644
index 000000000..932891ba0
--- /dev/null
+++ 
b/fluss-server/src/main/java/com/alibaba/fluss/server/entity/AlterTableData.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.server.entity;
+
+import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.metadata.UpdateProperties;
+import com.alibaba.fluss.rpc.messages.AlterTableRequest;
+
+/** The data for request {@link AlterTableRequest}. */
+public class AlterTableData {
+    // Path of the table to alter.
+    private final TablePath tablePath;
+    // Property changes requested for the table.
+    private final UpdateProperties updateProperties;
+    // TODO add more alter table type like add column, change schema etc.
+    public AlterTableData(TablePath tablePath, UpdateProperties 
updateProperties) {
+        this.tablePath = tablePath;
+        this.updateProperties = updateProperties;
+    }
+
+    /** Returns the path of the table to alter. */
+    public TablePath getTablePath() {
+        return tablePath;
+    }
+
+    /** Returns the property changes requested for the table. */
+    public UpdateProperties getUpdatePropertiesData() {
+        return updateProperties;
+    }
+}
diff --git 
a/fluss-server/src/main/java/com/alibaba/fluss/server/utils/ServerRpcMessageUtils.java
 
b/fluss-server/src/main/java/com/alibaba/fluss/server/utils/ServerRpcMessageUtils.java
index 034163493..fc1ce7345 100644
--- 
a/fluss-server/src/main/java/com/alibaba/fluss/server/utils/ServerRpcMessageUtils.java
+++ 
b/fluss-server/src/main/java/com/alibaba/fluss/server/utils/ServerRpcMessageUtils.java
@@ -30,6 +30,7 @@ import com.alibaba.fluss.metadata.TableBucket;
 import com.alibaba.fluss.metadata.TableDescriptor;
 import com.alibaba.fluss.metadata.TableInfo;
 import com.alibaba.fluss.metadata.TablePath;
+import com.alibaba.fluss.metadata.UpdateProperties;
 import com.alibaba.fluss.record.BytesViewLogRecords;
 import com.alibaba.fluss.record.DefaultKvRecordBatch;
 import com.alibaba.fluss.record.DefaultValueRecordBatch;
@@ -49,6 +50,7 @@ import com.alibaba.fluss.rpc.entity.ProduceLogResultForBucket;
 import com.alibaba.fluss.rpc.entity.PutKvResultForBucket;
 import com.alibaba.fluss.rpc.messages.AdjustIsrRequest;
 import com.alibaba.fluss.rpc.messages.AdjustIsrResponse;
+import com.alibaba.fluss.rpc.messages.AlterTableRequest;
 import com.alibaba.fluss.rpc.messages.CommitKvSnapshotRequest;
 import com.alibaba.fluss.rpc.messages.CommitLakeTableSnapshotRequest;
 import com.alibaba.fluss.rpc.messages.CommitRemoteLogManifestRequest;
@@ -116,6 +118,7 @@ import 
com.alibaba.fluss.rpc.messages.PbStopReplicaRespForBucket;
 import com.alibaba.fluss.rpc.messages.PbTableBucket;
 import com.alibaba.fluss.rpc.messages.PbTableMetadata;
 import com.alibaba.fluss.rpc.messages.PbTablePath;
+import com.alibaba.fluss.rpc.messages.PbUpdateProperties;
 import com.alibaba.fluss.rpc.messages.PbValue;
 import com.alibaba.fluss.rpc.messages.PbValueList;
 import com.alibaba.fluss.rpc.messages.PrefixLookupRequest;
@@ -133,6 +136,7 @@ import com.alibaba.fluss.security.acl.AclBinding;
 import com.alibaba.fluss.server.authorizer.AclCreateResult;
 import com.alibaba.fluss.server.authorizer.AclDeleteResult;
 import com.alibaba.fluss.server.entity.AdjustIsrResultForBucket;
+import com.alibaba.fluss.server.entity.AlterTableData;
 import com.alibaba.fluss.server.entity.CommitLakeTableSnapshotData;
 import com.alibaba.fluss.server.entity.CommitRemoteLogManifestData;
 import com.alibaba.fluss.server.entity.FetchReqInfo;
@@ -1626,4 +1630,17 @@ public class ServerRpcMessageUtils {
         result.addAll(errors);
         return result;
     }
+
+    /**
+     * Converts an {@link AlterTableRequest} into {@link AlterTableData}.
+     *
+     * <p>Only the properties to set are read from the request.
+     *
+     * <p>NOTE(review): {@code update_properties} is declared optional in the proto, but it is read
+     * here without checking for its presence - confirm how an absent field should be handled.
+     */
+    public static AlterTableData getAlterTableData(AlterTableRequest request) {
+        PbTablePath pbPath = request.getTablePath();
+        TablePath tablePath = TablePath.of(pbPath.getDatabaseName(), pbPath.getTableName());
+
+        PbUpdateProperties pbProps = request.getUpdateProperties();
+        UpdateProperties.Builder propertiesBuilder = UpdateProperties.builder();
+        pbProps.getSetPropertiesList()
+                .forEach(kv -> propertiesBuilder.setProperty(kv.getKey(), kv.getValue()));
+
+        return new AlterTableData(tablePath, propertiesBuilder.build());
+    }
 }
diff --git 
a/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/TableRegistration.java
 
b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/TableRegistration.java
index 175a56c93..d66a75501 100644
--- 
a/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/TableRegistration.java
+++ 
b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/TableRegistration.java
@@ -132,6 +132,18 @@ public class TableRegistration {
                 currentMillis);
     }
 
+    /**
+     * Creates a new registration that uses {@code newProperties} as its properties and carries
+     * over the table id, comment, partition keys, bucket count, bucket keys, custom properties
+     * and created time of this registration.
+     *
+     * <p>The last constructor argument is set to the current system time (presumably the
+     * modification time - confirm against the constructor).
+     */
+    public TableRegistration copy(Map<String, String> newProperties) {
+        return new TableRegistration(
+                tableId,
+                comment,
+                partitionKeys,
+                new TableDistribution(bucketCount, bucketKeys),
+                newProperties,
+                customProperties,
+                createdTime,
+                System.currentTimeMillis());
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git 
a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TestCoordinatorGateway.java
 
b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TestCoordinatorGateway.java
index 423d89efc..1afc23782 100644
--- 
a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TestCoordinatorGateway.java
+++ 
b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TestCoordinatorGateway.java
@@ -24,6 +24,8 @@ import com.alibaba.fluss.rpc.messages.AdjustIsrRequest;
 import com.alibaba.fluss.rpc.messages.AdjustIsrResponse;
 import com.alibaba.fluss.rpc.messages.AlterConfigsRequest;
 import com.alibaba.fluss.rpc.messages.AlterConfigsResponse;
+import com.alibaba.fluss.rpc.messages.AlterTableRequest;
+import com.alibaba.fluss.rpc.messages.AlterTableResponse;
 import com.alibaba.fluss.rpc.messages.ApiVersionsRequest;
 import com.alibaba.fluss.rpc.messages.ApiVersionsResponse;
 import com.alibaba.fluss.rpc.messages.CommitKvSnapshotRequest;
@@ -320,6 +322,11 @@ public class TestCoordinatorGateway implements 
CoordinatorGateway {
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    public CompletableFuture<AlterTableResponse> alterTable(AlterTableRequest 
request) {
+        // Altering tables is not supported by this test gateway.
+        throw new UnsupportedOperationException();
+    }
+
     @Override
     public CompletableFuture<DescribeConfigsResponse> describeConfigs(
             DescribeConfigsRequest request) {


Reply via email to