wuchong commented on code in PR #1567:
URL: https://github.com/apache/fluss/pull/1567#discussion_r2404509310
##########
fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java:
##########
@@ -464,6 +471,65 @@ public DropAclsResult
dropAcls(Collection<AclBindingFilter> filters) {
return result;
}
+ @Override
+ public CompletableFuture<Collection<ConfigEntry>> describeConfigs() {
+ CompletableFuture<Collection<ConfigEntry>> future = new
CompletableFuture<>();
+ DescribeConfigsRequest request = new DescribeConfigsRequest();
+ gateway.describeConfigs(request)
+ .whenComplete(
+ (r, t) -> {
+ if (t != null) {
+ future.completeExceptionally(t);
+ }
+
+ List<PbDescribeConfigsResponseInfo> responseInfos
= r.getInfosList();
+ List<ConfigEntry> configEntries =
+ responseInfos.stream()
+ .map(
+ responseInfo ->
+ new ConfigEntry(
+
responseInfo.getConfigKey(),
+
responseInfo.hasConfigValue()
+ ?
responseInfo
+
.getConfigValue()
+ :
null,
+
ConfigEntry.ConfigSource
+
.valueOf(
+
responseInfo
+
.getConfigSource())))
+ .collect(Collectors.toList());
+ future.complete(configEntries);
+ });
+ return future;
+ }
+
+ @Override
+ public CompletableFuture<Void> alterConfigs(Collection<AlterConfigOp>
configs) {
+ CompletableFuture<Void> future = new CompletableFuture<>();
+
+ AlterConfigsRequest request = new AlterConfigsRequest();
+ for (AlterConfigOp alterConfigOp : configs) {
+ PbAlterConfigsRequestInfo requestInfo =
+ request.addInfo()
+ .setConfigKey(alterConfigOp.key())
+ .setOpType(alterConfigOp.opType().id());
+ if (alterConfigOp.value() != null) {
+ requestInfo.setConfigValue(alterConfigOp.value());
+ }
+ }
+ gateway.alterConfigs(request)
+ .whenComplete(
+ (r, t) -> {
+ if (t != null) {
+ future.completeExceptionally(t);
+ }
+
+ future.complete(null);
Review Comment:
Should this be in an `else` branch? Otherwise, the `future` will be completed twice.
##########
fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java:
##########
@@ -464,6 +471,65 @@ public DropAclsResult
dropAcls(Collection<AclBindingFilter> filters) {
return result;
}
+ @Override
+ public CompletableFuture<Collection<ConfigEntry>> describeConfigs() {
+ CompletableFuture<Collection<ConfigEntry>> future = new
CompletableFuture<>();
+ DescribeConfigsRequest request = new DescribeConfigsRequest();
+ gateway.describeConfigs(request)
+ .whenComplete(
+ (r, t) -> {
+ if (t != null) {
+ future.completeExceptionally(t);
+ }
+
+ List<PbDescribeConfigsResponseInfo> responseInfos
= r.getInfosList();
+ List<ConfigEntry> configEntries =
+ responseInfos.stream()
+ .map(
+ responseInfo ->
+ new ConfigEntry(
+
responseInfo.getConfigKey(),
+
responseInfo.hasConfigValue()
+ ?
responseInfo
+
.getConfigValue()
+ :
null,
+
ConfigEntry.ConfigSource
+
.valueOf(
+
responseInfo
+
.getConfigSource())))
+ .collect(Collectors.toList());
+ future.complete(configEntries);
+ });
Review Comment:
This formatted code is quite hard to read. Please extract it
into `ClientRpcMessageUtils#toConfigEntries()` to improve the code readability.
##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java:
##########
@@ -649,4 +650,72 @@ public static Resource decode(byte[] json) {
}
}
}
+
+ /**
+ * The znode for the dynamic configs. The znode path is:
+ *
+ * <p>/config
+ */
+ public static final class ConfigZNode {
+ public static String path() {
+ return "/config";
+ }
+ }
+
+ /**
+ * The znode for a specific config entity. The znode path is:
+ *
+ * <p>/config/[entityType]/[entityName]
+ */
+ public static final class ConfigEntityZNode {
+ public static final String ENTITY_TYPE = "server";
+ public static final String ENTITY_NAME = "global";
+
+ public static String path() {
+ return ConfigZNode.path() + "/" + ENTITY_TYPE + "/" + ENTITY_NAME;
+ }
+
+ public static byte[] encode(Map<String, String> properties) {
+ return JsonSerdeUtils.writeValueAsBytes(properties,
ConfigJsonSerde.INSTANCE);
+ }
+
+ public static Map<String, String> decode(byte[] json) {
+ return JsonSerdeUtils.readValue(json, ConfigJsonSerde.INSTANCE);
+ }
+ }
+
+ /**
+ * The znode for tracking dynamic config entity changes. This znode serves
as a root node for
+ * all config entity change notifications. The znode path is:
+ *
+ * <p>/config/changes
+ */
+ public static final class ConfigEntityChangeNotificationZNode {
+ public static String path() {
+ return ConfigZNode.path() + "/changes";
+ }
+ }
+
+ /**
+ * The znode for individual entity changes change notifications. Each
notification is stored as
+ * a sequential child node under the {@link
ConfigEntityChangeNotificationZNode} with a prefix.
+ * The znode path follows this structure:
+ *
+ * <p>/config/changes/acl_changes_[sequenceNumber]
Review Comment:
`acl_changes_` -> `config_changes_`
##########
fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java:
##########
@@ -464,6 +471,65 @@ public DropAclsResult
dropAcls(Collection<AclBindingFilter> filters) {
return result;
}
+ @Override
+ public CompletableFuture<Collection<ConfigEntry>> describeConfigs() {
+ CompletableFuture<Collection<ConfigEntry>> future = new
CompletableFuture<>();
+ DescribeConfigsRequest request = new DescribeConfigsRequest();
+ gateway.describeConfigs(request)
+ .whenComplete(
+ (r, t) -> {
+ if (t != null) {
+ future.completeExceptionally(t);
Review Comment:
Should we return here, or add an `else` branch for the following code? Otherwise,
the future will be completed twice.
##########
fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server;
+
+import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.config.dynamic.ServerReconfigurable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT;
+import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
+import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
+
+/**
+ * The dynamic configuration for server. If a {@link ServerReconfigurable}
implementation class
+ * wants to listen for configuration changes, it can register through a
method. Subsequently, when
+ * {@link DynamicConfigManager} detects changes, it will update the
configuration items and push
+ * them to these {@link ServerReconfigurable} instances.
+ */
+@Internal
+public class DynamicServerConfig {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(DynamicServerConfig.class);
+ private static final Set<String> ALLOWED_CONFIG_KEYS =
+ Collections.singleton(DATALAKE_FORMAT.key());
+ private static final Set<String> ALLOWED_CONFIG_PREFIXES =
Collections.singleton("datalake.");
+
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+ private final Set<ServerReconfigurable> serverReconfigurableSet =
ConcurrentHashMap.newKeySet();
+
+ /** The initial configuration items when the server starts from
server.yaml. */
+ private final Map<String, String> initialConfigMap;
+
+ /** The dynamic configuration items that are added during running(stored
in zk). */
+ private final Map<String, String> dynamicConfigs = new HashMap<>();
+
+ /**
+ * The current configuration map, which is a combination of initial
configuration and dynamic.
+ */
+ private final Map<String, String> currentConfigMap;
+
+ /**
+ * The current configuration, which is a combination of initial
configuration and dynamic
+ * configuration.
+ */
+ private volatile Configuration currentConfig;
+
+ public DynamicServerConfig(Configuration flussConfig) {
+ this.currentConfig = flussConfig;
+ this.initialConfigMap = flussConfig.toMap();
+ this.currentConfigMap = flussConfig.toMap();
+ }
+
+ /** Register a ServerReconfigurable which listens to configuration
changes. */
+ public void register(ServerReconfigurable serverReconfigurable) {
+ serverReconfigurableSet.add(serverReconfigurable);
+ }
+
+ /** Update the dynamic configuration and apply to registered
ServerReconfigurables. */
+ public void updateDynamicConfig(Map<String, String> newDynamicConfigs,
boolean skipErrorConfig)
+ throws Exception {
+ inWriteLock(lock, () -> updateCurrentConfig(newDynamicConfigs,
skipErrorConfig));
+ }
+
+ public Configuration getCurrentConfig() {
+ return inReadLock(lock, () -> currentConfig);
+ }
+
+ public Map<String, String> getDynamicConfigs() {
+ return inReadLock(lock, () -> new HashMap<>(dynamicConfigs));
+ }
+
+ public Map<String, String> getInitialServerConfigs() {
+ return inReadLock(lock, () -> new HashMap<>(initialConfigMap));
+ }
+
+ public boolean isAllowedConfig(String key) {
+ if (ALLOWED_CONFIG_KEYS.contains(key)) {
+ return true;
+ }
+
+ for (String prefix : ALLOWED_CONFIG_PREFIXES) {
+ if (key.startsWith(prefix)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ private void updateCurrentConfig(Map<String, String> newDynamicConfigs,
boolean skipErrorConfig)
+ throws Exception {
+ Map<String, String> newProps = new HashMap<>(initialConfigMap);
+ overrideProps(newProps, newDynamicConfigs);
+ Configuration newConfig = Configuration.fromMap(newProps);
+ Configuration oldConfig = currentConfig;
+ Set<ServerReconfigurable> appliedServerReconfigurableSet = new
HashSet<>();
+ if (!newProps.equals(currentConfigMap)) {
+ serverReconfigurableSet.forEach(
+ serverReconfigurable -> {
+ try {
+ serverReconfigurable.validate(newConfig);
+ } catch (Exception e) {
Review Comment:
Should we catch `ConfigException` here? Otherwise, what's the use case of
introducing `ConfigException`?
##########
fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java:
##########
@@ -902,6 +903,37 @@ public void insertAclChangeNotification(Resource resource)
throws Exception {
LOG.info("add acl change notification for resource {} ", resource);
}
+ public Map<String, String> fetchEntityConfig() throws Exception {
+ String path = ConfigEntityZNode.path();
+ return getOrEmpty(path).map(ConfigEntityZNode::decode).orElse(new
HashMap<>());
+ }
+
+ public void upsertServerEntityConfig(Map<String, String> configs) throws
Exception {
+ upsertEntityConfigs(configs);
+ }
+
+ public void upsertEntityConfigs(Map<String, String> configs) throws
Exception {
+ String path = ConfigEntityZNode.path();
+ if (zkClient.checkExists().forPath(path) != null) {
+ zkClient.setData().forPath(path,
ConfigEntityZNode.encode(configs));
+ } else {
+ zkClient.create()
+ .creatingParentsIfNeeded()
+ .forPath(path, ConfigEntityZNode.encode(configs));
+ }
+
+ insertConfigChangeNotification();
+ }
+
+ public void insertConfigChangeNotification() throws Exception {
Review Comment:
Add a log for this, like `insertAclChangeNotification` does? Currently, it seems
there are no logs when altering cluster configs.
##########
fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java:
##########
@@ -886,6 +889,53 @@ tablePath, newPartitionSpec("age", "11"), false)
.isInstanceOf(TooManyPartitionsException.class);
}
+ @Test
+ void testDynamicConfigs() throws ExecutionException, InterruptedException {
+ assertThat(
+ FLUSS_CLUSTER_EXTENSION
+ .getCoordinatorServer()
+ .getCoordinatorService()
+ .getDataLakeFormat())
+ .isEqualTo(PAIMON);
+
+ admin.alterConfigs(
+ Collections.singletonList(
+ new AlterConfigOp(
+ DATALAKE_FORMAT.key(), null,
AlterConfigOp.OpType.SET)))
+ .get();
+ assertThat(
+ FLUSS_CLUSTER_EXTENSION
+ .getCoordinatorServer()
+ .getCoordinatorService()
+ .getDataLakeFormat())
+ .isNull();
+ assertThat(admin.describeConfigs().get())
+ .contains(
Review Comment:
I think we should also assert that there are no duplicated keys in the returned
configs. Currently, there are 2 config entries for `datalake.format`.
##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/ConfigJsonSerde.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data;
+
+import
org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import
org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.fluss.utils.json.JsonDeserializer;
+import org.apache.fluss.utils.json.JsonSerializer;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/** Json serializer and deserializer for config properties. */
+public class ConfigJsonSerde
+ implements JsonSerializer<Map<String, String>>,
JsonDeserializer<Map<String, String>> {
+
+ public static final ConfigJsonSerde INSTANCE = new ConfigJsonSerde();
+
+ private static final String VERSION_KEY = "version";
+ private static final String CONFIG = "config";
Review Comment:
Use `configs`? It contains multiple configs in the serialized map.
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeCatalogDynamicLoader.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.config.dynamic.ServerReconfigurable;
+import org.apache.fluss.exception.ConfigException;
+import org.apache.fluss.lake.lakestorage.LakeCatalog;
+import org.apache.fluss.lake.lakestorage.LakeStorage;
+import org.apache.fluss.lake.lakestorage.LakeStoragePlugin;
+import org.apache.fluss.lake.lakestorage.LakeStoragePluginSetUp;
+import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.plugin.PluginManager;
+import org.apache.fluss.server.DynamicServerConfig;
+import org.apache.fluss.server.utils.LakeStorageUtils;
+import org.apache.fluss.utils.IOUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT;
+import static
org.apache.fluss.server.utils.LakeStorageUtils.extractLakeProperties;
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+import static org.apache.fluss.utils.Preconditions.checkState;
+
+/**
+ * A dynamic loader for lake catalog. Each time when the datalake format is
changed, the lake
+ * catalog will be changed.
+ */
+public class LakeCatalogDynamicLoader implements ServerReconfigurable,
AutoCloseable {
+ // null if the cluster hasn't configured datalake format
+ private @Nullable DataLakeFormat dataLakeFormat;
+ private @Nullable LakeCatalog lakeCatalog;
+ private @Nullable Map<String, String> defaultTableLakeOptions;
+ private Configuration currentConfiguration;
+ private final PluginManager pluginManager;
+ private final boolean isCoordinator;
+
+ public LakeCatalogDynamicLoader(
+ DynamicServerConfig dynamicServerConfig,
+ PluginManager pluginManager,
+ boolean isCoordinator) {
+ Configuration currentConfig = dynamicServerConfig.getCurrentConfig();
+ this.isCoordinator = isCoordinator;
+ this.currentConfiguration = currentConfig;
+ this.dataLakeFormat =
currentConfig.getOptional(DATALAKE_FORMAT).orElse(null);
+ this.lakeCatalog = createLakeCatalog(currentConfig, pluginManager);
+ this.defaultTableLakeOptions =
+
LakeStorageUtils.generateDefaultTableLakeOptions(currentConfig);
+ this.pluginManager = pluginManager;
+ checkState(
+ (dataLakeFormat == null) == (lakeCatalog == null),
+ "dataLakeFormat and lakeCatalog must both be null or both
non-null, but dataLakeFormat is %s, lakeCatalog is %s.",
+ dataLakeFormat,
+ lakeCatalog);
+ dynamicServerConfig.register(this);
+ }
+
+ @Override
+ public void validate(Configuration newConfig) throws ConfigException {
+ DataLakeFormat newDatalakeFormat = null;
+ try {
+ if (newConfig.getOptional(DATALAKE_FORMAT).isPresent()) {
+ newDatalakeFormat = newConfig.get(DATALAKE_FORMAT);
+ } else {
+ newDatalakeFormat = currentConfiguration.get(DATALAKE_FORMAT);
+ }
+
+ if (newDatalakeFormat == null) {
+ return;
+ }
+ } catch (Exception e) {
+ throw new ConfigException(
+ "Invalid configuration for datalake format "
+ + newDatalakeFormat
Review Comment:
`newDatalakeFormat` is always `null` in the exception block. I think we can
just print all the new configs in the exception message.
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeCatalogDynamicLoader.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.config.dynamic.ServerReconfigurable;
+import org.apache.fluss.exception.ConfigException;
+import org.apache.fluss.lake.lakestorage.LakeCatalog;
+import org.apache.fluss.lake.lakestorage.LakeStorage;
+import org.apache.fluss.lake.lakestorage.LakeStoragePlugin;
+import org.apache.fluss.lake.lakestorage.LakeStoragePluginSetUp;
+import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.plugin.PluginManager;
+import org.apache.fluss.server.DynamicServerConfig;
+import org.apache.fluss.server.utils.LakeStorageUtils;
+import org.apache.fluss.utils.IOUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT;
+import static
org.apache.fluss.server.utils.LakeStorageUtils.extractLakeProperties;
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+import static org.apache.fluss.utils.Preconditions.checkState;
+
+/**
+ * A dynamic loader for lake catalog. Each time when the datalake format is
changed, the lake
+ * catalog will be changed.
+ */
+public class LakeCatalogDynamicLoader implements ServerReconfigurable,
AutoCloseable {
+ // null if the cluster hasn't configured datalake format
+ private @Nullable DataLakeFormat dataLakeFormat;
+ private @Nullable LakeCatalog lakeCatalog;
+ private @Nullable Map<String, String> defaultTableLakeOptions;
Review Comment:
Currently, there is no atomicity guarantee across these 3 variables, and this
will result in inconsistent results.
Suggested changes:
1. Add a new class `LakeCatalogContainer` and move these 3 members into it.
2. Keep a volatile `LakeCatalogContainer` variable in `LakeCatalogDynamicLoader`.
3. Provide only an access method for the `LakeCatalogContainer` in the
`LakeCatalogDynamicLoader`.
4. Any external code that wants to access `dataLakeFormat`, `lakeCatalog`, or
`defaultTableLakeOptions` should access the `LakeCatalogContainer` first.
##########
fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java:
##########
@@ -466,6 +473,35 @@ public CompletableFuture<ListAclsResponse>
listAcls(ListAclsRequest request) {
}
}
+ @Override
+ public CompletableFuture<DescribeConfigsResponse> describeConfigs(
+ DescribeConfigsRequest request) {
+ if (authorizer != null) {
+ authorizer.authorize(
+ currentSession(), OperationType.DESCRIBE_CONFIGS,
Resource.cluster());
+ }
+
+ List<ConfigEntry> configs = dynamicConfigManager.describeConfigs();
+ List<PbDescribeConfigsResponseInfo> pbConfigsInfos =
+ configs.stream()
+ .map(
+ configEntry -> {
+ PbDescribeConfigsResponseInfo
pbDescribeConfigsResponseInfo =
+ new PbDescribeConfigsResponseInfo()
+
.setConfigKey(configEntry.key())
+
.setConfigSource(configEntry.source().name());
+ if (configEntry.value() != null) {
+
pbDescribeConfigsResponseInfo.setConfigValue(
+ configEntry.value());
+ }
+ return pbDescribeConfigsResponseInfo;
+ })
+ .collect(Collectors.toList());
Review Comment:
Move the PB conversion to `ServerRpcMessageUtils`
##########
fluss-common/src/main/java/org/apache/fluss/config/dynamic/ConfigEntry.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.config.dynamic;
+
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+/** Configuration entry. */
+public class ConfigEntry {
Review Comment:
Move `ConfigEntry`, `AlterConfigOp`, and `ServerReconfigurable` to
`org.apache.fluss.config.cluster`. I think it's very clear they are bound to
dynamic cluster configs.
##########
fluss-server/src/main/java/org/apache/fluss/server/DynamicConfigManager.java:
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server;
+
+import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.config.dynamic.AlterConfigOp;
+import org.apache.fluss.config.dynamic.ConfigEntry;
+import org.apache.fluss.exception.ConfigException;
+import org.apache.fluss.server.authorizer.ZkNodeChangeNotificationWatcher;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+import
org.apache.fluss.server.zk.data.ZkData.ConfigEntityChangeNotificationSequenceZNode;
+import
org.apache.fluss.server.zk.data.ZkData.ConfigEntityChangeNotificationZNode;
+import org.apache.fluss.utils.clock.SystemClock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/** Manager for dynamic configurations. */
+public class DynamicConfigManager {
+ private static final Logger LOG =
LoggerFactory.getLogger(DynamicConfigManager.class);
+ private static final long CHANGE_NOTIFICATION_EXPIRATION_MS = 15 * 60 *
1000L;
+
+ private final DynamicServerConfig dynamicServerConfig;
+ private final ZooKeeperClient zooKeeperClient;
+ private final ZkNodeChangeNotificationWatcher configChangeListener;
+ private final boolean isCoordinator;
+
+ public DynamicConfigManager(
+ ZooKeeperClient zooKeeperClient,
+ DynamicServerConfig dynamicServerConfig,
+ boolean isCoordinator) {
+ this.dynamicServerConfig = dynamicServerConfig;
+ this.zooKeeperClient = zooKeeperClient;
+ this.isCoordinator = isCoordinator;
+ this.configChangeListener =
+ new ZkNodeChangeNotificationWatcher(
+ zooKeeperClient,
+ ConfigEntityChangeNotificationZNode.path(),
+ ConfigEntityChangeNotificationSequenceZNode.prefix(),
+ CHANGE_NOTIFICATION_EXPIRATION_MS,
+ new ConfigChangedNotificationHandler(),
+ SystemClock.getInstance());
+ }
+
+ public void startup() throws Exception {
+ try {
+ configChangeListener.start();
+ Map<String, String> entityConfigs =
zooKeeperClient.fetchEntityConfig();
+ dynamicServerConfig.updateDynamicConfig(entityConfigs, true);
+ } catch (Exception e) {
+ LOG.error("Failed to update dynamic configs from zookeeper", e);
+ }
+ }
+
+ public void close() {
+ configChangeListener.stop();
+ }
+
+ public List<ConfigEntry> describeConfigs() {
+ Map<String, String> dynamicDefaultConfigs =
dynamicServerConfig.getDynamicConfigs();
+ Map<String, String> staticServerConfigs =
dynamicServerConfig.getInitialServerConfigs();
+
+ List<ConfigEntry> configEntries = new ArrayList<>();
+ staticServerConfigs.forEach(
+ (key, value) -> {
+ ConfigEntry configEntry =
+ new ConfigEntry(
+ key, value,
ConfigEntry.ConfigSource.INITIAL_SERVER_CONFIG);
+ configEntries.add(configEntry);
+ });
+ dynamicDefaultConfigs.forEach(
+ (key, value) -> {
+ ConfigEntry configEntry =
+ new ConfigEntry(
+ key, value,
ConfigEntry.ConfigSource.DYNAMIC_SERVER_CONFIG);
+ configEntries.add(configEntry);
+ });
Review Comment:
We should merge entries for the same config key from the static and dynamic configs, and show the source.
##########
fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java:
##########
@@ -452,4 +454,19 @@ ListOffsetsResult listOffsets(
* @return A CompletableFuture indicating completion of the operation.
*/
DropAclsResult dropAcls(Collection<AclBindingFilter> filters);
+
+ /**
+ * Describe the configs of the cluster.
+ *
+ * @return A CompletableFuture containing the configs of the cluster.
+ */
+ CompletableFuture<Collection<ConfigEntry>> describeConfigs();
+
+ /**
+ * Alter the configs of the cluster.
+ *
+ * @param configs List of configs to alter.
+ * @return A CompletableFuture indicating completion of the operation.
+ */
+ CompletableFuture<Void> alterConfigs(Collection<AlterConfigOp> configs);
Review Comment:
Considering we just introduced `alterTable` recently, I want the
dynamic configs to be easily distinguishable from the `alterTable` configs.
Currently, it's not clear whose configs are going to be altered by this API.
Besides, I hope we can reuse as many entities as possible from the alter table PR.
Therefore, I suggest the following API changes:
```
# Admin
CompletableFuture<Collection<ConfigEntry>> describeConfigs();
==>
CompletableFuture<Collection<ConfigEntry>> describeClusterConfigs();
CompletableFuture<Void> alterConfigs(Collection<AlterConfigOp> configs);
==>
CompletableFuture<Void> alterClusterConfigs(Collection<AlterConfig> configs);
AlterConfigOp => AlterConfig and use existing `AlterConfigOpType` for the
AlterConfigOp.OpType
# RPC
repeated PbAlterConfigsRequestInfo infos ==> repeated PbAlterConfig
alter_configs
repeated PbDescribeConfigsResponseInfo infos ==> repeated PbDescribeConfig
configs
DescribeConfigsRequest => DescribeClusterConfigsRequest
AlterConfigsRequest => AlterClusterConfigsRequest
ApiKeys.DESCRIBE_CONFIGS => DESCRIBE_CLUSTER_CONFIGS
ApiKeys.ALTER_CONFIGS => ALTER_CLUSTER_CONFIGS
```
##########
fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeCatalogDynamicLoader.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.coordinator;
+
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.config.dynamic.ServerReconfigurable;
+import org.apache.fluss.exception.ConfigException;
+import org.apache.fluss.lake.lakestorage.LakeCatalog;
+import org.apache.fluss.lake.lakestorage.LakeStorage;
+import org.apache.fluss.lake.lakestorage.LakeStoragePlugin;
+import org.apache.fluss.lake.lakestorage.LakeStoragePluginSetUp;
+import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.plugin.PluginManager;
+import org.apache.fluss.server.DynamicServerConfig;
+import org.apache.fluss.server.utils.LakeStorageUtils;
+import org.apache.fluss.utils.IOUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT;
+import static
org.apache.fluss.server.utils.LakeStorageUtils.extractLakeProperties;
+import static org.apache.fluss.utils.Preconditions.checkNotNull;
+import static org.apache.fluss.utils.Preconditions.checkState;
+
+/**
+ * A dynamic loader for lake catalog. Each time when the datalake format is
changed, the lake
+ * catalog will be changed.
+ */
+public class LakeCatalogDynamicLoader implements ServerReconfigurable,
AutoCloseable {
+ // null if the cluster hasn't configured datalake format
+ private @Nullable DataLakeFormat dataLakeFormat;
+ private @Nullable LakeCatalog lakeCatalog;
+ private @Nullable Map<String, String> defaultTableLakeOptions;
+ private Configuration currentConfiguration;
+ private final PluginManager pluginManager;
+ private final boolean isCoordinator;
+
+ public LakeCatalogDynamicLoader(
+ DynamicServerConfig dynamicServerConfig,
+ PluginManager pluginManager,
+ boolean isCoordinator) {
+ Configuration currentConfig = dynamicServerConfig.getCurrentConfig();
+ this.isCoordinator = isCoordinator;
+ this.currentConfiguration = currentConfig;
+ this.dataLakeFormat =
currentConfig.getOptional(DATALAKE_FORMAT).orElse(null);
+ this.lakeCatalog = createLakeCatalog(currentConfig, pluginManager);
+ this.defaultTableLakeOptions =
+
LakeStorageUtils.generateDefaultTableLakeOptions(currentConfig);
+ this.pluginManager = pluginManager;
+ checkState(
+ (dataLakeFormat == null) == (lakeCatalog == null),
+ "dataLakeFormat and lakeCatalog must both be null or both
non-null, but dataLakeFormat is %s, lakeCatalog is %s.",
+ dataLakeFormat,
+ lakeCatalog);
+ dynamicServerConfig.register(this);
Review Comment:
Currently, `DynamicServerConfig` is only exposed here, and this is very
confusing to me, because a config has a method to register `ServerReconfigurable`.
It would be better to make `DynamicServerConfig` not public (package
visible) and only accessed by `DynamicConfigManager`, used just as an
internal util class of `DynamicConfigManager`. And we can expose `register()`
and `getCurrentConfig()` from `DynamicConfigManager`.
##########
fluss-common/src/main/java/org/apache/fluss/security/acl/OperationType.java:
##########
@@ -41,7 +41,9 @@ public enum OperationType {
CREATE((byte) 5),
DROP((byte) 6),
ALTER((byte) 7),
- DESCRIBE((byte) 8);
+ DESCRIBE((byte) 8),
+ DESCRIBE_CONFIGS((byte) 9),
+ ALTER_CONFIGS((byte) 10);
Review Comment:
Can we use `DESCRIBE + RESOURCE(cluster)` to replace `DESCRIBE_CONFIGS`, and
use `ALTER + RESOURCE(cluster)` to replace `ALTER_CONFIGS`? Currently, it seems
these 2 new operation types are only used for clusters.
##########
fluss-server/src/main/java/org/apache/fluss/server/zk/data/ConfigEntityChangeNotificationJsonSerde.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.zk.data;
+
+import
org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
+import
org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.fluss.utils.json.JsonDeserializer;
+import org.apache.fluss.utils.json.JsonSerializer;
+
+import java.io.IOException;
+
+/** Json serializer and deserializer for config entity change notification. */
+public class ConfigEntityChangeNotificationJsonSerde
Review Comment:
this is not used.
##########
fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server;
+
+import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.config.dynamic.ServerReconfigurable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT;
+import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
+import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
+
+/**
+ * The dynamic configuration for server. If a {@link ServerReconfigurable}
implementation class
+ * wants to listen for configuration changes, it can register through a
method. Subsequently, when
+ * {@link DynamicConfigManager} detects changes, it will update the
configuration items and push
+ * them to these {@link ServerReconfigurable} instances.
+ */
+@Internal
+public class DynamicServerConfig {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(DynamicServerConfig.class);
+ private static final Set<String> ALLOWED_CONFIG_KEYS =
+ Collections.singleton(DATALAKE_FORMAT.key());
+ private static final Set<String> ALLOWED_CONFIG_PREFIXES =
Collections.singleton("datalake.");
+
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+ private final Set<ServerReconfigurable> serverReconfigurableSet =
ConcurrentHashMap.newKeySet();
Review Comment:
There are no `hashCode` and `equals` implementations for instances of
`ServerReconfigurable`, so there will be duplicate `ServerReconfigurable`
entries for the same type if it is registered multiple times. It would be
better to use a `Map<Class, Instance>` structure instead.
##########
fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java:
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server;
+
+import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.config.dynamic.ServerReconfigurable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT;
+import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
+import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
+
+/**
+ * The dynamic configuration for server. If a {@link ServerReconfigurable}
implementation class
+ * wants to listen for configuration changes, it can register through a
method. Subsequently, when
+ * {@link DynamicConfigManager} detects changes, it will update the
configuration items and push
+ * them to these {@link ServerReconfigurable} instances.
+ */
+@Internal
+public class DynamicServerConfig {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(DynamicServerConfig.class);
+ private static final Set<String> ALLOWED_CONFIG_KEYS =
+ Collections.singleton(DATALAKE_FORMAT.key());
+ private static final Set<String> ALLOWED_CONFIG_PREFIXES =
Collections.singleton("datalake.");
+
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+ private final Set<ServerReconfigurable> serverReconfigurableSet =
ConcurrentHashMap.newKeySet();
+
+ /** The initial configuration items when the server starts from
server.yaml. */
+ private final Map<String, String> initialConfigMap;
+
+ /** The dynamic configuration items that are added during running(stored
in zk). */
+ private final Map<String, String> dynamicConfigs = new HashMap<>();
+
+ /**
+ * The current configuration map, which is a combination of initial
configuration and dynamic.
+ */
+ private final Map<String, String> currentConfigMap;
+
+ /**
+ * The current configuration, which is a combination of initial
configuration and dynamic
+ * configuration.
+ */
+ private volatile Configuration currentConfig;
Review Comment:
No need for `volatile` if all access is protected by the lock.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]