This is an automated email from the ASF dual-hosted git repository.

hongshun pushed a commit to branch dlf-2.5
in repository https://gitbox.apache.org/repos/asf/fluss.git

commit f82d10324538b0e217b32f5e8c55b2f28200bd62
Author: Hongshun Wang <[email protected]>
AuthorDate: Tue Aug 19 20:49:12 2025 +0800

    [FIP-12] add dynamic config to enable lakehouse.
---
 .../java/com/alibaba/fluss/client/admin/Admin.java |  12 ++
 .../com/alibaba/fluss/client/admin/FlussAdmin.java |  66 ++++++
 .../fluss/client/write/RecordAccumulator.java      |   1 +
 .../alibaba/fluss/client/write/WriterClient.java   |   1 +
 .../fluss/client/admin/FlussAdminITCase.java       |  61 +++++-
 .../security/acl/FlussAuthorizationITCase.java     |  67 ++++++
 .../fluss/config/dynamic/AlterConfigOp.java        | 129 ++++++++++++
 .../alibaba/fluss/config/dynamic/ConfigEntry.java  |  99 +++++++++
 .../fluss/config/dynamic/ServerReconfigurable.java |  98 +++++++++
 .../alibaba/fluss/exception/ConfigException.java   |  28 +++
 .../fluss/exception/InvalidRequestException.java   |  32 +++
 .../alibaba/fluss/security/acl/OperationType.java  |   5 +-
 .../fluss/security/auth/ServerAuthenticator.java   |   3 +-
 .../authenticator/SaslServerAuthenticator.java     |   1 +
 fluss-filesystems/fluss-fs-oss/pom.xml             |   8 +
 .../alibaba/fluss/flink/source/FlinkSource.java    |   4 +-
 .../alibaba/fluss/rpc/gateway/AdminGateway.java    |   5 +
 .../fluss/rpc/gateway/AdminReadOnlyGateway.java    |   5 +
 .../fluss/rpc/netty/server/NettyServerHandler.java |   4 +-
 .../com/alibaba/fluss/rpc/protocol/ApiKeys.java    |   4 +-
 .../com/alibaba/fluss/rpc/protocol/Errors.java     |   7 +-
 fluss-rpc/src/main/proto/FlussApi.proto            |  34 ++-
 .../fluss/rpc/TestingTabletGatewayService.java     |   8 +
 .../fluss/rpc/netty/client/NettyClientTest.java    |   2 +-
 .../com/alibaba/fluss/server/ConfigHandler.java    |  25 +++
 .../alibaba/fluss/server/DynamicConfigManager.java | 225 ++++++++++++++++++++
 .../alibaba/fluss/server/DynamicServerConfig.java  | 175 ++++++++++++++++
 .../com/alibaba/fluss/server/RpcServiceBase.java   |  38 +++-
 .../java/com/alibaba/fluss/server/ServerBase.java  |   2 +
 .../server/coordinator/CoordinatorServer.java      |  49 +++--
 .../server/coordinator/CoordinatorService.java     |  88 ++++++--
 .../coordinator/LakeCatalogDynamicLoader.java      | 143 +++++++++++++
 .../alibaba/fluss/server/tablet/TabletServer.java  |  17 +-
 .../alibaba/fluss/server/tablet/TabletService.java |  12 +-
 .../alibaba/fluss/server/zk/ZooKeeperClient.java   |  32 +++
 .../ConfigEntityChangeNotificationJsonSerde.java   |  50 +++++
 .../fluss/server/zk/data/ConfigJsonSerde.java      |  71 +++++++
 .../com/alibaba/fluss/server/zk/data/ZkData.java   |  69 +++++++
 .../fluss/server/DynamicConfigChangeTest.java      | 230 +++++++++++++++++++++
 .../server/coordinator/TestCoordinatorGateway.java |  15 ++
 .../server/tablet/TestTabletServerGateway.java     |   8 +
 .../fluss/server/zk/data/ConfigJsonSerdeTest.java  |  51 +++++
 .../src/test/resources/log4j2-test.properties      |   2 +-
 43 files changed, 1922 insertions(+), 64 deletions(-)

diff --git 
a/fluss-client/src/main/java/com/alibaba/fluss/client/admin/Admin.java 
b/fluss-client/src/main/java/com/alibaba/fluss/client/admin/Admin.java
index 89bb7b455..a12fedf5e 100644
--- a/fluss-client/src/main/java/com/alibaba/fluss/client/admin/Admin.java
+++ b/fluss-client/src/main/java/com/alibaba/fluss/client/admin/Admin.java
@@ -23,6 +23,8 @@ import com.alibaba.fluss.client.metadata.KvSnapshots;
 import com.alibaba.fluss.client.metadata.LakeSnapshot;
 import com.alibaba.fluss.cluster.ServerNode;
 import com.alibaba.fluss.config.ConfigOptions;
+import com.alibaba.fluss.config.dynamic.AlterConfigOp;
+import com.alibaba.fluss.config.dynamic.ConfigEntry;
 import com.alibaba.fluss.exception.DatabaseAlreadyExistException;
 import com.alibaba.fluss.exception.DatabaseNotEmptyException;
 import com.alibaba.fluss.exception.DatabaseNotExistException;
@@ -450,4 +452,14 @@ public interface Admin extends AutoCloseable {
      * @return A CompletableFuture indicating completion of the operation.
      */
     DropAclsResult dropAcls(Collection<AclBindingFilter> filters);
+
+    /** Describe the configs of the cluster. */
+    CompletableFuture<Collection<ConfigEntry>> describeConfigs();
+
+    /**
+     * Alter the configs of the cluster.
+     *
+     * @return
+     */
+    CompletableFuture<Void> alterConfigs(Collection<AlterConfigOp> configs);
 }
diff --git 
a/fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java 
b/fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java
index e50b5c704..ce091aece 100644
--- a/fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java
+++ b/fluss-client/src/main/java/com/alibaba/fluss/client/admin/FlussAdmin.java
@@ -24,6 +24,8 @@ import com.alibaba.fluss.client.metadata.MetadataUpdater;
 import com.alibaba.fluss.client.utils.ClientRpcMessageUtils;
 import com.alibaba.fluss.cluster.Cluster;
 import com.alibaba.fluss.cluster.ServerNode;
+import com.alibaba.fluss.config.dynamic.AlterConfigOp;
+import com.alibaba.fluss.config.dynamic.ConfigEntry;
 import com.alibaba.fluss.metadata.DatabaseDescriptor;
 import com.alibaba.fluss.metadata.DatabaseInfo;
 import com.alibaba.fluss.metadata.PartitionInfo;
@@ -40,11 +42,13 @@ import com.alibaba.fluss.rpc.RpcClient;
 import com.alibaba.fluss.rpc.gateway.AdminGateway;
 import com.alibaba.fluss.rpc.gateway.AdminReadOnlyGateway;
 import com.alibaba.fluss.rpc.gateway.TabletServerGateway;
+import com.alibaba.fluss.rpc.messages.AlterConfigsRequest;
 import com.alibaba.fluss.rpc.messages.CreateAclsRequest;
 import com.alibaba.fluss.rpc.messages.CreateDatabaseRequest;
 import com.alibaba.fluss.rpc.messages.CreateTableRequest;
 import com.alibaba.fluss.rpc.messages.DatabaseExistsRequest;
 import com.alibaba.fluss.rpc.messages.DatabaseExistsResponse;
+import com.alibaba.fluss.rpc.messages.DescribeConfigsRequest;
 import com.alibaba.fluss.rpc.messages.DropAclsRequest;
 import com.alibaba.fluss.rpc.messages.DropDatabaseRequest;
 import com.alibaba.fluss.rpc.messages.DropTableRequest;
@@ -61,6 +65,8 @@ import com.alibaba.fluss.rpc.messages.ListOffsetsRequest;
 import com.alibaba.fluss.rpc.messages.ListPartitionInfosRequest;
 import com.alibaba.fluss.rpc.messages.ListTablesRequest;
 import com.alibaba.fluss.rpc.messages.ListTablesResponse;
+import com.alibaba.fluss.rpc.messages.PbAlterConfigsRequestInfo;
+import com.alibaba.fluss.rpc.messages.PbDescribeConfigsResponseInfo;
 import com.alibaba.fluss.rpc.messages.PbListOffsetsRespForBucket;
 import com.alibaba.fluss.rpc.messages.PbPartitionSpec;
 import com.alibaba.fluss.rpc.messages.PbTablePath;
@@ -80,6 +86,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
 
 import static 
com.alibaba.fluss.client.utils.ClientRpcMessageUtils.makeCreatePartitionRequest;
 import static 
com.alibaba.fluss.client.utils.ClientRpcMessageUtils.makeDropPartitionRequest;
@@ -465,6 +472,65 @@ public class FlussAdmin implements Admin {
         return result;
     }
 
+    @Override
+    public CompletableFuture<Collection<ConfigEntry>> describeConfigs() {
+        CompletableFuture<Collection<ConfigEntry>> future = new 
CompletableFuture<>();
+        DescribeConfigsRequest request = new DescribeConfigsRequest();
+        gateway.describeConfigs(request)
+                .whenComplete(
+                        (r, t) -> {
+                            if (t != null) {
+                                future.completeExceptionally(t);
+                            }
+
+                            List<PbDescribeConfigsResponseInfo> responseInfos 
= r.getInfosList();
+                            List<ConfigEntry> configEntries =
+                                    responseInfos.stream()
+                                            .map(
+                                                    responseInfo ->
+                                                            new ConfigEntry(
+                                                                    
responseInfo.getConfigKey(),
+                                                                    
responseInfo.hasConfigValue()
+                                                                            ? 
responseInfo
+                                                                               
     .getConfigValue()
+                                                                            : 
null,
+                                                                    
ConfigEntry.ConfigSource
+                                                                            
.valueOf(
+                                                                               
     responseInfo
+                                                                               
             .getConfigSource())))
+                                            .collect(Collectors.toList());
+                            future.complete(configEntries);
+                        });
+        return future;
+    }
+
+    @Override
+    public CompletableFuture<Void> alterConfigs(Collection<AlterConfigOp> 
configs) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+
+        AlterConfigsRequest request = new AlterConfigsRequest();
+        for (AlterConfigOp alterConfigOp : configs) {
+            PbAlterConfigsRequestInfo requestInfo =
+                    request.addInfo()
+                            .setConfigKey(alterConfigOp.key())
+                            .setOpType(alterConfigOp.opType().id());
+            if (alterConfigOp.value() != null) {
+                requestInfo.setConfigValue(alterConfigOp.value());
+            }
+        }
+        gateway.alterConfigs(request)
+                .whenComplete(
+                        (r, t) -> {
+                            if (t != null) {
+                                future.completeExceptionally(t);
+                            }
+
+                            future.complete(null);
+                        });
+
+        return future;
+    }
+
     @Override
     public void close() {
         // nothing to do yet
diff --git 
a/fluss-client/src/main/java/com/alibaba/fluss/client/write/RecordAccumulator.java
 
b/fluss-client/src/main/java/com/alibaba/fluss/client/write/RecordAccumulator.java
index 3fee121b9..bfbf3820c 100644
--- 
a/fluss-client/src/main/java/com/alibaba/fluss/client/write/RecordAccumulator.java
+++ 
b/fluss-client/src/main/java/com/alibaba/fluss/client/write/RecordAccumulator.java
@@ -924,6 +924,7 @@ public final class RecordAccumulator {
 
         writerBufferPool.close();
         arrowWriterPool.close();
+        // todo: close的时候会检测,
         bufferAllocator.close();
     }
 
diff --git 
a/fluss-client/src/main/java/com/alibaba/fluss/client/write/WriterClient.java 
b/fluss-client/src/main/java/com/alibaba/fluss/client/write/WriterClient.java
index b473beba2..ee69ee49b 100644
--- 
a/fluss-client/src/main/java/com/alibaba/fluss/client/write/WriterClient.java
+++ 
b/fluss-client/src/main/java/com/alibaba/fluss/client/write/WriterClient.java
@@ -288,6 +288,7 @@ public class WriterClient {
         writerMetricGroup.close();
 
         if (sender != null) {
+            // todo
             sender.initiateClose();
         }
 
diff --git 
a/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java
 
b/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java
index 0c29e3b26..6f939b049 100644
--- 
a/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java
+++ 
b/fluss-client/src/test/java/com/alibaba/fluss/client/admin/FlussAdminITCase.java
@@ -27,6 +27,8 @@ import com.alibaba.fluss.cluster.ServerNode;
 import com.alibaba.fluss.config.AutoPartitionTimeUnit;
 import com.alibaba.fluss.config.ConfigOptions;
 import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.config.dynamic.AlterConfigOp;
+import com.alibaba.fluss.config.dynamic.ConfigEntry;
 import com.alibaba.fluss.exception.DatabaseAlreadyExistException;
 import com.alibaba.fluss.exception.DatabaseNotEmptyException;
 import com.alibaba.fluss.exception.DatabaseNotExistException;
@@ -45,7 +47,6 @@ import com.alibaba.fluss.exception.TooManyBucketsException;
 import com.alibaba.fluss.exception.TooManyPartitionsException;
 import com.alibaba.fluss.fs.FsPath;
 import com.alibaba.fluss.fs.FsPathAndFileName;
-import com.alibaba.fluss.metadata.DataLakeFormat;
 import com.alibaba.fluss.metadata.DatabaseDescriptor;
 import com.alibaba.fluss.metadata.DatabaseInfo;
 import com.alibaba.fluss.metadata.KvFormat;
@@ -69,6 +70,7 @@ import java.time.Duration;
 import java.time.LocalDate;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -78,6 +80,8 @@ import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
+import static com.alibaba.fluss.config.ConfigOptions.DATALAKE_FORMAT;
+import static com.alibaba.fluss.metadata.DataLakeFormat.PAIMON;
 import static com.alibaba.fluss.testutils.DataTestUtils.row;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -165,7 +169,7 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
                 .isEqualTo(
                         DEFAULT_TABLE_DESCRIPTOR
                                 .withReplicationFactor(3)
-                                .withDataLakeFormat(DataLakeFormat.PAIMON));
+                                .withDataLakeFormat(PAIMON));
         assertThat(schemaInfo2).isEqualTo(schemaInfo);
         
assertThat(tableInfo.getCreatedTime()).isEqualTo(tableInfo.getModifiedTime());
         
assertThat(tableInfo.getCreatedTime()).isLessThan(timestampAfterCreate);
@@ -190,7 +194,7 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
                 .isEqualTo(
                         DEFAULT_TABLE_DESCRIPTOR
                                 .withReplicationFactor(3)
-                                .withDataLakeFormat(DataLakeFormat.PAIMON));
+                                .withDataLakeFormat(PAIMON));
         assertThat(schemaInfo2).isEqualTo(schemaInfo);
         // assert created time
         assertThat(tableInfo.getCreatedTime())
@@ -390,7 +394,7 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
                     .isEqualTo(
                             DEFAULT_TABLE_DESCRIPTOR
                                     .withReplicationFactor(3)
-                                    
.withDataLakeFormat(DataLakeFormat.PAIMON));
+                                    .withDataLakeFormat(PAIMON));
         }
     }
 
@@ -886,6 +890,55 @@ class FlussAdminITCase extends ClientToServerITCaseBase {
                 .isInstanceOf(TooManyPartitionsException.class);
     }
 
+    @Test
+    void testDynamicConfigs() throws ExecutionException, InterruptedException {
+        assertThat(
+                        FLUSS_CLUSTER_EXTENSION
+                                .getCoordinatorServer()
+                                .getCoordinatorService()
+                                .getDataLakeFormat())
+                .isEqualTo(PAIMON);
+
+        admin.alterConfigs(
+                        Collections.singletonList(
+                                new AlterConfigOp(
+                                        DATALAKE_FORMAT.key(), null, 
AlterConfigOp.OpType.SET)))
+                .get();
+        assertThat(
+                        FLUSS_CLUSTER_EXTENSION
+                                .getCoordinatorServer()
+                                .getCoordinatorService()
+                                .getDataLakeFormat())
+                .isNull();
+        Collection<ConfigEntry> configToResourceConfigs = 
admin.describeConfigs().get();
+        assertThat(configToResourceConfigs)
+                .contains(
+                        new ConfigEntry(
+                                DATALAKE_FORMAT.key(),
+                                null,
+                                
ConfigEntry.ConfigSource.DYNAMIC_SERVER_CONFIG));
+
+        // Delete dynamic configs to use the initial value(from server.yaml)
+        admin.alterConfigs(
+                        Collections.singletonList(
+                                new AlterConfigOp(
+                                        DATALAKE_FORMAT.key(), null, 
AlterConfigOp.OpType.DELETE)))
+                .get();
+        assertThat(
+                        FLUSS_CLUSTER_EXTENSION
+                                .getCoordinatorServer()
+                                .getCoordinatorService()
+                                .getDataLakeFormat())
+                .isEqualTo(PAIMON);
+        configToResourceConfigs = admin.describeConfigs().get();
+        assertThat(configToResourceConfigs)
+                .contains(
+                        new ConfigEntry(
+                                DATALAKE_FORMAT.key(),
+                                "paimon",
+                                
ConfigEntry.ConfigSource.INITIAL_SERVER_CONFIG));
+    }
+
     private void assertNoBucketSnapshot(KvSnapshots snapshots, int 
expectBucketNum) {
         assertThat(snapshots.getBucketIds()).hasSize(expectBucketNum);
         for (int i = 0; i < expectBucketNum; i++) {
diff --git 
a/fluss-client/src/test/java/com/alibaba/fluss/client/security/acl/FlussAuthorizationITCase.java
 
b/fluss-client/src/test/java/com/alibaba/fluss/client/security/acl/FlussAuthorizationITCase.java
index 7ba60d702..08b381565 100644
--- 
a/fluss-client/src/test/java/com/alibaba/fluss/client/security/acl/FlussAuthorizationITCase.java
+++ 
b/fluss-client/src/test/java/com/alibaba/fluss/client/security/acl/FlussAuthorizationITCase.java
@@ -28,6 +28,8 @@ import com.alibaba.fluss.client.utils.ClientRpcMessageUtils;
 import com.alibaba.fluss.config.ConfigOptions;
 import com.alibaba.fluss.config.Configuration;
 import com.alibaba.fluss.config.MemorySize;
+import com.alibaba.fluss.config.dynamic.AlterConfigOp;
+import com.alibaba.fluss.config.dynamic.ConfigEntry;
 import com.alibaba.fluss.exception.AuthorizationException;
 import com.alibaba.fluss.metadata.DataLakeFormat;
 import com.alibaba.fluss.metadata.DatabaseDescriptor;
@@ -69,6 +71,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutionException;
 
+import static com.alibaba.fluss.config.ConfigOptions.DATALAKE_FORMAT;
 import static com.alibaba.fluss.record.TestData.DATA1_SCHEMA;
 import static com.alibaba.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR;
 import static com.alibaba.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR_PK;
@@ -653,6 +656,70 @@ public class FlussAuthorizationITCase {
         }
     }
 
+    @Test
+    void testDynamicConfigs() throws ExecutionException, InterruptedException {
+        assertThatThrownBy(
+                        () ->
+                                guestAdmin
+                                        .alterConfigs(
+                                                Collections.singletonList(
+                                                        new AlterConfigOp(
+                                                                
DATALAKE_FORMAT.key(),
+                                                                null,
+                                                                
AlterConfigOp.OpType.SET)))
+                                        .get())
+                .rootCause()
+                .hasMessageContaining(
+                        String.format(
+                                "Principal %s have no authorization to operate 
ALTER_CONFIGS on resource Resource{type=CLUSTER, name='fluss-cluster'}",
+                                guestPrincipal));
+
+        rootAdmin
+                .createAcls(
+                        Collections.singletonList(
+                                new AclBinding(
+                                        Resource.cluster(),
+                                        new AccessControlEntry(
+                                                guestPrincipal,
+                                                "*",
+                                                OperationType.ALTER_CONFIGS,
+                                                PermissionType.ALLOW))))
+                .all()
+                .get();
+        guestAdmin
+                .alterConfigs(
+                        Collections.singletonList(
+                                new AlterConfigOp(
+                                        DATALAKE_FORMAT.key(), null, 
AlterConfigOp.OpType.SET)))
+                .get();
+
+        assertThatThrownBy(() -> guestAdmin.describeConfigs().get())
+                .rootCause()
+                .hasMessageContaining(
+                        String.format(
+                                "Principal %s have no authorization to operate 
DESCRIBE_CONFIGS on resource Resource{type=CLUSTER, name='fluss-cluster'}",
+                                guestPrincipal));
+        rootAdmin
+                .createAcls(
+                        Collections.singletonList(
+                                new AclBinding(
+                                        Resource.cluster(),
+                                        new AccessControlEntry(
+                                                guestPrincipal,
+                                                "*",
+                                                OperationType.DESCRIBE_CONFIGS,
+                                                PermissionType.ALLOW))))
+                .all()
+                .get();
+        Collection<ConfigEntry> configToResourceConfigs = 
guestAdmin.describeConfigs().get();
+        assertThat(configToResourceConfigs)
+                .contains(
+                        new ConfigEntry(
+                                DATALAKE_FORMAT.key(),
+                                null,
+                                
ConfigEntry.ConfigSource.DYNAMIC_SERVER_CONFIG));
+    }
+
     private static Configuration initConfig() {
         Configuration conf = new Configuration();
         conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3);
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/config/dynamic/AlterConfigOp.java
 
b/fluss-common/src/main/java/com/alibaba/fluss/config/dynamic/AlterConfigOp.java
new file mode 100644
index 000000000..caa7d20d7
--- /dev/null
+++ 
b/fluss-common/src/main/java/com/alibaba/fluss/config/dynamic/AlterConfigOp.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.config.dynamic;
+
+import javax.annotation.Nullable;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/** Configuration change operation. */
+public class AlterConfigOp {
+
+    /** The type of configuration change operation. */
+    public enum OpType {
+        /** Set the value of the configuration entry. */
+        SET((byte) 0),
+        /** Revert the configuration entry to the default value (possibly 
null). */
+        DELETE((byte) 1),
+        /**
+         * (For list-type configuration entries only.) Add the specified 
values to the current value
+         * of the configuration entry. If the configuration value has not been 
set, adds to the
+         * default value.
+         */
+        APPEND((byte) 2),
+        /**
+         * (For list-type configuration entries only.) Removes the specified 
values from the current
+         * value of the configuration entry. It is legal to remove values that 
are not currently in
+         * the configuration entry. Removing all entries from the current 
configuration value leaves
+         * an empty list and does NOT revert to the default value of the entry.
+         */
+        SUBTRACT((byte) 3),
+
+        UNKNOWN((byte) -1);
+
+        private static final Map<Byte, OpType> OP_TYPES =
+                Collections.unmodifiableMap(
+                        Arrays.stream(values())
+                                .collect(Collectors.toMap(OpType::id, 
Function.identity())));
+
+        private final byte id;
+
+        OpType(final byte id) {
+            this.id = id;
+        }
+
+        public byte id() {
+            return id;
+        }
+
+        public static OpType forId(final byte id) {
+            return OP_TYPES.getOrDefault(id, UNKNOWN);
+        }
+    }
+
+    private final String key;
+    @Nullable private final String value;
+    private final OpType opType;
+
+    public AlterConfigOp(String key, @Nullable String value, OpType 
operationType) {
+        this.key = key;
+        this.value = value;
+        this.opType = operationType;
+    }
+
+    public String key() {
+        return key;
+    }
+
+    @Nullable
+    public String value() {
+        return value;
+    }
+
+    public OpType opType() {
+        return opType;
+    }
+
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final AlterConfigOp that = (AlterConfigOp) o;
+        return opType == that.opType
+                && Objects.equals(key, that.key)
+                && Objects.equals(value, that.value);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(opType, key, value);
+    }
+
+    @Override
+    public String toString() {
+        return "AlterConfigOp{"
+                + "name='"
+                + key
+                + '\''
+                + ", value='"
+                + value
+                + '\''
+                + ", opType="
+                + opType
+                + '}';
+    }
+}
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/config/dynamic/ConfigEntry.java 
b/fluss-common/src/main/java/com/alibaba/fluss/config/dynamic/ConfigEntry.java
new file mode 100644
index 000000000..c5af7ec29
--- /dev/null
+++ 
b/fluss-common/src/main/java/com/alibaba/fluss/config/dynamic/ConfigEntry.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.config.dynamic;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+/** Configuration entry. */
+public class ConfigEntry {
+    private final String key;
+    @Nullable private final String value;
+    private final ConfigSource source;
+
+    /**
+     * Create a configuration with the provided values.
+     *
+     * @param key the non-null config name
+     * @param value the config value or null
+     * @param source the source of this config entr
+     */
+    public ConfigEntry(String key, @Nullable String value, ConfigSource 
source) {
+        Objects.requireNonNull(key, "name should not be null");
+        this.key = key;
+        this.value = value;
+        this.source = source;
+    }
+
+    /** Return the config key. */
+    public String key() {
+        return key;
+    }
+
+    /**
+     * Return the value or null. Null is returned if the config is unset or if 
isSensitive is true.
+     */
+    public @Nullable String value() {
+        return value;
+    }
+
+    /** Return the source of this configuration entry. */
+    public ConfigSource source() {
+        return source;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        ConfigEntry that = (ConfigEntry) o;
+
+        return this.key.equals(that.key) && this.value != null
+                ? this.value.equals(that.value)
+                : that.value == null && this.source == that.source;
+    }
+
+    @Override
+    public int hashCode() {
+        final int prime = 31;
+        int result = 1;
+        result = prime * result + key.hashCode();
+        result = prime * result + ((value == null) ? 0 : value.hashCode());
+        result = prime * result + source.hashCode();
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "ConfigEntry(" + "name=" + key + ", value=" + value + ", 
source=" + source + ")";
+    }
+
+    /** Source of configuration entries. */
+    public enum ConfigSource {
+        DYNAMIC_SERVER_CONFIG, // dynamic server config that is configured for 
all servers in the
+        // cluster
+        INITIAL_SERVER_CONFIG, // initial server config provided as server 
properties at start
+        // up(e.g. server.yaml file)
+    }
+}
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/config/dynamic/ServerReconfigurable.java
 
b/fluss-common/src/main/java/com/alibaba/fluss/config/dynamic/ServerReconfigurable.java
new file mode 100644
index 000000000..ea26cd265
--- /dev/null
+++ 
b/fluss-common/src/main/java/com/alibaba/fluss/config/dynamic/ServerReconfigurable.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.config.dynamic;
+
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.exception.ConfigException;
+
+import java.util.Set;
+
+/** Server Reconfigurable Interface which can dynamically respond to 
configuration changes. */
+public interface ServerReconfigurable {
+
+    /**
+     * Validates the provided configuration. The provided map contains all 
configs including any
+     * reconfigurable configs that may be different from the initial 
configuration. Reconfiguration
+     * will be not performed if this method throws any exception.
+     *
+     * <p>This method should check that the new configuration values are valid 
and will not cause
+     * issues when applied. It should throw a ConfigException with a 
descriptive message if any
+     * validation fails.
+     *
+     * <p>This validation step is crucial as it prevents applying invalid 
configurations that could
+     * potentially cause the component to malfunction or behave unexpectedly.
+     *
+     * @param newConfig the new configuration that would be applied if 
validation succeeds. This
+     *     contains all configuration properties, not just the ones that 
changed or are
+     *     reconfigurable.
+     * @throws ConfigException if the configuration is invalid or cannot be 
applied to this
+     *     component
+     */
+    void validate(Configuration newConfig) throws ConfigException;
+
+    /**
+     * Reconfigures the component with the provided configuration.
+     *
+     * <p>This method is called after validation succeeds and should apply the 
new configuration to
+     * the component. The implementation should update internal state and make 
necessary adjustments
+     * to adapt to the new configuration values.
+     *
+     * <p>This method should be designed to be thread-safe as it may be called 
concurrently with
+     * other operations on the component. It should also be prepared to handle 
being called multiple
+     * times with different configurations.
+     *
+     * @param newConfig the validated configuration to apply to this component
+     * @throws ConfigException if there is an error applying the 
configuration, though this should
+     *     generally be avoided as the validation step should catch most issues
+     */
+    void reconfigure(Configuration newConfig) throws ConfigException;
+
+    /** Allowed configuration keys. */
+    class AllowedConfigs {
+        /** Exact configuration keys that can be reconfigured. */
+        private final Set<String> exactConfigKeys;
+
+        /**
+         * Configuration key prefixes. Keys starting with any of these 
prefixes can be reconfigured.
+         */
+        private final Set<String> configKeyPrefixes;
+
+        private final Set<String> encryptedKeys;
+
+        public AllowedConfigs(
+                Set<String> exactConfigKeys,
+                Set<String> configKeyPrefixes,
+                Set<String> encryptedKeys) {
+            this.exactConfigKeys = exactConfigKeys;
+            this.configKeyPrefixes = configKeyPrefixes;
+            this.encryptedKeys = encryptedKeys;
+        }
+
+        public Set<String> exactConfigKeys() {
+            return exactConfigKeys;
+        }
+
+        public Set<String> configKeyPrefixes() {
+            return configKeyPrefixes;
+        }
+
+        public Set<String> encryptedKeys() {
+            return encryptedKeys;
+        }
+    }
+}
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/exception/ConfigException.java 
b/fluss-common/src/main/java/com/alibaba/fluss/exception/ConfigException.java
new file mode 100644
index 000000000..fba77480c
--- /dev/null
+++ 
b/fluss-common/src/main/java/com/alibaba/fluss/exception/ConfigException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.exception;
+
+/** An exception that indicates a configuration error. */
+public class ConfigException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public ConfigException(String message) {
+        super(message);
+    }
+}
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/exception/InvalidRequestException.java
 
b/fluss-common/src/main/java/com/alibaba/fluss/exception/InvalidRequestException.java
new file mode 100644
index 000000000..742808120
--- /dev/null
+++ 
b/fluss-common/src/main/java/com/alibaba/fluss/exception/InvalidRequestException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.exception;
+
+/** Exception thrown when a request is invalid. */
+public class InvalidRequestException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public InvalidRequestException(String message) {
+        super(message);
+    }
+
+    public InvalidRequestException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/security/acl/OperationType.java 
b/fluss-common/src/main/java/com/alibaba/fluss/security/acl/OperationType.java
index 409372302..2b17c8411 100644
--- 
a/fluss-common/src/main/java/com/alibaba/fluss/security/acl/OperationType.java
+++ 
b/fluss-common/src/main/java/com/alibaba/fluss/security/acl/OperationType.java
@@ -41,7 +41,10 @@ public enum OperationType {
     CREATE((byte) 5),
     DROP((byte) 6),
     ALTER((byte) 7),
-    DESCRIBE((byte) 8);
+    DESCRIBE((byte) 8),
+    DESCRIBE_CONFIGS((byte) 9),
+    ALTER_CONFIGS((byte) 10),
+    UNKNOWN((byte) -1);
 
     private final byte code;
 
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/security/auth/ServerAuthenticator.java
 
b/fluss-common/src/main/java/com/alibaba/fluss/security/auth/ServerAuthenticator.java
index 13f739bcb..d22029c75 100644
--- 
a/fluss-common/src/main/java/com/alibaba/fluss/security/auth/ServerAuthenticator.java
+++ 
b/fluss-common/src/main/java/com/alibaba/fluss/security/auth/ServerAuthenticator.java
@@ -21,7 +21,6 @@ import com.alibaba.fluss.annotation.PublicEvolving;
 import com.alibaba.fluss.exception.AuthenticationException;
 import com.alibaba.fluss.security.acl.FlussPrincipal;
 
-import java.io.Closeable;
 import java.io.IOException;
 
 /**
@@ -30,7 +29,7 @@ import java.io.IOException;
  * @since 0.7
  */
 @PublicEvolving
-public interface ServerAuthenticator extends Closeable {
+public interface ServerAuthenticator {
 
     String protocol();
 
diff --git 
a/fluss-common/src/main/java/com/alibaba/fluss/security/auth/sasl/authenticator/SaslServerAuthenticator.java
 
b/fluss-common/src/main/java/com/alibaba/fluss/security/auth/sasl/authenticator/SaslServerAuthenticator.java
index 1d5cf595b..01610c662 100644
--- 
a/fluss-common/src/main/java/com/alibaba/fluss/security/auth/sasl/authenticator/SaslServerAuthenticator.java
+++ 
b/fluss-common/src/main/java/com/alibaba/fluss/security/auth/sasl/authenticator/SaslServerAuthenticator.java
@@ -49,6 +49,7 @@ public class SaslServerAuthenticator implements 
ServerAuthenticator {
     private final Map<String, String> configs;
 
     public SaslServerAuthenticator(Configuration configuration) {
+        // todo: 更新之后会更新这些值 + loginManager.closeAll()
         this.configs = configuration.toMap();
         List<String> enabledMechanisms = 
configuration.get(SERVER_SASL_ENABLED_MECHANISMS_CONFIG);
         if (enabledMechanisms == null || enabledMechanisms.isEmpty()) {
diff --git a/fluss-filesystems/fluss-fs-oss/pom.xml 
b/fluss-filesystems/fluss-fs-oss/pom.xml
index 4c52000e9..3d10927ef 100644
--- a/fluss-filesystems/fluss-fs-oss/pom.xml
+++ b/fluss-filesystems/fluss-fs-oss/pom.xml
@@ -119,6 +119,14 @@
             <scope>test</scope>
             <type>test-jar</type>
         </dependency>
+
+        <dependency>
+            <groupId>com.alibaba.fluss</groupId>
+            <artifactId>fluss-server</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 
     <build>
diff --git 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkSource.java
 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkSource.java
index d806164e0..c4536203b 100644
--- 
a/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkSource.java
+++ 
b/fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkSource.java
@@ -51,6 +51,7 @@ import javax.annotation.Nullable;
 import java.util.List;
 
 import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
+import static 
org.apache.flink.connector.base.source.reader.SourceReaderOptions.ELEMENT_QUEUE_CAPACITY;
 
 /** Flink source for Fluss. */
 public class FlinkSource<OUT>
@@ -147,7 +148,8 @@ public class FlinkSource<OUT>
     public SourceReader<OUT, SourceSplitBase> createReader(SourceReaderContext 
context)
             throws Exception {
         FutureCompletingBlockingQueue<RecordsWithSplitIds<RecordAndPos>> 
elementsQueue =
-                new FutureCompletingBlockingQueue<>();
+                new FutureCompletingBlockingQueue<>(
+                        
context.getConfiguration().get(ELEMENT_QUEUE_CAPACITY));
         FlinkSourceReaderMetrics flinkSourceReaderMetrics =
                 new FlinkSourceReaderMetrics(context.metricGroup());
 
diff --git 
a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/AdminGateway.java 
b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/AdminGateway.java
index 048e5c253..7012441d1 100644
--- a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/AdminGateway.java
+++ b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/AdminGateway.java
@@ -17,6 +17,8 @@
 
 package com.alibaba.fluss.rpc.gateway;
 
+import com.alibaba.fluss.rpc.messages.AlterConfigsRequest;
+import com.alibaba.fluss.rpc.messages.AlterConfigsResponse;
 import com.alibaba.fluss.rpc.messages.CreateAclsRequest;
 import com.alibaba.fluss.rpc.messages.CreateAclsResponse;
 import com.alibaba.fluss.rpc.messages.CreateDatabaseRequest;
@@ -104,6 +106,9 @@ public interface AdminGateway extends AdminReadOnlyGateway {
     @RPC(api = ApiKeys.DROP_ACLS)
     CompletableFuture<DropAclsResponse> dropAcls(DropAclsRequest request);
 
+    @RPC(api = ApiKeys.ALTER_CONFIGS)
+    CompletableFuture<AlterConfigsResponse> alterConfigs(AlterConfigsRequest 
request);
+
     // todo: rename table & alter table
 
 }
diff --git 
a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/AdminReadOnlyGateway.java
 
b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/AdminReadOnlyGateway.java
index 47b5dc4c8..d33305dd0 100644
--- 
a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/AdminReadOnlyGateway.java
+++ 
b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/gateway/AdminReadOnlyGateway.java
@@ -20,6 +20,8 @@ package com.alibaba.fluss.rpc.gateway;
 import com.alibaba.fluss.rpc.RpcGateway;
 import com.alibaba.fluss.rpc.messages.DatabaseExistsRequest;
 import com.alibaba.fluss.rpc.messages.DatabaseExistsResponse;
+import com.alibaba.fluss.rpc.messages.DescribeConfigsRequest;
+import com.alibaba.fluss.rpc.messages.DescribeConfigsResponse;
 import com.alibaba.fluss.rpc.messages.GetDatabaseInfoRequest;
 import com.alibaba.fluss.rpc.messages.GetDatabaseInfoResponse;
 import com.alibaba.fluss.rpc.messages.GetFileSystemSecurityTokenRequest;
@@ -187,4 +189,7 @@ public interface AdminReadOnlyGateway extends RpcGateway {
      */
     @RPC(api = ApiKeys.LIST_ACLS)
     CompletableFuture<ListAclsResponse> listAcls(ListAclsRequest request);
+
+    @RPC(api = ApiKeys.DESCRIBE_CONFIGS)
+    CompletableFuture<DescribeConfigsResponse> 
describeConfigs(DescribeConfigsRequest request);
 }
diff --git 
a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/server/NettyServerHandler.java
 
b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/server/NettyServerHandler.java
index c637fbd4f..0f9dc1dc0 100644
--- 
a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/server/NettyServerHandler.java
+++ 
b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/netty/server/NettyServerHandler.java
@@ -170,12 +170,12 @@ public final class NettyServerHandler extends 
ChannelInboundHandlerAdapter {
                 authenticator.isCompleted()
                         ? ConnectionState.READY
                         : ConnectionState.AUTHENTICATING);
-
         // TODO: connection metrics (count, client tags, receive request avg 
idle time, etc.)
     }
 
     @Override
     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        // todo: remove later
         super.channelInactive(ctx);
     }
 
@@ -209,7 +209,7 @@ public final class NettyServerHandler extends 
ChannelInboundHandlerAdapter {
 
     private void close() {
         switchState(ConnectionState.CLOSE);
-        IOUtils.closeQuietly(authenticator);
+        IOUtils.closeQuietly(authenticator::close);
         ctx.close();
     }
 
diff --git 
a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/ApiKeys.java 
b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/ApiKeys.java
index ec73d3da6..1f43442c0 100644
--- a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/ApiKeys.java
+++ b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/ApiKeys.java
@@ -70,7 +70,9 @@ public enum ApiKeys {
     CREATE_ACLS(1039, 0, 0, PUBLIC),
     LIST_ACLS(1040, 0, 0, PUBLIC),
     DROP_ACLS(1041, 0, 0, PUBLIC),
-    LAKE_TIERING_HEARTBEAT(1042, 0, 0, PRIVATE);
+    LAKE_TIERING_HEARTBEAT(1042, 0, 0, PRIVATE),
+    DESCRIBE_CONFIGS(1043, 0, 0, PUBLIC),
+    ALTER_CONFIGS(1044, 0, 0, PUBLIC);
 
     private static final Map<Integer, ApiKeys> ID_TO_TYPE =
             Arrays.stream(ApiKeys.values())
diff --git a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java 
b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java
index 93213f73a..63a46a693 100644
--- a/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java
+++ b/fluss-rpc/src/main/java/com/alibaba/fluss/rpc/protocol/Errors.java
@@ -34,6 +34,7 @@ import 
com.alibaba.fluss.exception.InvalidCoordinatorException;
 import com.alibaba.fluss.exception.InvalidDatabaseException;
 import com.alibaba.fluss.exception.InvalidPartitionException;
 import com.alibaba.fluss.exception.InvalidReplicationFactorException;
+import com.alibaba.fluss.exception.InvalidRequestException;
 import com.alibaba.fluss.exception.InvalidRequiredAcksException;
 import com.alibaba.fluss.exception.InvalidServerRackInfoException;
 import com.alibaba.fluss.exception.InvalidTableException;
@@ -214,7 +215,11 @@ public enum Errors {
     INVALID_SERVER_RACK_INFO_EXCEPTION(
             52, "The server rack info is invalid.", 
InvalidServerRackInfoException::new),
     LAKE_SNAPSHOT_NOT_EXIST(
-            53, "The lake snapshot is not exist.", 
LakeTableSnapshotNotExistException::new);
+            53, "The lake snapshot is not exist.", 
LakeTableSnapshotNotExistException::new),
+    INVALID_REQUEST(
+            54,
+            "This most likely occurs because of a request being malformed by 
the client library or the message was sent to an incompatible server. See the 
server logs for more details.",
+            InvalidRequestException::new);
 
     private static final Logger LOG = LoggerFactory.getLogger(Errors.class);
 
diff --git a/fluss-rpc/src/main/proto/FlussApi.proto 
b/fluss-rpc/src/main/proto/FlussApi.proto
index 246dbe89e..1eef39f10 100644
--- a/fluss-rpc/src/main/proto/FlussApi.proto
+++ b/fluss-rpc/src/main/proto/FlussApi.proto
@@ -529,6 +529,38 @@ message LakeTieringHeartbeatResponse {
   repeated PbHeartbeatRespForTable failed_table_resp = 5;
 }
 
+message DescribeConfigsRequest{
+}
+
+message DescribeConfigsResponse{
+  repeated PbDescribeConfigsResponseInfo infos = 1;
+}
+
+
+message PbDescribeConfigsResponseInfo{
+  required string config_key = 1;
+  optional string config_value = 2;
+  required string config_source = 3;
+}
+
+
+
+message AlterConfigsRequest{
+ repeated PbAlterConfigsRequestInfo infos = 1;
+}
+
+
+message PbAlterConfigsRequestInfo{
+  required string config_key = 1;
+  optional string config_value = 2;
+  required int32 op_type = 3;
+}
+
+
+message AlterConfigsResponse{
+}
+
+
 
 // --------------- Inner classes ----------------
 message PbApiVersion {
@@ -864,4 +896,4 @@ message PbHeartbeatReqForTable {
 message PbHeartbeatRespForTable {
   required int64 table_id = 1;
   optional ErrorResponse error = 2;
-}
\ No newline at end of file
+}
diff --git 
a/fluss-rpc/src/test/java/com/alibaba/fluss/rpc/TestingTabletGatewayService.java
 
b/fluss-rpc/src/test/java/com/alibaba/fluss/rpc/TestingTabletGatewayService.java
index ffb36d8ef..2842ff05e 100644
--- 
a/fluss-rpc/src/test/java/com/alibaba/fluss/rpc/TestingTabletGatewayService.java
+++ 
b/fluss-rpc/src/test/java/com/alibaba/fluss/rpc/TestingTabletGatewayService.java
@@ -21,6 +21,8 @@ import com.alibaba.fluss.cluster.ServerType;
 import com.alibaba.fluss.rpc.gateway.TabletServerGateway;
 import com.alibaba.fluss.rpc.messages.DatabaseExistsRequest;
 import com.alibaba.fluss.rpc.messages.DatabaseExistsResponse;
+import com.alibaba.fluss.rpc.messages.DescribeConfigsRequest;
+import com.alibaba.fluss.rpc.messages.DescribeConfigsResponse;
 import com.alibaba.fluss.rpc.messages.FetchLogRequest;
 import com.alibaba.fluss.rpc.messages.FetchLogResponse;
 import com.alibaba.fluss.rpc.messages.GetDatabaseInfoRequest;
@@ -241,4 +243,10 @@ public class TestingTabletGatewayService extends 
TestingGatewayService
     public CompletableFuture<ListAclsResponse> listAcls(ListAclsRequest 
request) {
         return null;
     }
+
+    @Override
+    public CompletableFuture<DescribeConfigsResponse> describeConfigs(
+            DescribeConfigsRequest request) {
+        return null;
+    }
 }
diff --git 
a/fluss-rpc/src/test/java/com/alibaba/fluss/rpc/netty/client/NettyClientTest.java
 
b/fluss-rpc/src/test/java/com/alibaba/fluss/rpc/netty/client/NettyClientTest.java
index 2e1b168f6..9cb3fe156 100644
--- 
a/fluss-rpc/src/test/java/com/alibaba/fluss/rpc/netty/client/NettyClientTest.java
+++ 
b/fluss-rpc/src/test/java/com/alibaba/fluss/rpc/netty/client/NettyClientTest.java
@@ -200,7 +200,7 @@ final class NettyClientTest {
                                 service,
                                 metricGroup,
                                 
RequestsMetrics.createCoordinatorServerRequestMetrics(
-                                        metricGroup)); ) {
+                                        metricGroup))) {
             multipleEndpointsServer.start();
             ApiVersionsRequest request =
                     new ApiVersionsRequest()
diff --git 
a/fluss-server/src/main/java/com/alibaba/fluss/server/ConfigHandler.java 
b/fluss-server/src/main/java/com/alibaba/fluss/server/ConfigHandler.java
new file mode 100644
index 000000000..460abb7d8
--- /dev/null
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/ConfigHandler.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.server;
+
+import java.util.Map;
+
+/** ConfigHandler to handle config changes. */
+public interface ConfigHandler {
+    void processConfigChanges(String configName, Map<String, String> 
properties);
+}
diff --git 
a/fluss-server/src/main/java/com/alibaba/fluss/server/DynamicConfigManager.java 
b/fluss-server/src/main/java/com/alibaba/fluss/server/DynamicConfigManager.java
new file mode 100644
index 000000000..39d9f4478
--- /dev/null
+++ 
b/fluss-server/src/main/java/com/alibaba/fluss/server/DynamicConfigManager.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.server;
+
+import com.alibaba.fluss.annotation.VisibleForTesting;
+import com.alibaba.fluss.config.dynamic.AlterConfigOp;
+import com.alibaba.fluss.config.dynamic.ConfigEntry;
+import com.alibaba.fluss.exception.ConfigException;
+import com.alibaba.fluss.server.authorizer.ZkNodeChangeNotificationWatcher;
+import com.alibaba.fluss.server.zk.ZooKeeperClient;
+import com.alibaba.fluss.server.zk.data.ZkData;
+import com.alibaba.fluss.utils.clock.SystemClock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/** Manager for dynamic configurations. */
+public class DynamicConfigManager {
+    private static final Logger LOG = 
LoggerFactory.getLogger(DynamicConfigManager.class);
+    private static final long CHANGE_NOTIFICATION_EXPIRATION_MS = 15 * 60 * 
1000L;
+
+    private final DynamicServerConfig dynamicServerConfig;
+    private final ZooKeeperClient zooKeeperClient;
+    private final ZkNodeChangeNotificationWatcher configChangeListener;
+    private final boolean isCoordinator;
+
+    public DynamicConfigManager(
+            ZooKeeperClient zooKeeperClient,
+            DynamicServerConfig dynamicServerConfig,
+            boolean isCoordinator) {
+        this.dynamicServerConfig = dynamicServerConfig;
+        this.zooKeeperClient = zooKeeperClient;
+        this.isCoordinator = isCoordinator;
+        this.configChangeListener =
+                new ZkNodeChangeNotificationWatcher(
+                        zooKeeperClient,
+                        ZkData.ConfigEntityChangeNotificationZNode.path(),
+                        
ZkData.ConfigEntityChangeNotificationSequenceZNode.prefix(),
+                        CHANGE_NOTIFICATION_EXPIRATION_MS,
+                        new ConfigChangedNotificationHandler(),
+                        SystemClock.getInstance());
+    }
+
+    public void startup() throws Exception {
+        try {
+            configChangeListener.start();
+            Map<String, String> entityConfigs = 
zooKeeperClient.fetchEntityConfig();
+            dynamicServerConfig.updateDynamicConfig(entityConfigs, true);
+        } catch (Exception e) {
+            LOG.error("Failed to update dynamic configs from zookeeper", e);
+        }
+    }
+
+    public void close() {
+        configChangeListener.stop();
+    }
+
+    public List<ConfigEntry> describeConfigs() {
+        Map<String, String> dynamicDefaultConfigs = 
dynamicServerConfig.getDynamicConfigs();
+        Map<String, String> staticServerConfigs = 
dynamicServerConfig.getInitialServerConfigs();
+
+        List<ConfigEntry> configEntries = new ArrayList<>();
+        staticServerConfigs.forEach(
+                (key, value) -> {
+                    ConfigEntry configEntry =
+                            new ConfigEntry(
+                                    key, value, 
ConfigEntry.ConfigSource.INITIAL_SERVER_CONFIG);
+                    configEntries.add(configEntry);
+                });
+        dynamicDefaultConfigs.forEach(
+                (key, value) -> {
+                    ConfigEntry configEntry =
+                            new ConfigEntry(
+                                    key, value, 
ConfigEntry.ConfigSource.DYNAMIC_SERVER_CONFIG);
+                    configEntries.add(configEntry);
+                });
+
+        return configEntries;
+    }
+
+    public void alterConfigs(List<AlterConfigOp> serverConfigChanges) throws 
Exception {
+        Map<String, String> persistentProps = 
zooKeeperClient.fetchEntityConfig();
+        prepareIncrementalConfigs(serverConfigChanges, persistentProps);
+        alterServerConfigs(persistentProps);
+    }
+
+    private void prepareIncrementalConfigs(
+            List<AlterConfigOp> alterConfigOps, Map<String, String> 
configsProps) {
+        alterConfigOps.forEach(
+                alterConfigOp -> {
+                    String configPropName = alterConfigOp.key();
+                    if (!dynamicServerConfig.isAllowedConfig(configPropName)) {
+                        throw new ConfigException(
+                                String.format(
+                                        "The config key %s is not allowed to 
be changed dynamically.",
+                                        configPropName));
+                    }
+
+                    String configPropValue = alterConfigOp.value();
+                    switch (alterConfigOp.opType()) {
+                        case SET:
+                            configsProps.put(configPropName, configPropValue);
+                            break;
+                        case DELETE:
+                            configsProps.remove(configPropName);
+                            break;
+                            //                        case APPEND:
+                            //                            {
+                            //                                //               
     if
+                            // (!configKeys.containsKey(configPropName) ||
+                            //                                //
+                            // !configKeys.get(configPropName).isList()) {
+                            //                                //               
         throw new
+                            // InvalidRequestException("Config
+                            //                                // value append 
is not allowed for
+                            // config key " + configPropName);
+                            //                                //               
     }
+                            //                                List<String> 
oldValueList =
+                            //
+                            // getOldListValue(configPropName, configsProps, 
configKeys);
+                            //                                List<String> 
appendValueList =
+                            //
+                            // Arrays.asList(configPropValue.split(","));
+                            //                                
ArrayList<String> newValueList = new
+                            // ArrayList<>(oldValueList);
+                            //                                
newValueList.addAll(appendValueList);
+                            //                                
configsProps.put(configPropName,
+                            // String.join(",", newValueList));
+                            //                                break;
+                            //                            }
+                            //                        case SUBTRACT:
+                            //                            {
+                            //                                //               
     if
+                            // (!configKeys.containsKey(configPropName) ||
+                            //                                //
+                            // !configKeys.get(configPropName).isList()) {
+                            //                                //               
         throw new
+                            // InvalidRequestException("Config
+                            //                                // value 
subtract is not allowed for
+                            // config key " + configPropName);
+                            //                                //               
     }
+                            //                                List<String> 
oldValueList =
+                            //
+                            // getOldListValue(configPropName, configsProps, 
configKeys);
+                            //                                List<String> 
substractValueList =
+                            //
+                            // Arrays.asList(configPropValue.split(","));
+                            //                                
ArrayList<String> newValueList = new
+                            // ArrayList<>(oldValueList);
+                            //
+                            // newValueList.removeAll(substractValueList);
+                            //                                
configsProps.put(configPropName,
+                            // String.join(",", newValueList));
+                            //                                break;
+                            //                            }
+                        default:
+                            throw new ConfigException(
+                                    "Unknown config operation type " + 
alterConfigOp.opType());
+                    }
+                });
+    }
+
+    //    private List<String> getOldListValue(
+    //            String configPropName,
+    //            Map<String, String> configsProps,
+    //            Map<String, ConfigOption<?>> configKeys) {
+    //        List<String> oldValueList;
+    //        if (configsProps.containsKey(configPropName)) {
+    //            oldValueList = 
Arrays.asList(configsProps.get(configPropName).split(","));
+    //        } else if (configKeys.get(configPropName).hasDefaultValue()) {
+    //            List<?> list = (List<?>) 
configKeys.get(configPropName).defaultValue();
+    //            oldValueList = 
list.stream().map(String::valueOf).collect(Collectors.toList());
+    //        } else {
+    //            oldValueList = Collections.emptyList();
+    //        }
+    //        return oldValueList;
+    //    }
+
+    @VisibleForTesting
+    protected void alterServerConfigs(Map<String, String> configsProps) throws 
Exception {
+        dynamicServerConfig.updateDynamicConfig(configsProps, false);
+
+        // only after verify can add and apply.
+        zooKeeperClient.upsertServerEntityConfig(configsProps);
+    }
+
+    private class ConfigChangedNotificationHandler
+            implements ZkNodeChangeNotificationWatcher.NotificationHandler {
+
+        @Override
+        public void processNotification(byte[] notification) throws Exception {
+            if (isCoordinator) {
+                return;
+            }
+
+            if (notification.length != 0) {
+                throw new ConfigException(
+                        "Config change notification of this version is only 
empty");
+            }
+
+            Map<String, String> entityConfig = 
zooKeeperClient.fetchEntityConfig();
+            // todo: log日志,对一些敏感的key加密
+            dynamicServerConfig.updateDynamicConfig(entityConfig, true);
+        }
+    }
+}
diff --git 
a/fluss-server/src/main/java/com/alibaba/fluss/server/DynamicServerConfig.java 
b/fluss-server/src/main/java/com/alibaba/fluss/server/DynamicServerConfig.java
new file mode 100644
index 000000000..475d5e727
--- /dev/null
+++ 
b/fluss-server/src/main/java/com/alibaba/fluss/server/DynamicServerConfig.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.server;
+
+import com.alibaba.fluss.annotation.Internal;
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.config.dynamic.ServerReconfigurable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static com.alibaba.fluss.config.ConfigOptions.DATALAKE_FORMAT;
+import static com.alibaba.fluss.utils.concurrent.LockUtils.inReadLock;
+import static com.alibaba.fluss.utils.concurrent.LockUtils.inWriteLock;
+
+/**
+ * DynamicServerConfig, 使用时如下: 如果一个ServerReconfigurable实现类想要监听配置变化,可以通过方法注册. 
后续{@link
+ * DynamicConfigManager} 监听变更后,会更新配置项,并且推送给这些ServerReconfigurable. TODO: 
添加test:类似ConfigurationTest.
+ */
+@Internal
+public class DynamicServerConfig {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(DynamicServerConfig.class);
+    private final Set<ServerReconfigurable> serverReconfigurableSet = 
ConcurrentHashMap.newKeySet();
+
+    /** The initial configuration items when the server starts from 
server.yaml. */
+    private final Map<String, String> initialConfigMap;
+
+    /** The dynamic configuration items that are added during running(stored 
in zk). */
+    private final Map<String, String> dynamicConfigs = new HashMap<>();
+
+    /**
+     * The current configuration, which is a combination of initial 
configuration and dynamic
+     * configuration.
+     */
+    private volatile Configuration currentConfig;
+
+    private final Map<String, String> currentConfigMap;
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+    private static final Set<String> ALLOWED_CONFIG_KEYS =
+            Collections.singleton(DATALAKE_FORMAT.key());
+    private static final Set<String> ALLOWED_CONFIG_PREFIXES = 
Collections.singleton("datalake.");
+
+    public DynamicServerConfig(Configuration flussConfig) {
+        this.currentConfig = flussConfig;
+        this.initialConfigMap = flussConfig.toMap();
+        this.currentConfigMap = flussConfig.toMap();
+    }
+
+    /** Register a ServerReconfigurable which listens to configuration 
changes. */
+    public void register(ServerReconfigurable serverReconfigurable) {
+        serverReconfigurableSet.add(serverReconfigurable);
+    }
+
+    /** Update the dynamic configuration and apply to registered 
ServerReconfigurables. */
+    public void updateDynamicConfig(Map<String, String> newDynamicConfigs, 
boolean skipErrorConfig)
+            throws Exception {
+        inWriteLock(lock, () -> updateCurrentConfig(newDynamicConfigs, 
skipErrorConfig));
+    }
+
+    public Configuration getCurrentConfig() {
+        return inReadLock(lock, () -> currentConfig);
+    }
+
+    public Map<String, String> getDynamicConfigs() {
+        return dynamicConfigs;
+    }
+
+    public Map<String, String> getInitialServerConfigs() {
+        return initialConfigMap;
+    }
+
+    public boolean isAllowedConfig(String key) {
+        if (ALLOWED_CONFIG_KEYS.contains(key)) {
+            return true;
+        }
+
+        for (String prefix : ALLOWED_CONFIG_PREFIXES) {
+            if (key.startsWith(prefix)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private void updateCurrentConfig(Map<String, String> newDynamicConfigs, 
boolean skipErrorConfig)
+            throws Exception {
+        Map<String, String> newProps = new HashMap<>(initialConfigMap);
+        overrideProps(newProps, newDynamicConfigs);
+        Configuration newConfig = Configuration.fromMap(newProps);
+        Configuration oldConfig = currentConfig;
+        Set<ServerReconfigurable> appliedServerReconfigurableSet = new 
HashSet<>();
+        if (!newProps.equals(currentConfigMap)) {
+            serverReconfigurableSet.forEach(
+                    serverReconfigurable -> {
+                        try {
+                            serverReconfigurable.validate(newConfig);
+                        } catch (Exception e) {
+                            LOG.error(
+                                    "Validate new dynamic config error and 
will roll back all the applied config.",
+                                    e);
+                            if (!skipErrorConfig) {
+                                throw e;
+                            }
+                        }
+                    });
+
+            Exception throwable = null;
+            for (ServerReconfigurable serverReconfigurable : 
serverReconfigurableSet) {
+                try {
+                    serverReconfigurable.reconfigure(newConfig);
+                    appliedServerReconfigurableSet.add(serverReconfigurable);
+                } catch (Exception e) {
+                    LOG.error(
+                            "Apply new dynamic error and will roll back all 
the applied config.",
+                            e);
+                    if (!skipErrorConfig) {
+                        throwable = e;
+                        break;
+                    }
+                }
+            }
+
+            // rollback to old config if there is an error.
+            if (throwable != null) {
+                appliedServerReconfigurableSet.forEach(
+                        serverReconfigurable -> 
serverReconfigurable.reconfigure(oldConfig));
+                throw throwable;
+            }
+
+            currentConfig = newConfig;
+            currentConfigMap.clear();
+            dynamicConfigs.clear();
+            currentConfigMap.putAll(newProps);
+            dynamicConfigs.putAll(newDynamicConfigs);
+            LOG.info("Dynamic configs changed: {}", newDynamicConfigs);
+        }
+    }
+
+    private void overrideProps(Map<String, String> props, Map<String, String> 
propsOverride) {
+        propsOverride.forEach(
+                (key, value) -> {
+                    if (value == null) {
+                        props.remove(key);
+                    } else {
+                        props.put(key, value);
+                    }
+                });
+    }
+}
diff --git 
a/fluss-server/src/main/java/com/alibaba/fluss/server/RpcServiceBase.java 
b/fluss-server/src/main/java/com/alibaba/fluss/server/RpcServiceBase.java
index 4dd37bda0..be33ec519 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/RpcServiceBase.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/RpcServiceBase.java
@@ -18,6 +18,7 @@
 package com.alibaba.fluss.server;
 
 import com.alibaba.fluss.cluster.ServerType;
+import com.alibaba.fluss.config.dynamic.ConfigEntry;
 import com.alibaba.fluss.exception.FlussRuntimeException;
 import com.alibaba.fluss.exception.KvSnapshotNotExistException;
 import com.alibaba.fluss.exception.LakeTableSnapshotNotExistException;
@@ -42,6 +43,8 @@ import com.alibaba.fluss.rpc.messages.ApiVersionsRequest;
 import com.alibaba.fluss.rpc.messages.ApiVersionsResponse;
 import com.alibaba.fluss.rpc.messages.DatabaseExistsRequest;
 import com.alibaba.fluss.rpc.messages.DatabaseExistsResponse;
+import com.alibaba.fluss.rpc.messages.DescribeConfigsRequest;
+import com.alibaba.fluss.rpc.messages.DescribeConfigsResponse;
 import com.alibaba.fluss.rpc.messages.GetDatabaseInfoRequest;
 import com.alibaba.fluss.rpc.messages.GetDatabaseInfoResponse;
 import com.alibaba.fluss.rpc.messages.GetFileSystemSecurityTokenRequest;
@@ -67,6 +70,7 @@ import com.alibaba.fluss.rpc.messages.ListTablesResponse;
 import com.alibaba.fluss.rpc.messages.MetadataRequest;
 import com.alibaba.fluss.rpc.messages.MetadataResponse;
 import com.alibaba.fluss.rpc.messages.PbApiVersion;
+import com.alibaba.fluss.rpc.messages.PbDescribeConfigsResponseInfo;
 import com.alibaba.fluss.rpc.messages.PbTablePath;
 import com.alibaba.fluss.rpc.messages.TableExistsRequest;
 import com.alibaba.fluss.rpc.messages.TableExistsResponse;
@@ -139,6 +143,7 @@ public abstract class RpcServiceBase extends 
RpcGatewayService implements AdminR
     protected final ZooKeeperClient zkClient;
     protected final MetadataManager metadataManager;
     protected final @Nullable Authorizer authorizer;
+    protected final DynamicConfigManager dynamicConfigManager;
 
     private long tokenLastUpdateTimeMs = 0;
     private ObtainedSecurityToken securityToken = null;
@@ -148,13 +153,15 @@ public abstract class RpcServiceBase extends 
RpcGatewayService implements AdminR
             ServerType provider,
             ZooKeeperClient zkClient,
             MetadataManager metadataManager,
-            @Nullable Authorizer authorizer) {
+            @Nullable Authorizer authorizer,
+            DynamicConfigManager dynamicConfigManager) {
         this.remoteFileSystem = remoteFileSystem;
         this.provider = provider;
         this.apiManager = new ApiManager(provider);
         this.zkClient = zkClient;
         this.metadataManager = metadataManager;
         this.authorizer = authorizer;
+        this.dynamicConfigManager = dynamicConfigManager;
     }
 
     @Override
@@ -466,6 +473,35 @@ public abstract class RpcServiceBase extends 
RpcGatewayService implements AdminR
         }
     }
 
+    @Override
+    public CompletableFuture<DescribeConfigsResponse> describeConfigs(
+            DescribeConfigsRequest request) {
+        if (authorizer != null) {
+            authorizer.authorize(
+                    currentSession(), OperationType.DESCRIBE_CONFIGS, 
Resource.cluster());
+        }
+
+        List<ConfigEntry> configs = dynamicConfigManager.describeConfigs();
+        List<PbDescribeConfigsResponseInfo> pbConfigsInfos =
+                configs.stream()
+                        .map(
+                                configEntry -> {
+                                    PbDescribeConfigsResponseInfo 
pbDescribeConfigsResponseInfo =
+                                            new PbDescribeConfigsResponseInfo()
+                                                    
.setConfigKey(configEntry.key())
+                                                    
.setConfigSource(configEntry.source().name());
+                                    if (configEntry.value() != null) {
+                                        
pbDescribeConfigsResponseInfo.setConfigValue(
+                                                configEntry.value());
+                                    }
+                                    return pbDescribeConfigsResponseInfo;
+                                })
+                        .collect(Collectors.toList());
+
+        return CompletableFuture.completedFuture(
+                new DescribeConfigsResponse().addAllInfos(pbConfigsInfos));
+    }
+
     protected MetadataResponse makeMetadataResponse(
             MetadataRequest request,
             String listenerName,
diff --git 
a/fluss-server/src/main/java/com/alibaba/fluss/server/ServerBase.java 
b/fluss-server/src/main/java/com/alibaba/fluss/server/ServerBase.java
index 9e18be148..a10be9566 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/ServerBase.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/ServerBase.java
@@ -69,12 +69,14 @@ public abstract class ServerBase implements 
AutoCloseableAsync, FatalErrorHandle
     protected static final long ZOOKEEPER_REGISTER_RETRY_INTERVAL_MS = 3 * 
1000L;
 
     protected final Configuration conf;
+    protected final DynamicServerConfig dynamicServerConfig;
 
     protected FileSystem remoteFileSystem;
     protected PluginManager pluginManager;
 
     protected ServerBase(Configuration conf) {
         this.conf = conf;
+        this.dynamicServerConfig = new DynamicServerConfig(conf);
     }
 
     private Thread shutDownHook;
diff --git 
a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java
 
b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java
index e8388f10f..917dd51a9 100644
--- 
a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java
+++ 
b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorServer.java
@@ -23,17 +23,13 @@ import com.alibaba.fluss.cluster.ServerType;
 import com.alibaba.fluss.config.ConfigOptions;
 import com.alibaba.fluss.config.Configuration;
 import com.alibaba.fluss.exception.IllegalConfigurationException;
-import com.alibaba.fluss.lake.lakestorage.LakeCatalog;
-import com.alibaba.fluss.lake.lakestorage.LakeStorage;
-import com.alibaba.fluss.lake.lakestorage.LakeStoragePlugin;
-import com.alibaba.fluss.lake.lakestorage.LakeStoragePluginSetUp;
-import com.alibaba.fluss.metadata.DataLakeFormat;
 import com.alibaba.fluss.metadata.DatabaseDescriptor;
 import com.alibaba.fluss.metrics.registry.MetricRegistry;
 import com.alibaba.fluss.rpc.RpcClient;
 import com.alibaba.fluss.rpc.RpcServer;
 import com.alibaba.fluss.rpc.metrics.ClientMetricGroup;
 import com.alibaba.fluss.rpc.netty.server.RequestsMetrics;
+import com.alibaba.fluss.server.DynamicConfigManager;
 import com.alibaba.fluss.server.ServerBase;
 import com.alibaba.fluss.server.authorizer.Authorizer;
 import com.alibaba.fluss.server.authorizer.AuthorizerLoader;
@@ -59,7 +55,6 @@ import javax.annotation.concurrent.GuardedBy;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -67,9 +62,6 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import static 
com.alibaba.fluss.server.utils.LakeStorageUtils.extractLakeProperties;
-import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
-
 /**
  * Coordinator server implementation. The coordinator server is responsible to:
  *
@@ -142,6 +134,9 @@ public class CoordinatorServer extends ServerBase {
     @GuardedBy("lock")
     private CoordinatorContext coordinatorContext;
 
+    @GuardedBy("lock")
+    private DynamicConfigManager dynamicConfigManager;
+
     public CoordinatorServer(Configuration conf) {
         super(conf);
         validateConfigs(conf);
@@ -173,6 +168,10 @@ public class CoordinatorServer extends ServerBase {
 
             this.zkClient = ZooKeeperUtils.startZookeeperClient(conf, this);
 
+            this.dynamicConfigManager =
+                    new DynamicConfigManager(zkClient, dynamicServerConfig, 
true);
+            dynamicConfigManager.startup();
+
             this.coordinatorContext = new CoordinatorContext();
             this.metadataCache = new CoordinatorMetadataCache();
 
@@ -193,8 +192,9 @@ public class CoordinatorServer extends ServerBase {
                             metadataCache,
                             metadataManager,
                             authorizer,
-                            createLakeCatalog(),
-                            lakeTableTieringManager);
+                            new LakeCatalogDynamicLoader(dynamicServerConfig, 
pluginManager),
+                            lakeTableTieringManager,
+                            dynamicConfigManager);
 
             this.rpcServer =
                     RpcServer.create(
@@ -247,21 +247,6 @@ public class CoordinatorServer extends ServerBase {
         }
     }
 
-    @Nullable
-    private LakeCatalog createLakeCatalog() {
-        DataLakeFormat dataLakeFormat = 
conf.get(ConfigOptions.DATALAKE_FORMAT);
-        if (dataLakeFormat == null) {
-            return null;
-        }
-        LakeStoragePlugin lakeStoragePlugin =
-                
LakeStoragePluginSetUp.fromDataLakeFormat(dataLakeFormat.toString(), 
pluginManager);
-        Map<String, String> lakeProperties = extractLakeProperties(conf);
-        LakeStorage lakeStorage =
-                lakeStoragePlugin.createLakeStorage(
-                        Configuration.fromMap(checkNotNull(lakeProperties)));
-        return lakeStorage.createLakeCatalog();
-    }
-
     @Override
     protected CompletableFuture<Result> closeAsync(Result result) {
         if (isShutDown.compareAndSet(false, true)) {
@@ -448,6 +433,14 @@ public class CoordinatorServer extends ServerBase {
                 exception = ExceptionUtils.firstOrSuppressed(t, exception);
             }
 
+            try {
+                if (dynamicConfigManager != null) {
+                    dynamicConfigManager.close();
+                }
+            } catch (Throwable t) {
+                exception = ExceptionUtils.firstOrSuppressed(t, exception);
+            }
+
             try {
                 if (rpcClient != null) {
                     rpcClient.close();
@@ -497,6 +490,10 @@ public class CoordinatorServer extends ServerBase {
         return authorizer;
     }
 
+    public DynamicConfigManager getDynamicConfigManager() {
+        return dynamicConfigManager;
+    }
+
     private static void validateConfigs(Configuration conf) {
         if (conf.get(ConfigOptions.DEFAULT_REPLICATION_FACTOR) < 1) {
             throw new IllegalConfigurationException(
diff --git 
a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java
 
b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java
index b1d588fe1..472c11853 100644
--- 
a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java
+++ 
b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/CoordinatorService.java
@@ -17,10 +17,12 @@
 
 package com.alibaba.fluss.server.coordinator;
 
+import com.alibaba.fluss.annotation.VisibleForTesting;
 import com.alibaba.fluss.cluster.ServerType;
 import com.alibaba.fluss.cluster.TabletServerInfo;
 import com.alibaba.fluss.config.ConfigOptions;
 import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.config.dynamic.AlterConfigOp;
 import com.alibaba.fluss.exception.InvalidCoordinatorException;
 import com.alibaba.fluss.exception.InvalidDatabaseException;
 import com.alibaba.fluss.exception.InvalidTableException;
@@ -28,7 +30,6 @@ import com.alibaba.fluss.exception.SecurityDisabledException;
 import com.alibaba.fluss.exception.TableAlreadyExistException;
 import com.alibaba.fluss.exception.TableNotPartitionedException;
 import com.alibaba.fluss.fs.FileSystem;
-import com.alibaba.fluss.lake.lakestorage.LakeCatalog;
 import com.alibaba.fluss.metadata.DataLakeFormat;
 import com.alibaba.fluss.metadata.DatabaseDescriptor;
 import com.alibaba.fluss.metadata.PartitionSpec;
@@ -42,6 +43,8 @@ import com.alibaba.fluss.metadata.TablePath;
 import com.alibaba.fluss.rpc.gateway.CoordinatorGateway;
 import com.alibaba.fluss.rpc.messages.AdjustIsrRequest;
 import com.alibaba.fluss.rpc.messages.AdjustIsrResponse;
+import com.alibaba.fluss.rpc.messages.AlterConfigsRequest;
+import com.alibaba.fluss.rpc.messages.AlterConfigsResponse;
 import com.alibaba.fluss.rpc.messages.CommitKvSnapshotRequest;
 import com.alibaba.fluss.rpc.messages.CommitKvSnapshotResponse;
 import com.alibaba.fluss.rpc.messages.CommitLakeTableSnapshotRequest;
@@ -68,6 +71,7 @@ import 
com.alibaba.fluss.rpc.messages.LakeTieringHeartbeatRequest;
 import com.alibaba.fluss.rpc.messages.LakeTieringHeartbeatResponse;
 import com.alibaba.fluss.rpc.messages.MetadataRequest;
 import com.alibaba.fluss.rpc.messages.MetadataResponse;
+import com.alibaba.fluss.rpc.messages.PbAlterConfigsRequestInfo;
 import com.alibaba.fluss.rpc.messages.PbHeartbeatReqForTable;
 import com.alibaba.fluss.rpc.messages.PbHeartbeatRespForTable;
 import com.alibaba.fluss.rpc.netty.server.Session;
@@ -76,6 +80,7 @@ import com.alibaba.fluss.security.acl.AclBinding;
 import com.alibaba.fluss.security.acl.AclBindingFilter;
 import com.alibaba.fluss.security.acl.OperationType;
 import com.alibaba.fluss.security.acl.Resource;
+import com.alibaba.fluss.server.DynamicConfigManager;
 import com.alibaba.fluss.server.RpcServiceBase;
 import com.alibaba.fluss.server.authorizer.AclCreateResult;
 import com.alibaba.fluss.server.authorizer.AclDeleteResult;
@@ -113,6 +118,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 
 import static 
com.alibaba.fluss.rpc.util.CommonRpcMessageUtils.toAclBindingFilters;
 import static com.alibaba.fluss.rpc.util.CommonRpcMessageUtils.toAclBindings;
@@ -127,7 +133,6 @@ import static 
com.alibaba.fluss.server.utils.ServerRpcMessageUtils.toTablePath;
 import static 
com.alibaba.fluss.server.utils.TableAssignmentUtils.generateAssignment;
 import static com.alibaba.fluss.utils.PartitionUtils.validatePartitionSpec;
 import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
-import static com.alibaba.fluss.utils.Preconditions.checkState;
 
 /** An RPC Gateway service for coordinator server. */
 public final class CoordinatorService extends RpcServiceBase implements 
CoordinatorGateway {
@@ -138,10 +143,8 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
     private final Supplier<Integer> coordinatorEpochSupplier;
     private final ServerMetadataCache metadataCache;
 
-    // null if the cluster hasn't configured datalake format
-    private final @Nullable DataLakeFormat dataLakeFormat;
-    private final @Nullable LakeCatalog lakeCatalog;
     private final LakeTableTieringManager lakeTableTieringManager;
+    private final LakeCatalogDynamicLoader lakeCatalogDynamicLoader;
 
     public CoordinatorService(
             Configuration conf,
@@ -151,24 +154,25 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
             ServerMetadataCache metadataCache,
             MetadataManager metadataManager,
             @Nullable Authorizer authorizer,
-            @Nullable LakeCatalog lakeCatalog,
-            LakeTableTieringManager lakeTableTieringManager) {
-        super(remoteFileSystem, ServerType.COORDINATOR, zkClient, 
metadataManager, authorizer);
+            LakeCatalogDynamicLoader lakeCatalogDynamicLoader,
+            LakeTableTieringManager lakeTableTieringManager,
+            DynamicConfigManager dynamicConfigManager) {
+        super(
+                remoteFileSystem,
+                ServerType.COORDINATOR,
+                zkClient,
+                metadataManager,
+                authorizer,
+                dynamicConfigManager);
         this.defaultBucketNumber = 
conf.getInt(ConfigOptions.DEFAULT_BUCKET_NUMBER);
         this.defaultReplicationFactor = 
conf.getInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR);
         this.eventManagerSupplier =
                 () -> 
coordinatorEventProcessorSupplier.get().getCoordinatorEventManager();
         this.coordinatorEpochSupplier =
                 () -> 
coordinatorEventProcessorSupplier.get().getCoordinatorEpoch();
-        this.dataLakeFormat = 
conf.getOptional(ConfigOptions.DATALAKE_FORMAT).orElse(null);
-        this.lakeCatalog = lakeCatalog;
         this.lakeTableTieringManager = lakeTableTieringManager;
         this.metadataCache = metadataCache;
-        checkState(
-                (dataLakeFormat == null) == (lakeCatalog == null),
-                "dataLakeFormat and lakeCatalog must both be null or both 
non-null, but dataLakeFormat is %s, lakeCatalog is %s.",
-                dataLakeFormat,
-                lakeCatalog);
+        this.lakeCatalogDynamicLoader = lakeCatalogDynamicLoader;
     }
 
     @Override
@@ -178,7 +182,7 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
 
     @Override
     public void shutdown() {
-        IOUtils.closeQuietly(lakeCatalog, "lake catalog");
+        IOUtils.closeQuietly(lakeCatalogDynamicLoader, "lake catalog");
     }
 
     @Override
@@ -266,13 +270,16 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
         // before create table in fluss, we may create in lake
         if (isDataLakeEnabled(tableDescriptor)) {
             try {
-                checkNotNull(lakeCatalog).createTable(tablePath, 
tableDescriptor);
+                checkNotNull(lakeCatalogDynamicLoader.getLakeCatalog())
+                        .createTable(tablePath, tableDescriptor);
             } catch (TableAlreadyExistException e) {
                 throw new TableAlreadyExistException(
                         String.format(
                                 "The table %s already exists in %s catalog, 
please "
                                         + "first drop the table in %s catalog 
or use a new table name.",
-                                tablePath, dataLakeFormat, dataLakeFormat));
+                                tablePath,
+                                lakeCatalogDynamicLoader.getDataLakeFormat(),
+                                lakeCatalogDynamicLoader.getDataLakeFormat()));
             }
         }
 
@@ -285,6 +292,7 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
 
     private TableDescriptor applySystemDefaults(TableDescriptor 
tableDescriptor) {
         TableDescriptor newDescriptor = tableDescriptor;
+        DataLakeFormat dataLakeFormat = 
lakeCatalogDynamicLoader.getDataLakeFormat();
 
         // not set bucket num
         if (!newDescriptor.getTableDistribution().isPresent()
@@ -650,4 +658,48 @@ public final class CoordinatorService extends 
RpcServiceBase implements Coordina
                 });
         return bucketMetadataList;
     }
+
+    @Override
+    public CompletableFuture<AlterConfigsResponse> 
alterConfigs(AlterConfigsRequest request) {
+        CompletableFuture<AlterConfigsResponse> future = new 
CompletableFuture<>();
+        List<PbAlterConfigsRequestInfo> infos = request.getInfosList();
+        if (infos.isEmpty()) {
+            return CompletableFuture.completedFuture(new 
AlterConfigsResponse());
+        }
+
+        if (authorizer != null) {
+            authorizer.authorize(currentSession(), 
OperationType.ALTER_CONFIGS, Resource.cluster());
+        }
+
+        List<AlterConfigOp> serverConfigChanges =
+                infos.stream()
+                        .map(
+                                info ->
+                                        new AlterConfigOp(
+                                                info.getConfigKey(),
+                                                info.hasConfigValue()
+                                                        ? info.getConfigValue()
+                                                        : null,
+                                                AlterConfigOp.OpType.forId(
+                                                        (byte) 
info.getOpType())))
+                        .collect(Collectors.toList());
+        AccessContextEvent<Void> accessContextEvent =
+                new AccessContextEvent<>(
+                        (context) -> {
+                            try {
+                                
dynamicConfigManager.alterConfigs(serverConfigChanges);
+                                future.complete(new AlterConfigsResponse());
+                            } catch (Exception e) {
+                                future.completeExceptionally(e);
+                            }
+                            return null;
+                        });
+        eventManagerSupplier.get().put(accessContextEvent);
+        return future;
+    }
+
+    @VisibleForTesting
+    public DataLakeFormat getDataLakeFormat() {
+        return lakeCatalogDynamicLoader.getDataLakeFormat();
+    }
 }
diff --git 
a/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/LakeCatalogDynamicLoader.java
 
b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/LakeCatalogDynamicLoader.java
new file mode 100644
index 000000000..ace324efc
--- /dev/null
+++ 
b/fluss-server/src/main/java/com/alibaba/fluss/server/coordinator/LakeCatalogDynamicLoader.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.server.coordinator;
+
+import com.alibaba.fluss.config.ConfigOptions;
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.config.dynamic.ServerReconfigurable;
+import com.alibaba.fluss.exception.ConfigException;
+import com.alibaba.fluss.lake.lakestorage.LakeCatalog;
+import com.alibaba.fluss.lake.lakestorage.LakeStorage;
+import com.alibaba.fluss.lake.lakestorage.LakeStoragePlugin;
+import com.alibaba.fluss.lake.lakestorage.LakeStoragePluginSetUp;
+import com.alibaba.fluss.metadata.DataLakeFormat;
+import com.alibaba.fluss.plugin.PluginManager;
+import com.alibaba.fluss.server.DynamicServerConfig;
+import com.alibaba.fluss.utils.IOUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+import static com.alibaba.fluss.config.ConfigOptions.DATALAKE_FORMAT;
+import static 
com.alibaba.fluss.server.utils.LakeStorageUtils.extractLakeProperties;
+import static com.alibaba.fluss.utils.Preconditions.checkNotNull;
+import static com.alibaba.fluss.utils.Preconditions.checkState;
+
+/** A wrapper for a LakeCatalog. */
+public class LakeCatalogDynamicLoader implements ServerReconfigurable, 
AutoCloseable {
+    // null if the cluster hasn't configured datalake format
+    private @Nullable DataLakeFormat dataLakeFormat;
+    private @Nullable LakeCatalog lakeCatalog;
+    private Configuration currentConfiguration;
+    private final PluginManager pluginManager;
+
+    public LakeCatalogDynamicLoader(
+            DynamicServerConfig dynamicServerConfig, PluginManager 
pluginManager) {
+        Configuration currentConfig = dynamicServerConfig.getCurrentConfig();
+        this.currentConfiguration = currentConfig;
+        this.dataLakeFormat = 
currentConfig.getOptional(DATALAKE_FORMAT).orElse(null);
+        this.lakeCatalog = createLakeCatalog(currentConfig, pluginManager);
+        this.pluginManager = pluginManager;
+        checkState(
+                (dataLakeFormat == null) == (lakeCatalog == null),
+                "dataLakeFormat and lakeCatalog must both be null or both 
non-null, but dataLakeFormat is %s, lakeCatalog is %s.",
+                dataLakeFormat,
+                lakeCatalog);
+        dynamicServerConfig.register(this);
+    }
+
+    @Override
+    public void validate(Configuration newConfig) throws ConfigException {
+        DataLakeFormat newDatalakeFormat = null;
+        try {
+            if (newConfig.getOptional(DATALAKE_FORMAT).isPresent()) {
+                newDatalakeFormat = newConfig.get(DATALAKE_FORMAT);
+            } else {
+                newDatalakeFormat = currentConfiguration.get(DATALAKE_FORMAT);
+            }
+
+            if (newDatalakeFormat == null) {
+                return;
+            }
+        } catch (Exception e) {
+            throw new ConfigException(
+                    "Invalid configuration for datalake format "
+                            + newDatalakeFormat
+                            + ": "
+                            + e.getMessage());
+        }
+
+        Map<String, String> configMap = newConfig.toMap();
+        String datalakePrefix = "datalake." + newDatalakeFormat + ".";
+        DataLakeFormat finalNewDatalakeFormat = newDatalakeFormat;
+        configMap.forEach(
+                (key, value) -> {
+                    if (!key.equals(DATALAKE_FORMAT.key())
+                            && key.startsWith("datalake.")
+                            && !key.startsWith(datalakePrefix)) {
+                        throw new ConfigException(
+                                "Invalid configuration for datalake format "
+                                        + finalNewDatalakeFormat
+                                        + ": "
+                                        + key);
+                    }
+                });
+    }
+
+    @Override
+    public void reconfigure(Configuration newConfig) throws ConfigException {
+        DataLakeFormat newLakeFormat = 
newConfig.getOptional(DATALAKE_FORMAT).orElse(null);
+        if (newLakeFormat != dataLakeFormat) {
+            IOUtils.closeQuietly(lakeCatalog, "Lake catalog because config 
changes");
+            this.dataLakeFormat = newLakeFormat;
+            this.lakeCatalog = createLakeCatalog(newConfig, pluginManager);
+            this.currentConfiguration = newConfig;
+        }
+    }
+
+    @Nullable
+    private LakeCatalog createLakeCatalog(Configuration conf, PluginManager 
pluginManager) {
+        DataLakeFormat dataLakeFormat = 
conf.get(ConfigOptions.DATALAKE_FORMAT);
+        if (dataLakeFormat == null) {
+            return null;
+        }
+        LakeStoragePlugin lakeStoragePlugin =
+                
LakeStoragePluginSetUp.fromDataLakeFormat(dataLakeFormat.toString(), 
pluginManager);
+        Map<String, String> lakeProperties = extractLakeProperties(conf);
+        LakeStorage lakeStorage =
+                lakeStoragePlugin.createLakeStorage(
+                        Configuration.fromMap(checkNotNull(lakeProperties)));
+        return lakeStorage.createLakeCatalog();
+    }
+
+    public @Nullable DataLakeFormat getDataLakeFormat() {
+        return dataLakeFormat;
+    }
+
+    public @Nullable LakeCatalog getLakeCatalog() {
+        return lakeCatalog;
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (lakeCatalog != null) {
+            lakeCatalog.close();
+        }
+    }
+}
diff --git 
a/fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletServer.java 
b/fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletServer.java
index 76a75eb50..494d4befa 100644
--- 
a/fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletServer.java
+++ 
b/fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletServer.java
@@ -31,6 +31,7 @@ import com.alibaba.fluss.rpc.RpcServer;
 import com.alibaba.fluss.rpc.gateway.CoordinatorGateway;
 import com.alibaba.fluss.rpc.metrics.ClientMetricGroup;
 import com.alibaba.fluss.rpc.netty.server.RequestsMetrics;
+import com.alibaba.fluss.server.DynamicConfigManager;
 import com.alibaba.fluss.server.ServerBase;
 import com.alibaba.fluss.server.authorizer.Authorizer;
 import com.alibaba.fluss.server.authorizer.AuthorizerLoader;
@@ -144,6 +145,9 @@ public class TabletServer extends ServerBase {
     @Nullable
     private Authorizer authorizer;
 
+    @GuardedBy("lock")
+    private DynamicConfigManager dynamicConfigManager;
+
     public TabletServer(Configuration conf) {
         this(conf, SystemClock.getInstance());
     }
@@ -181,7 +185,6 @@ public class TabletServer extends ServerBase {
                             serverId);
 
             this.zkClient = ZooKeeperUtils.startZookeeperClient(conf, this);
-
             MetadataManager metadataManager = new MetadataManager(zkClient, 
conf);
             this.metadataCache = new 
TabletServerMetadataCache(metadataManager, zkClient);
 
@@ -198,6 +201,7 @@ public class TabletServer extends ServerBase {
             if (authorizer != null) {
                 authorizer.startup();
             }
+
             // rpc client to sent request to the tablet server where the 
leader replica is located
             // to fetch log.
             this.clientMetricGroup =
@@ -228,6 +232,10 @@ public class TabletServer extends ServerBase {
                             clock);
             replicaManager.startup();
 
+            this.dynamicConfigManager =
+                    new DynamicConfigManager(zkClient, dynamicServerConfig, 
false);
+            dynamicConfigManager.startup();
+
             this.tabletService =
                     new TabletService(
                             serverId,
@@ -236,7 +244,8 @@ public class TabletServer extends ServerBase {
                             replicaManager,
                             metadataCache,
                             metadataManager,
-                            authorizer);
+                            authorizer,
+                            dynamicConfigManager);
 
             RequestsMetrics requestsMetrics =
                     
RequestsMetrics.createTabletServerRequestMetrics(tabletServerMetricGroup);
@@ -395,6 +404,10 @@ public class TabletServer extends ServerBase {
                     authorizer.close();
                 }
 
+                if (dynamicConfigManager != null) {
+                    dynamicConfigManager.close();
+                }
+
             } catch (Throwable t) {
                 exception = ExceptionUtils.firstOrSuppressed(t, exception);
             }
diff --git 
a/fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletService.java 
b/fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletService.java
index 81f5f9fef..ac858c5b1 100644
--- 
a/fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletService.java
+++ 
b/fluss-server/src/main/java/com/alibaba/fluss/server/tablet/TabletService.java
@@ -64,6 +64,7 @@ import com.alibaba.fluss.rpc.protocol.ApiError;
 import com.alibaba.fluss.rpc.protocol.Errors;
 import com.alibaba.fluss.security.acl.OperationType;
 import com.alibaba.fluss.security.acl.Resource;
+import com.alibaba.fluss.server.DynamicConfigManager;
 import com.alibaba.fluss.server.RpcServiceBase;
 import com.alibaba.fluss.server.authorizer.Authorizer;
 import com.alibaba.fluss.server.coordinator.MetadataManager;
@@ -130,8 +131,15 @@ public final class TabletService extends RpcServiceBase 
implements TabletServerG
             ReplicaManager replicaManager,
             TabletServerMetadataCache metadataCache,
             MetadataManager metadataManager,
-            @Nullable Authorizer authorizer) {
-        super(remoteFileSystem, ServerType.TABLET_SERVER, zkClient, 
metadataManager, authorizer);
+            @Nullable Authorizer authorizer,
+            DynamicConfigManager dynamicConfigManager) {
+        super(
+                remoteFileSystem,
+                ServerType.TABLET_SERVER,
+                zkClient,
+                metadataManager,
+                authorizer,
+                dynamicConfigManager);
         this.serviceName = "server-" + serverId;
         this.replicaManager = replicaManager;
         this.metadataCache = metadataCache;
diff --git 
a/fluss-server/src/main/java/com/alibaba/fluss/server/zk/ZooKeeperClient.java 
b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/ZooKeeperClient.java
index 078201212..083805890 100644
--- 
a/fluss-server/src/main/java/com/alibaba/fluss/server/zk/ZooKeeperClient.java
+++ 
b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/ZooKeeperClient.java
@@ -46,6 +46,7 @@ import com.alibaba.fluss.server.zk.data.ZkData.BucketIdsZNode;
 import com.alibaba.fluss.server.zk.data.ZkData.BucketRemoteLogsZNode;
 import com.alibaba.fluss.server.zk.data.ZkData.BucketSnapshotIdZNode;
 import com.alibaba.fluss.server.zk.data.ZkData.BucketSnapshotsZNode;
+import com.alibaba.fluss.server.zk.data.ZkData.ConfigEntityZNode;
 import com.alibaba.fluss.server.zk.data.ZkData.CoordinatorZNode;
 import com.alibaba.fluss.server.zk.data.ZkData.DatabaseZNode;
 import com.alibaba.fluss.server.zk.data.ZkData.DatabasesZNode;
@@ -872,6 +873,37 @@ public class ZooKeeperClient implements AutoCloseable {
         LOG.info("add acl change notification for resource {}  ", resource);
     }
 
+    public Map<String, String> fetchEntityConfig() throws Exception {
+        String path = ConfigEntityZNode.path();
+        return getOrEmpty(path).map(ConfigEntityZNode::decode).orElse(new 
HashMap<>());
+    }
+
+    public void upsertServerEntityConfig(Map<String, String> configs) throws 
Exception {
+        upsertEntityConfigs(configs);
+    }
+
+    public void upsertEntityConfigs(Map<String, String> configs) throws 
Exception {
+        String path = ConfigEntityZNode.path();
+        if (zkClient.checkExists().forPath(path) != null) {
+            zkClient.setData().forPath(path, 
ConfigEntityZNode.encode(configs));
+        } else {
+            zkClient.create()
+                    .creatingParentsIfNeeded()
+                    .forPath(path, ConfigEntityZNode.encode(configs));
+        }
+
+        insertConfigChangeNotification();
+    }
+
+    public void insertConfigChangeNotification() throws Exception {
+        zkClient.create()
+                .creatingParentsIfNeeded()
+                .withMode(CreateMode.PERSISTENT_SEQUENTIAL)
+                .forPath(
+                        
ZkData.ConfigEntityChangeNotificationSequenceZNode.pathPrefix(),
+                        
ZkData.ConfigEntityChangeNotificationSequenceZNode.encode());
+    }
+
     // 
--------------------------------------------------------------------------------------------
     // Utils
     // 
--------------------------------------------------------------------------------------------
diff --git 
a/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/ConfigEntityChangeNotificationJsonSerde.java
 
b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/ConfigEntityChangeNotificationJsonSerde.java
new file mode 100644
index 000000000..1a011c69d
--- /dev/null
+++ 
b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/ConfigEntityChangeNotificationJsonSerde.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.server.zk.data;
+
+import 
com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import 
com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import com.alibaba.fluss.utils.json.JsonDeserializer;
+import com.alibaba.fluss.utils.json.JsonSerializer;
+
+import java.io.IOException;
+
+/** Json serializer and deserializer for config entity change notification. */
+public class ConfigEntityChangeNotificationJsonSerde
+        implements JsonSerializer<String>, JsonDeserializer<String> {
+
+    public static final ConfigEntityChangeNotificationJsonSerde INSTANCE =
+            new ConfigEntityChangeNotificationJsonSerde();
+
+    private static final String VERSION_KEY = "version";
+    private static final String ENTITY_PATH = "entity_path";
+    private static final int VERSION = 1;
+
+    @Override
+    public void serialize(String entityPath, JsonGenerator generator) throws 
IOException {
+        generator.writeStartObject();
+        generator.writeNumberField(VERSION_KEY, VERSION);
+        generator.writeStringField(ENTITY_PATH, entityPath);
+        generator.writeEndObject();
+    }
+
+    @Override
+    public String deserialize(JsonNode node) {
+        return node.get(ENTITY_PATH).asText();
+    }
+}
diff --git 
a/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/ConfigJsonSerde.java
 
b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/ConfigJsonSerde.java
new file mode 100644
index 000000000..28b2736d8
--- /dev/null
+++ 
b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/ConfigJsonSerde.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.server.zk.data;
+
+import 
com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import 
com.alibaba.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import com.alibaba.fluss.utils.json.JsonDeserializer;
+import com.alibaba.fluss.utils.json.JsonSerializer;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/** Json serializer and deserializer for config properties. */
+public class ConfigJsonSerde
+        implements JsonSerializer<Map<String, String>>, 
JsonDeserializer<Map<String, String>> {
+
+    public static final ConfigJsonSerde INSTANCE = new ConfigJsonSerde();
+
+    private static final String VERSION_KEY = "version";
+    private static final String CONFIG = "config";
+    private static final int VERSION = 1;
+
+    @Override
+    public void serialize(Map<String, String> properties, JsonGenerator 
generator)
+            throws IOException {
+        generator.writeStartObject();
+        generator.writeNumberField(VERSION_KEY, VERSION);
+        generator.writeObjectFieldStart(CONFIG);
+        for (Map.Entry<String, String> property : properties.entrySet()) {
+            if (property.getValue() != null) {
+                generator.writeStringField(property.getKey(), 
property.getValue());
+            } else {
+                generator.writeNullField(property.getKey());
+            }
+        }
+        generator.writeEndObject();
+
+        generator.writeEndObject();
+    }
+
+    @Override
+    public Map<String, String> deserialize(JsonNode node) {
+        Map<String, String> properties = new HashMap<>();
+        JsonNode bucketsNode = node.get(CONFIG);
+        Iterator<Map.Entry<String, JsonNode>> fields = bucketsNode.fields();
+        while (fields.hasNext()) {
+            Map.Entry<String, JsonNode> field = fields.next();
+            properties.put(
+                    field.getKey(), field.getValue().isNull() ? null : 
field.getValue().asText());
+        }
+
+        return properties;
+    }
+}
diff --git 
a/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/ZkData.java 
b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/ZkData.java
index a8259e3cc..5116d610a 100644
--- a/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/ZkData.java
+++ b/fluss-server/src/main/java/com/alibaba/fluss/server/zk/data/ZkData.java
@@ -29,6 +29,7 @@ import com.alibaba.fluss.utils.json.JsonSerdeUtils;
 import javax.annotation.Nullable;
 
 import java.nio.charset.StandardCharsets;
+import java.util.Map;
 
 /** The data and path stored in ZooKeeper nodes (znodes). */
 public final class ZkData {
@@ -649,4 +650,72 @@ public final class ZkData {
             }
         }
     }
+
+    /**
+     * The znode for the configs in the system. The znode path is:
+     *
+     * <p>/config
+     */
+    public static final class ConfigZNode {
+        public static String path() {
+            return "/config";
+        }
+    }
+
+    /**
+     * The znode for a specific config entity. The znode path is:
+     *
+     * <p>/config/[entityType]/[entityName]
+     */
+    public static final class ConfigEntityZNode {
+        public static final String ENTITY_TYPE = "server";
+        public static final String ENTITY_NAME = "global";
+
+        public static String path() {
+            return ConfigZNode.path() + "/" + ENTITY_TYPE + "/" + ENTITY_NAME;
+        }
+
+        public static byte[] encode(Map<String, String> properties) {
+            return JsonSerdeUtils.writeValueAsBytes(properties, 
ConfigJsonSerde.INSTANCE);
+        }
+
+        public static Map<String, String> decode(byte[] json) {
+            return JsonSerdeUtils.readValue(json, ConfigJsonSerde.INSTANCE);
+        }
+    }
+
+    /**
+     * The znode for tracking config entity changes in the system. This znode 
serves as a root node
+     * for all config entity change notifications. The znode path is:
+     *
+     * <p>/config/changes
+     */
+    public static final class ConfigEntityChangeNotificationZNode {
+        public static String path() {
+            return ConfigZNode.path() + "/changes";
+        }
+    }
+
+    /**
+     * The znode for individual entity changes change notifications. Each 
notification is stored as
+     * a sequential child node under the {@link 
ConfigEntityChangeNotificationZNode} with a prefix.
+     * The znode path follows this structure:
+     *
+     * <p>/config/changes/acl_changes_[sequenceNumber]
+     */
+    public static final class ConfigEntityChangeNotificationSequenceZNode {
+        private static final String SEQUENT_NUMBER_PREFIX = "config_change_";
+
+        public static String pathPrefix() {
+            return ConfigEntityChangeNotificationZNode.path() + "/" + 
SEQUENT_NUMBER_PREFIX;
+        }
+
+        public static String prefix() {
+            return SEQUENT_NUMBER_PREFIX;
+        }
+
+        public static byte[] encode() {
+            return new byte[0];
+        }
+    }
 }
diff --git 
a/fluss-server/src/test/java/com/alibaba/fluss/server/DynamicConfigChangeTest.java
 
b/fluss-server/src/test/java/com/alibaba/fluss/server/DynamicConfigChangeTest.java
new file mode 100644
index 000000000..72f4aee02
--- /dev/null
+++ 
b/fluss-server/src/test/java/com/alibaba/fluss/server/DynamicConfigChangeTest.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.server;
+
+import com.alibaba.fluss.config.ConfigOptions;
+import com.alibaba.fluss.config.Configuration;
+import com.alibaba.fluss.config.dynamic.AlterConfigOp;
+import com.alibaba.fluss.exception.ConfigException;
+import com.alibaba.fluss.server.coordinator.LakeCatalogDynamicLoader;
+import com.alibaba.fluss.server.zk.NOPErrorHandler;
+import com.alibaba.fluss.server.zk.ZooKeeperClient;
+import com.alibaba.fluss.server.zk.ZooKeeperExtension;
+import com.alibaba.fluss.server.zk.ZooKeeperUtils;
+import com.alibaba.fluss.server.zk.data.ZkData;
+import com.alibaba.fluss.testutils.common.AllCallbackWrapper;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static com.alibaba.fluss.config.ConfigOptions.DATALAKE_FORMAT;
+import static com.alibaba.fluss.metadata.DataLakeFormat.PAIMON;
+import static com.alibaba.fluss.testutils.common.CommonTestUtils.retry;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link DynamicConfigManager}. */
+public class DynamicConfigChangeTest {
+
+    @RegisterExtension
+    public static AllCallbackWrapper<ZooKeeperExtension> 
zooKeeperExtensionWrapper =
+            new AllCallbackWrapper<>(new ZooKeeperExtension());
+
+    protected static ZooKeeperClient zookeeperClient;
+
+    @BeforeAll
+    static void beforeAll() {
+        final Configuration configuration = new Configuration();
+        configuration.setString(
+                ConfigOptions.ZOOKEEPER_ADDRESS,
+                
zooKeeperExtensionWrapper.getCustomExtension().getConnectString());
+        zookeeperClient =
+                ZooKeeperUtils.startZookeeperClient(configuration, 
NOPErrorHandler.INSTANCE);
+    }
+
+    @AfterAll
+    static void afterAll() {
+        if (zookeeperClient != null) {
+            zookeeperClient.close();
+        }
+    }
+
+    @AfterEach
+    void after() throws Exception {
+        zookeeperClient.deletePath(ZkData.ConfigEntityZNode.path());
+    }
+
+    @Test
+    void testAlterConfigs() throws Exception {
+        DynamicServerConfig dynamicServerConfig = new DynamicServerConfig(new 
Configuration());
+        DynamicConfigManager dynamicConfigManager =
+                new DynamicConfigManager(zookeeperClient, dynamicServerConfig, 
true);
+        dynamicConfigManager.startup();
+        try (LakeCatalogDynamicLoader lakeCatalogDynamicLoader =
+                new LakeCatalogDynamicLoader(dynamicServerConfig, null)) {
+            assertThatThrownBy(
+                            () ->
+                                    dynamicConfigManager.alterConfigs(
+                                            Collections.singletonList(
+                                                    new AlterConfigOp(
+                                                            "un_support_key",
+                                                            "value",
+                                                            
AlterConfigOp.OpType.SET))))
+                    .isExactlyInstanceOf(ConfigException.class)
+                    .hasMessageContaining(
+                            "The config key un_support_key is not allowed to 
be changed dynamically.");
+
+            dynamicConfigManager.alterConfigs(
+                    Collections.singletonList(
+                            new AlterConfigOp(
+                                    DATALAKE_FORMAT.key(), "paimon", 
AlterConfigOp.OpType.SET)));
+            
assertThat(lakeCatalogDynamicLoader.getDataLakeFormat()).isEqualTo(PAIMON);
+            dynamicConfigManager.alterConfigs(
+                    Collections.singletonList(
+                            new AlterConfigOp(
+                                    DATALAKE_FORMAT.key(), null, 
AlterConfigOp.OpType.DELETE)));
+            
assertThat(lakeCatalogDynamicLoader.getDataLakeFormat()).isEqualTo(null);
+        }
+    }
+
+    @Test
+    void testOverrideConfigs() throws Exception {
+        Configuration configuration = new Configuration();
+        configuration.setString(DATALAKE_FORMAT.key(), "paimon");
+        DynamicServerConfig dynamicServerConfig = new 
DynamicServerConfig(configuration);
+        DynamicConfigManager dynamicConfigManager =
+                new DynamicConfigManager(zookeeperClient, dynamicServerConfig, 
true);
+        dynamicConfigManager.startup();
+        try (LakeCatalogDynamicLoader lakeCatalogDynamicLoader =
+                new LakeCatalogDynamicLoader(dynamicServerConfig, null)) {
+            
assertThat(lakeCatalogDynamicLoader.getDataLakeFormat()).isEqualTo(PAIMON);
+            dynamicConfigManager.alterConfigs(
+                    Collections.singletonList(
+                            new AlterConfigOp(
+                                    DATALAKE_FORMAT.key(), null, 
AlterConfigOp.OpType.SET)));
+            
assertThat(lakeCatalogDynamicLoader.getDataLakeFormat()).isEqualTo(null);
+            dynamicConfigManager.alterConfigs(
+                    Collections.singletonList(
+                            new AlterConfigOp(
+                                    DATALAKE_FORMAT.key(), null, 
AlterConfigOp.OpType.DELETE)));
+            
assertThat(lakeCatalogDynamicLoader.getDataLakeFormat()).isEqualTo(PAIMON);
+        }
+    }
+
+    @Test
+    void testUnknownLakeHouse() throws Exception {
+        Configuration configuration = new Configuration();
+        DynamicServerConfig dynamicServerConfig = new 
DynamicServerConfig(configuration);
+        DynamicConfigManager dynamicConfigManager =
+                new DynamicConfigManager(zookeeperClient, dynamicServerConfig, 
true);
+        dynamicConfigManager.startup();
+        try (LakeCatalogDynamicLoader lakeCatalogDynamicLoader =
+                new LakeCatalogDynamicLoader(dynamicServerConfig, null)) {
+            assertThatThrownBy(
+                            () ->
+                                    dynamicConfigManager.alterConfigs(
+                                            Collections.singletonList(
+                                                    new AlterConfigOp(
+                                                            
DATALAKE_FORMAT.key(),
+                                                            "unknown",
+                                                            
AlterConfigOp.OpType.SET))))
+                    .hasMessageContaining(
+                            "Could not parse value 'unknown' for key 
'datalake.format'");
+
+            assertThat(lakeCatalogDynamicLoader.getDataLakeFormat()).isNull();
+        }
+    }
+
+    @Test
+    void testWrongLakeFormatPrefix() throws Exception {
+        Configuration configuration = new Configuration();
+        DynamicServerConfig dynamicServerConfig = new 
DynamicServerConfig(configuration);
+        DynamicConfigManager dynamicConfigManager =
+                new DynamicConfigManager(zookeeperClient, dynamicServerConfig, 
true);
+        dynamicConfigManager.startup();
+        try (LakeCatalogDynamicLoader lakeCatalogDynamicLoader =
+                new LakeCatalogDynamicLoader(dynamicServerConfig, null)) {
+            assertThatThrownBy(
+                            () ->
+                                    dynamicConfigManager.alterConfigs(
+                                            Arrays.asList(
+                                                    new AlterConfigOp(
+                                                            
DATALAKE_FORMAT.key(),
+                                                            "paimon",
+                                                            
AlterConfigOp.OpType.SET),
+                                                    new AlterConfigOp(
+                                                            
"datalake.iceberg.metastore",
+                                                            "filesystem",
+                                                            
AlterConfigOp.OpType.SET))))
+                    .hasMessage(
+                            "Invalid configuration for datalake format paimon: 
datalake.iceberg.metastore");
+
+            assertThat(lakeCatalogDynamicLoader.getDataLakeFormat()).isNull();
+        }
+    }
+
+    @Test
+    void testListenUnMatchedDynamicConfigChanges() throws Exception {
+        DynamicServerConfig dynamicServerConfig = new DynamicServerConfig(new 
Configuration());
+        DynamicConfigManager dynamicConfigManager =
+                new DynamicConfigManager(zookeeperClient, dynamicServerConfig, 
false);
+        dynamicConfigManager.startup();
+        try (LakeCatalogDynamicLoader lakeCatalogDynamicLoader =
+                new LakeCatalogDynamicLoader(dynamicServerConfig, null)) {
+            
assertThat(lakeCatalogDynamicLoader.getDataLakeFormat()).isEqualTo(null);
+            Map<String, String> config = new HashMap<>();
+            config.put(DATALAKE_FORMAT.key(), "paimon");
+            config.put("un_support_key", "value");
+            zookeeperClient.upsertServerEntityConfig(config);
+            retry(
+                    Duration.ofMinutes(1),
+                    () ->
+                            
assertThat(lakeCatalogDynamicLoader.getDataLakeFormat())
+                                    .isEqualTo(PAIMON));
+        }
+    }
+
+    @Test
+    void testReStartupContainsNoMatchedDynamicConfig() throws Exception {
+        DynamicServerConfig dynamicServerConfig = new DynamicServerConfig(new 
Configuration());
+        Map<String, String> config = new HashMap<>();
+        config.put(DATALAKE_FORMAT.key(), "paimon");
+        config.put("un_support_key", "value");
+
+        // This often happens when upgrading with different allowed configs.
+        zookeeperClient.upsertServerEntityConfig(config);
+        DynamicConfigManager dynamicConfigManager =
+                new DynamicConfigManager(zookeeperClient, dynamicServerConfig, 
true);
+        // Startup dynamic manager even is not matched now.
+        dynamicConfigManager.startup();
+
+        try (LakeCatalogDynamicLoader lakeCatalogDynamicLoader =
+                new LakeCatalogDynamicLoader(dynamicServerConfig, null)) {
+            
assertThat(lakeCatalogDynamicLoader.getDataLakeFormat()).isEqualTo(PAIMON);
+        }
+    }
+}
diff --git 
a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TestCoordinatorGateway.java
 
b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TestCoordinatorGateway.java
index dc5df678d..423d89efc 100644
--- 
a/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TestCoordinatorGateway.java
+++ 
b/fluss-server/src/test/java/com/alibaba/fluss/server/coordinator/TestCoordinatorGateway.java
@@ -22,6 +22,8 @@ import com.alibaba.fluss.metadata.TableBucket;
 import com.alibaba.fluss.rpc.gateway.CoordinatorGateway;
 import com.alibaba.fluss.rpc.messages.AdjustIsrRequest;
 import com.alibaba.fluss.rpc.messages.AdjustIsrResponse;
+import com.alibaba.fluss.rpc.messages.AlterConfigsRequest;
+import com.alibaba.fluss.rpc.messages.AlterConfigsResponse;
 import com.alibaba.fluss.rpc.messages.ApiVersionsRequest;
 import com.alibaba.fluss.rpc.messages.ApiVersionsResponse;
 import com.alibaba.fluss.rpc.messages.CommitKvSnapshotRequest;
@@ -40,6 +42,8 @@ import com.alibaba.fluss.rpc.messages.CreateTableRequest;
 import com.alibaba.fluss.rpc.messages.CreateTableResponse;
 import com.alibaba.fluss.rpc.messages.DatabaseExistsRequest;
 import com.alibaba.fluss.rpc.messages.DatabaseExistsResponse;
+import com.alibaba.fluss.rpc.messages.DescribeConfigsRequest;
+import com.alibaba.fluss.rpc.messages.DescribeConfigsResponse;
 import com.alibaba.fluss.rpc.messages.DropAclsRequest;
 import com.alibaba.fluss.rpc.messages.DropAclsResponse;
 import com.alibaba.fluss.rpc.messages.DropDatabaseRequest;
@@ -311,6 +315,17 @@ public class TestCoordinatorGateway implements 
CoordinatorGateway {
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    public CompletableFuture<AlterConfigsResponse> 
alterConfigs(AlterConfigsRequest request) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public CompletableFuture<DescribeConfigsResponse> describeConfigs(
+            DescribeConfigsRequest request) {
+        throw new UnsupportedOperationException();
+    }
+
     public void setCurrentLeaderEpoch(TableBucket tableBucket, int 
leaderEpoch) {
         currentLeaderEpoch.put(tableBucket, leaderEpoch);
     }
diff --git 
a/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TestTabletServerGateway.java
 
b/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TestTabletServerGateway.java
index 770299a0a..ccb6c4317 100644
--- 
a/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TestTabletServerGateway.java
+++ 
b/fluss-server/src/test/java/com/alibaba/fluss/server/tablet/TestTabletServerGateway.java
@@ -26,6 +26,8 @@ import com.alibaba.fluss.rpc.messages.ApiVersionsRequest;
 import com.alibaba.fluss.rpc.messages.ApiVersionsResponse;
 import com.alibaba.fluss.rpc.messages.DatabaseExistsRequest;
 import com.alibaba.fluss.rpc.messages.DatabaseExistsResponse;
+import com.alibaba.fluss.rpc.messages.DescribeConfigsRequest;
+import com.alibaba.fluss.rpc.messages.DescribeConfigsResponse;
 import com.alibaba.fluss.rpc.messages.FetchLogRequest;
 import com.alibaba.fluss.rpc.messages.FetchLogResponse;
 import com.alibaba.fluss.rpc.messages.GetDatabaseInfoRequest;
@@ -318,6 +320,12 @@ public class TestTabletServerGateway implements 
TabletServerGateway {
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    public CompletableFuture<DescribeConfigsResponse> describeConfigs(
+            DescribeConfigsRequest request) {
+        throw new UnsupportedOperationException();
+    }
+
     public int pendingRequestSize() {
         return requests.size();
     }
diff --git 
a/fluss-server/src/test/java/com/alibaba/fluss/server/zk/data/ConfigJsonSerdeTest.java
 
b/fluss-server/src/test/java/com/alibaba/fluss/server/zk/data/ConfigJsonSerdeTest.java
new file mode 100644
index 000000000..b20a08bf1
--- /dev/null
+++ 
b/fluss-server/src/test/java/com/alibaba/fluss/server/zk/data/ConfigJsonSerdeTest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.fluss.server.zk.data;
+
+import com.alibaba.fluss.utils.json.JsonSerdeTestBase;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Tests for {@link ConfigJsonSerde}. */
+public class ConfigJsonSerdeTest extends JsonSerdeTestBase<Map<String, 
String>> {
+
+    public ConfigJsonSerdeTest() {
+        super(ConfigJsonSerde.INSTANCE);
+    }
+
+    @Override
+    protected Map<String, String>[] createObjects() {
+        Map<String, String>[] maps = new Map[2];
+        Map<String, String> map = new HashMap<>();
+        map.put("datalake.format", "value1");
+        map.put("key2", null);
+        map.put("key3", "value3");
+        maps[0] = map;
+        maps[1] = new HashMap<>();
+        return maps;
+    }
+
+    @Override
+    protected String[] expectedJsons() {
+        return new String[] {
+            
"{\"version\":1,\"config\":{\"datalake.format\":\"value1\",\"key2\":null,\"key3\":\"value3\"}}",
+            "{\"version\":1,\"config\":{}}"
+        };
+    }
+}
diff --git a/fluss-server/src/test/resources/log4j2-test.properties 
b/fluss-server/src/test/resources/log4j2-test.properties
index 7acf6e146..3db036fea 100644
--- a/fluss-server/src/test/resources/log4j2-test.properties
+++ b/fluss-server/src/test/resources/log4j2-test.properties
@@ -18,7 +18,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-rootLogger.level = OFF
+rootLogger.level = INFO
 rootLogger.appenderRef.test.ref = TestLogger
 
 appender.testlogger.name = TestLogger

Reply via email to