wuchong commented on code in PR #1567:
URL: https://github.com/apache/fluss/pull/1567#discussion_r2404509310


##########
fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java:
##########
@@ -464,6 +471,65 @@ public DropAclsResult 
dropAcls(Collection<AclBindingFilter> filters) {
         return result;
     }
 
+    @Override
+    public CompletableFuture<Collection<ConfigEntry>> describeConfigs() {
+        CompletableFuture<Collection<ConfigEntry>> future = new 
CompletableFuture<>();
+        DescribeConfigsRequest request = new DescribeConfigsRequest();
+        gateway.describeConfigs(request)
+                .whenComplete(
+                        (r, t) -> {
+                            if (t != null) {
+                                future.completeExceptionally(t);
+                            }
+
+                            List<PbDescribeConfigsResponseInfo> responseInfos 
= r.getInfosList();
+                            List<ConfigEntry> configEntries =
+                                    responseInfos.stream()
+                                            .map(
+                                                    responseInfo ->
+                                                            new ConfigEntry(
+                                                                    
responseInfo.getConfigKey(),
+                                                                    
responseInfo.hasConfigValue()
+                                                                            ? 
responseInfo
+                                                                               
     .getConfigValue()
+                                                                            : 
null,
+                                                                    
ConfigEntry.ConfigSource
+                                                                            
.valueOf(
+                                                                               
     responseInfo
+                                                                               
             .getConfigSource())))
+                                            .collect(Collectors.toList());
+                            future.complete(configEntries);
+                        });
+        return future;
+    }
+
+    @Override
+    public CompletableFuture<Void> alterConfigs(Collection<AlterConfigOp> 
configs) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+
+        AlterConfigsRequest request = new AlterConfigsRequest();
+        for (AlterConfigOp alterConfigOp : configs) {
+            PbAlterConfigsRequestInfo requestInfo =
+                    request.addInfo()
+                            .setConfigKey(alterConfigOp.key())
+                            .setOpType(alterConfigOp.opType().id());
+            if (alterConfigOp.value() != null) {
+                requestInfo.setConfigValue(alterConfigOp.value());
+            }
+        }
+        gateway.alterConfigs(request)
+                .whenComplete(
+                        (r, t) -> {
+                            if (t != null) {
+                                future.completeExceptionally(t);
+                            }
+
+                            future.complete(null);

Review Comment:
   should be in `else` branch? Otherwise, the `future` will be completed twice?



##########
fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java:
##########
@@ -464,6 +471,65 @@ public DropAclsResult 
dropAcls(Collection<AclBindingFilter> filters) {
         return result;
     }
 
+    @Override
+    public CompletableFuture<Collection<ConfigEntry>> describeConfigs() {
+        CompletableFuture<Collection<ConfigEntry>> future = new 
CompletableFuture<>();
+        DescribeConfigsRequest request = new DescribeConfigsRequest();
+        gateway.describeConfigs(request)
+                .whenComplete(
+                        (r, t) -> {
+                            if (t != null) {
+                                future.completeExceptionally(t);
+                            }
+
+                            List<PbDescribeConfigsResponseInfo> responseInfos 
= r.getInfosList();
+                            List<ConfigEntry> configEntries =
+                                    responseInfos.stream()
+                                            .map(
+                                                    responseInfo ->
+                                                            new ConfigEntry(
+                                                                    
responseInfo.getConfigKey(),
+                                                                    
responseInfo.hasConfigValue()
+                                                                            ? 
responseInfo
+                                                                               
     .getConfigValue()
+                                                                            : 
null,
+                                                                    
ConfigEntry.ConfigSource
+                                                                            
.valueOf(
+                                                                               
     responseInfo
+                                                                               
             .getConfigSource())))
+                                            .collect(Collectors.toList());
+                            future.complete(configEntries);
+                        });

Review Comment:
   This formatted code is quite hard to understand, please extract this code 
into `ClientRpcMessageUtils#toConfigEntries()` to improve the code readability. 



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java:
##########
@@ -649,4 +650,72 @@ public static Resource decode(byte[] json) {
             }
         }
     }
+
+    /**
+     * The znode for the dynamic configs. The znode path is:
+     *
+     * <p>/config
+     */
+    public static final class ConfigZNode {
+        public static String path() {
+            return "/config";
+        }
+    }
+
+    /**
+     * The znode for a specific config entity. The znode path is:
+     *
+     * <p>/config/[entityType]/[entityName]
+     */
+    public static final class ConfigEntityZNode {
+        public static final String ENTITY_TYPE = "server";
+        public static final String ENTITY_NAME = "global";
+
+        public static String path() {
+            return ConfigZNode.path() + "/" + ENTITY_TYPE + "/" + ENTITY_NAME;
+        }
+
+        public static byte[] encode(Map<String, String> properties) {
+            return JsonSerdeUtils.writeValueAsBytes(properties, 
ConfigJsonSerde.INSTANCE);
+        }
+
+        public static Map<String, String> decode(byte[] json) {
+            return JsonSerdeUtils.readValue(json, ConfigJsonSerde.INSTANCE);
+        }
+    }
+
+    /**
+     * The znode for tracking dynamic config entity changes. This znode serves 
as a root node for
+     * all config entity change notifications. The znode path is:
+     *
+     * <p>/config/changes
+     */
+    public static final class ConfigEntityChangeNotificationZNode {
+        public static String path() {
+            return ConfigZNode.path() + "/changes";
+        }
+    }
+
+    /**
+     * The znode for individual entity changes change notifications. Each 
notification is stored as
+     * a sequential child node under the {@link 
ConfigEntityChangeNotificationZNode} with a prefix.
+     * The znode path follows this structure:
+     *
+     * <p>/config/changes/acl_changes_[sequenceNumber]

Review Comment:
   `acl_changes_` -> `config_changes_`



##########
fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java:
##########
@@ -464,6 +471,65 @@ public DropAclsResult 
dropAcls(Collection<AclBindingFilter> filters) {
         return result;
     }
 
+    @Override
+    public CompletableFuture<Collection<ConfigEntry>> describeConfigs() {
+        CompletableFuture<Collection<ConfigEntry>> future = new 
CompletableFuture<>();
+        DescribeConfigsRequest request = new DescribeConfigsRequest();
+        gateway.describeConfigs(request)
+                .whenComplete(
+                        (r, t) -> {
+                            if (t != null) {
+                                future.completeExceptionally(t);

Review Comment:
   should return here or add else branch for following code? Otherwise, the 
future will be completed twice?



##########
fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server;
+
+import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.config.dynamic.ServerReconfigurable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT;
+import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
+import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
+
+/**
+ * The dynamic configuration for server. If a {@link ServerReconfigurable} 
implementation class
+ * wants to listen for configuration changes, it can register through a 
method. Subsequently, when
+ * {@link DynamicConfigManager} detects changes, it will update the 
configuration items and push
+ * them to these {@link ServerReconfigurable} instances.
+ */
+@Internal
+public class DynamicServerConfig {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(DynamicServerConfig.class);
+    private static final Set<String> ALLOWED_CONFIG_KEYS =
+            Collections.singleton(DATALAKE_FORMAT.key());
+    private static final Set<String> ALLOWED_CONFIG_PREFIXES = 
Collections.singleton("datalake.");
+
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+    private final Set<ServerReconfigurable> serverReconfigurableSet = 
ConcurrentHashMap.newKeySet();
+
+    /** The initial configuration items when the server starts from 
server.yaml. */
+    private final Map<String, String> initialConfigMap;
+
+    /** The dynamic configuration items that are added during running(stored 
in zk). */
+    private final Map<String, String> dynamicConfigs = new HashMap<>();
+
+    /**
+     * The current configuration map, which is a combination of initial 
configuration and dynamic.
+     */
+    private final Map<String, String> currentConfigMap;
+
+    /**
+     * The current configuration, which is a combination of initial 
configuration and dynamic
+     * configuration.
+     */
+    private volatile Configuration currentConfig;
+
+    public DynamicServerConfig(Configuration flussConfig) {
+        this.currentConfig = flussConfig;
+        this.initialConfigMap = flussConfig.toMap();
+        this.currentConfigMap = flussConfig.toMap();
+    }
+
+    /** Register a ServerReconfigurable which listens to configuration 
changes. */
+    public void register(ServerReconfigurable serverReconfigurable) {
+        serverReconfigurableSet.add(serverReconfigurable);
+    }
+
+    /** Update the dynamic configuration and apply to registered 
ServerReconfigurables. */
+    public void updateDynamicConfig(Map<String, String> newDynamicConfigs, 
boolean skipErrorConfig)
+            throws Exception {
+        inWriteLock(lock, () -> updateCurrentConfig(newDynamicConfigs, 
skipErrorConfig));
+    }
+
+    public Configuration getCurrentConfig() {
+        return inReadLock(lock, () -> currentConfig);
+    }
+
+    public Map<String, String> getDynamicConfigs() {
+        return inReadLock(lock, () -> new HashMap<>(dynamicConfigs));
+    }
+
+    public Map<String, String> getInitialServerConfigs() {
+        return inReadLock(lock, () -> new HashMap<>(initialConfigMap));
+    }
+
+    public boolean isAllowedConfig(String key) {
+        if (ALLOWED_CONFIG_KEYS.contains(key)) {
+            return true;
+        }
+
+        for (String prefix : ALLOWED_CONFIG_PREFIXES) {
+            if (key.startsWith(prefix)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    private void updateCurrentConfig(Map<String, String> newDynamicConfigs, 
boolean skipErrorConfig)
+            throws Exception {
+        Map<String, String> newProps = new HashMap<>(initialConfigMap);
+        overrideProps(newProps, newDynamicConfigs);
+        Configuration newConfig = Configuration.fromMap(newProps);
+        Configuration oldConfig = currentConfig;
+        Set<ServerReconfigurable> appliedServerReconfigurableSet = new 
HashSet<>();
+        if (!newProps.equals(currentConfigMap)) {
+            serverReconfigurableSet.forEach(
+                    serverReconfigurable -> {
+                        try {
+                            serverReconfigurable.validate(newConfig);
+                        } catch (Exception e) {

Review Comment:
   Should we catch `ConfigException` here? Otherwise, what's the use case of 
introducing `ConfigException`? 



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java:
##########
@@ -902,6 +903,37 @@ public void insertAclChangeNotification(Resource resource) 
throws Exception {
         LOG.info("add acl change notification for resource {}  ", resource);
     }
 
+    public Map<String, String> fetchEntityConfig() throws Exception {
+        String path = ConfigEntityZNode.path();
+        return getOrEmpty(path).map(ConfigEntityZNode::decode).orElse(new 
HashMap<>());
+    }
+
+    public void upsertServerEntityConfig(Map<String, String> configs) throws 
Exception {
+        upsertEntityConfigs(configs);
+    }
+
+    public void upsertEntityConfigs(Map<String, String> configs) throws 
Exception {
+        String path = ConfigEntityZNode.path();
+        if (zkClient.checkExists().forPath(path) != null) {
+            zkClient.setData().forPath(path, 
ConfigEntityZNode.encode(configs));
+        } else {
+            zkClient.create()
+                    .creatingParentsIfNeeded()
+                    .forPath(path, ConfigEntityZNode.encode(configs));
+        }
+
+        insertConfigChangeNotification();
+    }
+
+    public void insertConfigChangeNotification() throws Exception {

Review Comment:
   Add a log for this like `insertAclChangeNotification`? Currently, it seems 
there is no any logs when alter cluster configs. 



##########
fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java:
##########
@@ -886,6 +889,53 @@ tablePath, newPartitionSpec("age", "11"), false)
                 .isInstanceOf(TooManyPartitionsException.class);
     }
 
+    @Test
+    void testDynamicConfigs() throws ExecutionException, InterruptedException {
+        assertThat(
+                        FLUSS_CLUSTER_EXTENSION
+                                .getCoordinatorServer()
+                                .getCoordinatorService()
+                                .getDataLakeFormat())
+                .isEqualTo(PAIMON);
+
+        admin.alterConfigs(
+                        Collections.singletonList(
+                                new AlterConfigOp(
+                                        DATALAKE_FORMAT.key(), null, 
AlterConfigOp.OpType.SET)))
+                .get();
+        assertThat(
+                        FLUSS_CLUSTER_EXTENSION
+                                .getCoordinatorServer()
+                                .getCoordinatorService()
+                                .getDataLakeFormat())
+                .isNull();
+        assertThat(admin.describeConfigs().get())
+                .contains(

Review Comment:
   I think we should also assert there is no duplicated keys in the returned 
configs. Currently, there is 2 config entry for `datalake.format`.  



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/ConfigJsonSerde.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data;
+
+import 
org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import 
org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.fluss.utils.json.JsonDeserializer;
+import org.apache.fluss.utils.json.JsonSerializer;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/** Json serializer and deserializer for config properties. */
+public class ConfigJsonSerde
+        implements JsonSerializer<Map<String, String>>, 
JsonDeserializer<Map<String, String>> {
+
+    public static final ConfigJsonSerde INSTANCE = new ConfigJsonSerde();
+
+    private static final String VERSION_KEY = "version";
+    private static final String CONFIG = "config";

Review Comment:
   Use `configs`? It contains multiple configs in the serialized map. 



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeCatalogDynamicLoader.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.config.dynamic.ServerReconfigurable;
+import org.apache.fluss.exception.ConfigException;
+import org.apache.fluss.lake.lakestorage.LakeCatalog;
+import org.apache.fluss.lake.lakestorage.LakeStorage;
+import org.apache.fluss.lake.lakestorage.LakeStoragePlugin;
+import org.apache.fluss.lake.lakestorage.LakeStoragePluginSetUp;
+import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.plugin.PluginManager;
+import org.apache.fluss.server.DynamicServerConfig;
+import org.apache.fluss.server.utils.LakeStorageUtils;
+import org.apache.fluss.utils.IOUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT;
+import static 
org.apache.fluss.server.utils.LakeStorageUtils.extractLakeProperties;
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+import static org.apache.fluss.utils.Preconditions.checkState;
+
+/**
+ * A dynamic loader for lake catalog. Each time when the datalake format is 
changed, the lake
+ * catalog will be changed.
+ */
+public class LakeCatalogDynamicLoader implements ServerReconfigurable, 
AutoCloseable {
+    // null if the cluster hasn't configured datalake format
+    private @Nullable DataLakeFormat dataLakeFormat;
+    private @Nullable LakeCatalog lakeCatalog;
+    private @Nullable Map<String, String> defaultTableLakeOptions;
+    private Configuration currentConfiguration;
+    private final PluginManager pluginManager;
+    private final boolean isCoordinator;
+
+    public LakeCatalogDynamicLoader(
+            DynamicServerConfig dynamicServerConfig,
+            PluginManager pluginManager,
+            boolean isCoordinator) {
+        Configuration currentConfig = dynamicServerConfig.getCurrentConfig();
+        this.isCoordinator = isCoordinator;
+        this.currentConfiguration = currentConfig;
+        this.dataLakeFormat = 
currentConfig.getOptional(DATALAKE_FORMAT).orElse(null);
+        this.lakeCatalog = createLakeCatalog(currentConfig, pluginManager);
+        this.defaultTableLakeOptions =
+                
LakeStorageUtils.generateDefaultTableLakeOptions(currentConfig);
+        this.pluginManager = pluginManager;
+        checkState(
+                (dataLakeFormat == null) == (lakeCatalog == null),
+                "dataLakeFormat and lakeCatalog must both be null or both 
non-null, but dataLakeFormat is %s, lakeCatalog is %s.",
+                dataLakeFormat,
+                lakeCatalog);
+        dynamicServerConfig.register(this);
+    }
+
+    @Override
+    public void validate(Configuration newConfig) throws ConfigException {
+        DataLakeFormat newDatalakeFormat = null;
+        try {
+            if (newConfig.getOptional(DATALAKE_FORMAT).isPresent()) {
+                newDatalakeFormat = newConfig.get(DATALAKE_FORMAT);
+            } else {
+                newDatalakeFormat = currentConfiguration.get(DATALAKE_FORMAT);
+            }
+
+            if (newDatalakeFormat == null) {
+                return;
+            }
+        } catch (Exception e) {
+            throw new ConfigException(
+                    "Invalid configuration for datalake format "
+                            + newDatalakeFormat

Review Comment:
   `newDatalakeFormat` is always `null` in the exception block. I think we can 
just print all the new configs in the exception message. 



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeCatalogDynamicLoader.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.config.dynamic.ServerReconfigurable;
+import org.apache.fluss.exception.ConfigException;
+import org.apache.fluss.lake.lakestorage.LakeCatalog;
+import org.apache.fluss.lake.lakestorage.LakeStorage;
+import org.apache.fluss.lake.lakestorage.LakeStoragePlugin;
+import org.apache.fluss.lake.lakestorage.LakeStoragePluginSetUp;
+import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.plugin.PluginManager;
+import org.apache.fluss.server.DynamicServerConfig;
+import org.apache.fluss.server.utils.LakeStorageUtils;
+import org.apache.fluss.utils.IOUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT;
+import static 
org.apache.fluss.server.utils.LakeStorageUtils.extractLakeProperties;
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+import static org.apache.fluss.utils.Preconditions.checkState;
+
+/**
+ * A dynamic loader for lake catalog. Each time when the datalake format is 
changed, the lake
+ * catalog will be changed.
+ */
+public class LakeCatalogDynamicLoader implements ServerReconfigurable, 
AutoCloseable {
+    // null if the cluster hasn't configured datalake format
+    private @Nullable DataLakeFormat dataLakeFormat;
+    private @Nullable LakeCatalog lakeCatalog;
+    private @Nullable Map<String, String> defaultTableLakeOptions;

Review Comment:
   Currently, there is no atomic guarantee to these 3 variables, and this will 
result in in-consistent results. 
   
   Suggest changes:
   1. add a new class `LakeCatalogContainer` move these 3 members into it. 
   2. volatile `LakeCatalogContainer` variable in `LakeCatalogDynamicLoader`. 
   3. provide only access method for the `LakeCatalogContainer` in the 
`LakeCatalogDynamicLoader`
   4. All externals want to access any of `dataLakeFormat`, `lakeCatalog`, 
`defaultTableLakeOptions`, should access `LakeCatalogContainer` first. 



##########
fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java:
##########
@@ -466,6 +473,35 @@ public CompletableFuture<ListAclsResponse> 
listAcls(ListAclsRequest request) {
         }
     }
 
+    @Override
+    public CompletableFuture<DescribeConfigsResponse> describeConfigs(
+            DescribeConfigsRequest request) {
+        if (authorizer != null) {
+            authorizer.authorize(
+                    currentSession(), OperationType.DESCRIBE_CONFIGS, 
Resource.cluster());
+        }
+
+        List<ConfigEntry> configs = dynamicConfigManager.describeConfigs();
+        List<PbDescribeConfigsResponseInfo> pbConfigsInfos =
+                configs.stream()
+                        .map(
+                                configEntry -> {
+                                    PbDescribeConfigsResponseInfo 
pbDescribeConfigsResponseInfo =
+                                            new PbDescribeConfigsResponseInfo()
+                                                    
.setConfigKey(configEntry.key())
+                                                    
.setConfigSource(configEntry.source().name());
+                                    if (configEntry.value() != null) {
+                                        
pbDescribeConfigsResponseInfo.setConfigValue(
+                                                configEntry.value());
+                                    }
+                                    return pbDescribeConfigsResponseInfo;
+                                })
+                        .collect(Collectors.toList());

Review Comment:
   Move the PB conversion to `ServerRpcMessageUtils`



##########
fluss-common/src/main/java/org/apache/fluss/config/dynamic/ConfigEntry.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.config.dynamic;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+/** Configuration entry. */
+public class ConfigEntry {

Review Comment:
   Move `ConfigEntry` and `AlterConfigOp`, `ServerReconfigurable` to 
`org.apache.fluss.config.cluster`. I think it's very clear they are bound to 
dynamic cluster configs. 



##########
fluss-server/src/main/java/org/apache/fluss/server/DynamicConfigManager.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.config.dynamic.AlterConfigOp;
+import org.apache.fluss.config.dynamic.ConfigEntry;
+import org.apache.fluss.exception.ConfigException;
+import org.apache.fluss.server.authorizer.ZkNodeChangeNotificationWatcher;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import 
org.apache.fluss.server.zk.data.ZkData.ConfigEntityChangeNotificationSequenceZNode;
+import 
org.apache.fluss.server.zk.data.ZkData.ConfigEntityChangeNotificationZNode;
+import org.apache.fluss.utils.clock.SystemClock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/** Manager for dynamic configurations. */
+public class DynamicConfigManager {
+    private static final Logger LOG = 
LoggerFactory.getLogger(DynamicConfigManager.class);
+    private static final long CHANGE_NOTIFICATION_EXPIRATION_MS = 15 * 60 * 
1000L;
+
+    private final DynamicServerConfig dynamicServerConfig;
+    private final ZooKeeperClient zooKeeperClient;
+    private final ZkNodeChangeNotificationWatcher configChangeListener;
+    private final boolean isCoordinator;
+
+    public DynamicConfigManager(
+            ZooKeeperClient zooKeeperClient,
+            DynamicServerConfig dynamicServerConfig,
+            boolean isCoordinator) {
+        this.dynamicServerConfig = dynamicServerConfig;
+        this.zooKeeperClient = zooKeeperClient;
+        this.isCoordinator = isCoordinator;
+        this.configChangeListener =
+                new ZkNodeChangeNotificationWatcher(
+                        zooKeeperClient,
+                        ConfigEntityChangeNotificationZNode.path(),
+                        ConfigEntityChangeNotificationSequenceZNode.prefix(),
+                        CHANGE_NOTIFICATION_EXPIRATION_MS,
+                        new ConfigChangedNotificationHandler(),
+                        SystemClock.getInstance());
+    }
+
+    public void startup() throws Exception {
+        try {
+            configChangeListener.start();
+            Map<String, String> entityConfigs = 
zooKeeperClient.fetchEntityConfig();
+            dynamicServerConfig.updateDynamicConfig(entityConfigs, true);
+        } catch (Exception e) {
+            LOG.error("Failed to update dynamic configs from zookeeper", e);
+        }
+    }
+
+    public void close() {
+        configChangeListener.stop();
+    }
+
+    public List<ConfigEntry> describeConfigs() {
+        Map<String, String> dynamicDefaultConfigs = 
dynamicServerConfig.getDynamicConfigs();
+        Map<String, String> staticServerConfigs = 
dynamicServerConfig.getInitialServerConfigs();
+
+        List<ConfigEntry> configEntries = new ArrayList<>();
+        staticServerConfigs.forEach(
+                (key, value) -> {
+                    ConfigEntry configEntry =
+                            new ConfigEntry(
+                                    key, value, 
ConfigEntry.ConfigSource.INITIAL_SERVER_CONFIG);
+                    configEntries.add(configEntry);
+                });
+        dynamicDefaultConfigs.forEach(
+                (key, value) -> {
+                    ConfigEntry configEntry =
+                            new ConfigEntry(
+                                    key, value, 
ConfigEntry.ConfigSource.DYNAMIC_SERVER_CONFIG);
+                    configEntries.add(configEntry);
+                });

Review Comment:
   we should merge the same config from static and dynamic, and show the source



##########
fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java:
##########
@@ -452,4 +454,19 @@ ListOffsetsResult listOffsets(
      * @return A CompletableFuture indicating completion of the operation.
      */
     DropAclsResult dropAcls(Collection<AclBindingFilter> filters);
+
+    /**
+     * Describe the configs of the cluster.
+     *
+     * @return A CompletableFuture containing the configs of the cluster.
+     */
+    CompletableFuture<Collection<ConfigEntry>> describeConfigs();
+
+    /**
+     * Alter the configs of the cluster.
+     *
+     * @param configs List of configs to alter.
+     * @return A CompletableFuture indicating completion of the operation.
+     */
+    CompletableFuture<Void> alterConfigs(Collection<AlterConfigOp> configs);

Review Comment:
   Considering we just introduced `alterTable` in the recent, I want the 
dynamic config can be easily distinguish with the alterTable configs. 
Currently, it's not clear whose config is going to alter under this API. 
Besides, I hope we can reuse as much as entities with the alter table PR. 
Therefore, I suggest the following API changes: 
   
   ```
   # Admin
   CompletableFuture<Collection<ConfigEntry>> describeConfigs();
   ==>
   CompletableFuture<Collection<ConfigEntry>> describeClusterConfigs();
   
   CompletableFuture<Void> alterConfigs(Collection<AlterConfigOp> configs);
   ==>
   CompletableFuture<Void> alterClusterConfigs(Collection<AlterConfig> configs);
   
   AlterConfigOp => AlterConfig and use existing `AlterConfigOpType` for the 
AlterConfigOp.OpType
   
   # RPC
   
   repeated PbAlterConfigsRequestInfo infos ==> repeated PbAlterConfig 
alter_configs
   repeated PbDescribeConfigsResponseInfo infos ==>  repeated PbDescribeConfig 
configs
   DescribeConfigsRequest => DescribeClusterConfigsRequest
   AlterConfigsRequest => AlterClusterConfigsRequest
   
   ApiKeys.DESCRIBE_CONFIGS => DESCRIBE_CLUSTER_CONFIGS
   ApiKeys.ALTER_CONFIGS => ALTER_CLUSTER_CONFIGS
   ```
   



##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeCatalogDynamicLoader.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.config.dynamic.ServerReconfigurable;
+import org.apache.fluss.exception.ConfigException;
+import org.apache.fluss.lake.lakestorage.LakeCatalog;
+import org.apache.fluss.lake.lakestorage.LakeStorage;
+import org.apache.fluss.lake.lakestorage.LakeStoragePlugin;
+import org.apache.fluss.lake.lakestorage.LakeStoragePluginSetUp;
+import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.plugin.PluginManager;
+import org.apache.fluss.server.DynamicServerConfig;
+import org.apache.fluss.server.utils.LakeStorageUtils;
+import org.apache.fluss.utils.IOUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT;
+import static 
org.apache.fluss.server.utils.LakeStorageUtils.extractLakeProperties;
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+import static org.apache.fluss.utils.Preconditions.checkState;
+
+/**
+ * A dynamic loader for lake catalog. Each time when the datalake format is 
changed, the lake
+ * catalog will be changed.
+ */
+public class LakeCatalogDynamicLoader implements ServerReconfigurable, 
AutoCloseable {
+    // null if the cluster hasn't configured datalake format
+    private @Nullable DataLakeFormat dataLakeFormat;
+    private @Nullable LakeCatalog lakeCatalog;
+    private @Nullable Map<String, String> defaultTableLakeOptions;
+    private Configuration currentConfiguration;
+    private final PluginManager pluginManager;
+    private final boolean isCoordinator;
+
+    public LakeCatalogDynamicLoader(
+            DynamicServerConfig dynamicServerConfig,
+            PluginManager pluginManager,
+            boolean isCoordinator) {
+        Configuration currentConfig = dynamicServerConfig.getCurrentConfig();
+        this.isCoordinator = isCoordinator;
+        this.currentConfiguration = currentConfig;
+        this.dataLakeFormat = 
currentConfig.getOptional(DATALAKE_FORMAT).orElse(null);
+        this.lakeCatalog = createLakeCatalog(currentConfig, pluginManager);
+        this.defaultTableLakeOptions =
+                
LakeStorageUtils.generateDefaultTableLakeOptions(currentConfig);
+        this.pluginManager = pluginManager;
+        checkState(
+                (dataLakeFormat == null) == (lakeCatalog == null),
+                "dataLakeFormat and lakeCatalog must both be null or both 
non-null, but dataLakeFormat is %s, lakeCatalog is %s.",
+                dataLakeFormat,
+                lakeCatalog);
+        dynamicServerConfig.register(this);

Review Comment:
   Currently, the `DynamicServerConfig` is only exposed here and this is very 
confusing me, becuase a config has a method of register `ServerReconfigurable`.
   
   It would be better to make `DynamicServerConfig` not public (package 
visible) and only be accessed by `DynamicConfigManager`, just used as an 
internal util class of `DynamicConfigManager`. And we can expose `register()` 
and `getCurrentConfig` from `DynamicConfigManager`. 



##########
fluss-common/src/main/java/org/apache/fluss/security/acl/OperationType.java:
##########
@@ -41,7 +41,9 @@ public enum OperationType {
     CREATE((byte) 5),
     DROP((byte) 6),
     ALTER((byte) 7),
-    DESCRIBE((byte) 8);
+    DESCRIBE((byte) 8),
+    DESCRIBE_CONFIGS((byte) 9),
+    ALTER_CONFIGS((byte) 10);

Review Comment:
   Can we use `DESCRIBE + RESOURCE(cluster)` to replace `DESCRIBE_CONFIGS`, and 
use `ALTER + RESOURCE(cluster)` to replace `ALTER_CONFIGS`? Currently, it seems 
these 2 new operation type are only used for clusters. 



##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/ConfigEntityChangeNotificationJsonSerde.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data;
+
+import 
org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import 
org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.fluss.utils.json.JsonDeserializer;
+import org.apache.fluss.utils.json.JsonSerializer;
+
+import java.io.IOException;
+
+/** Json serializer and deserializer for config entity change notification. */
+public class ConfigEntityChangeNotificationJsonSerde

Review Comment:
   this is not used.



##########
fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server;
+
+import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.config.dynamic.ServerReconfigurable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT;
+import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
+import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
+
+/**
+ * The dynamic configuration for server. If a {@link ServerReconfigurable} 
implementation class
+ * wants to listen for configuration changes, it can register through a 
method. Subsequently, when
+ * {@link DynamicConfigManager} detects changes, it will update the 
configuration items and push
+ * them to these {@link ServerReconfigurable} instances.
+ */
+@Internal
+public class DynamicServerConfig {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(DynamicServerConfig.class);
+    private static final Set<String> ALLOWED_CONFIG_KEYS =
+            Collections.singleton(DATALAKE_FORMAT.key());
+    private static final Set<String> ALLOWED_CONFIG_PREFIXES = 
Collections.singleton("datalake.");
+
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+    private final Set<ServerReconfigurable> serverReconfigurableSet = 
ConcurrentHashMap.newKeySet();

Review Comment:
   There is no `hashcode` and `equals` implementation for instances of 
`ServerReconfigurable`, so there will be duplicated `ServerReconfigurable` for 
the same type if register multiple times. Would be better to use `Map<Class, 
Instance>` structure instead.



##########
fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server;
+
+import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.config.dynamic.ServerReconfigurable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT;
+import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
+import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
+
+/**
+ * The dynamic configuration for server. If a {@link ServerReconfigurable} 
implementation class
+ * wants to listen for configuration changes, it can register through a 
method. Subsequently, when
+ * {@link DynamicConfigManager} detects changes, it will update the 
configuration items and push
+ * them to these {@link ServerReconfigurable} instances.
+ */
+@Internal
+public class DynamicServerConfig {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(DynamicServerConfig.class);
+    private static final Set<String> ALLOWED_CONFIG_KEYS =
+            Collections.singleton(DATALAKE_FORMAT.key());
+    private static final Set<String> ALLOWED_CONFIG_PREFIXES = 
Collections.singleton("datalake.");
+
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+    private final Set<ServerReconfigurable> serverReconfigurableSet = 
ConcurrentHashMap.newKeySet();
+
+    /** The initial configuration items when the server starts from 
server.yaml. */
+    private final Map<String, String> initialConfigMap;
+
+    /** The dynamic configuration items that are added during running(stored 
in zk). */
+    private final Map<String, String> dynamicConfigs = new HashMap<>();
+
+    /**
+     * The current configuration map, which is a combination of initial 
configuration and dynamic.
+     */
+    private final Map<String, String> currentConfigMap;
+
+    /**
+     * The current configuration, which is a combination of initial 
configuration and dynamic
+     * configuration.
+     */
+    private volatile Configuration currentConfig;

Review Comment:
   no need volatile if all access is protected by lock



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to