This is an automated email from the ASF dual-hosted git repository. hongshun pushed a commit to branch poc-dymanic-config in repository https://gitbox.apache.org/repos/asf/fluss.git
commit f32db76595ac0d6e2788309b91a9ebe711cbe525 Author: Hongshun Wang <[email protected]> AuthorDate: Thu Aug 28 14:38:05 2025 +0800 [FIP-12] Add dynamic config and enable lakehouse dynamically. --- .../java/org/apache/fluss/client/admin/Admin.java | 17 ++ .../org/apache/fluss/client/admin/FlussAdmin.java | 66 ++++++ .../fluss/client/admin/FlussAdminITCase.java | 58 +++++- .../security/acl/FlussAuthorizationITCase.java | 69 ++++++- .../apache/fluss/config/dynamic/AlterConfigOp.java | 129 ++++++++++++ .../apache/fluss/config/dynamic/ConfigEntry.java | 94 +++++++++ .../fluss/config/dynamic/ServerReconfigurable.java | 62 ++++++ .../apache/fluss/exception/ConfigException.java | 28 +++ .../apache/fluss/security/acl/OperationType.java | 4 +- .../org/apache/fluss/rpc/gateway/AdminGateway.java | 5 + .../fluss/rpc/gateway/AdminReadOnlyGateway.java | 5 + .../fluss/rpc/netty/server/NettyServerHandler.java | 1 - .../org/apache/fluss/rpc/protocol/ApiKeys.java | 4 +- .../java/org/apache/fluss/rpc/protocol/Errors.java | 3 +- fluss-rpc/src/main/proto/FlussApi.proto | 32 ++- .../fluss/rpc/TestingTabletGatewayService.java | 8 + .../fluss/rpc/netty/client/NettyClientTest.java | 2 +- .../apache/fluss/server/DynamicConfigManager.java | 160 ++++++++++++++ .../apache/fluss/server/DynamicServerConfig.java | 180 ++++++++++++++++ .../org/apache/fluss/server/RpcServiceBase.java | 38 +++- .../java/org/apache/fluss/server/ServerBase.java | 2 + .../server/coordinator/CoordinatorServer.java | 49 +++-- .../server/coordinator/CoordinatorService.java | 88 ++++++-- .../coordinator/LakeCatalogDynamicLoader.java | 146 +++++++++++++ .../apache/fluss/server/tablet/TabletServer.java | 17 +- .../apache/fluss/server/tablet/TabletService.java | 12 +- .../apache/fluss/server/zk/ZooKeeperClient.java | 32 +++ .../ConfigEntityChangeNotificationJsonSerde.java | 50 +++++ .../fluss/server/zk/data/ConfigJsonSerde.java | 71 +++++++ .../org/apache/fluss/server/zk/data/ZkData.java | 69 +++++++ 
.../fluss/server/DynamicConfigChangeTest.java | 230 +++++++++++++++++++++ .../server/authorizer/DefaultAuthorizerTest.java | 40 ++-- .../server/coordinator/TestCoordinatorGateway.java | 15 ++ .../server/tablet/TestTabletServerGateway.java | 8 + .../fluss/server/zk/data/ConfigJsonSerdeTest.java | 51 +++++ fluss-test-coverage/pom.xml | 1 + 36 files changed, 1764 insertions(+), 82 deletions(-) diff --git a/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java b/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java index baaab3507..b15735442 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/Admin.java @@ -23,6 +23,8 @@ import org.apache.fluss.client.metadata.KvSnapshots; import org.apache.fluss.client.metadata.LakeSnapshot; import org.apache.fluss.cluster.ServerNode; import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.dynamic.AlterConfigOp; +import org.apache.fluss.config.dynamic.ConfigEntry; import org.apache.fluss.exception.DatabaseAlreadyExistException; import org.apache.fluss.exception.DatabaseNotEmptyException; import org.apache.fluss.exception.DatabaseNotExistException; @@ -452,4 +454,19 @@ public interface Admin extends AutoCloseable { * @return A CompletableFuture indicating completion of the operation. */ DropAclsResult dropAcls(Collection<AclBindingFilter> filters); + + /** + * Describe the configs of the cluster. + * + * @return A CompletableFuture containing the configs of the cluster. + */ + CompletableFuture<Collection<ConfigEntry>> describeConfigs(); + + /** + * Alter the configs of the cluster. + * + * @param configs List of configs to alter. + * @return A CompletableFuture indicating completion of the operation. 
+ */ + CompletableFuture<Void> alterConfigs(Collection<AlterConfigOp> configs); } diff --git a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java index 0f9c007aa..ab72116ba 100644 --- a/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java +++ b/fluss-client/src/main/java/org/apache/fluss/client/admin/FlussAdmin.java @@ -24,6 +24,8 @@ import org.apache.fluss.client.metadata.MetadataUpdater; import org.apache.fluss.client.utils.ClientRpcMessageUtils; import org.apache.fluss.cluster.Cluster; import org.apache.fluss.cluster.ServerNode; +import org.apache.fluss.config.dynamic.AlterConfigOp; +import org.apache.fluss.config.dynamic.ConfigEntry; import org.apache.fluss.exception.LeaderNotAvailableException; import org.apache.fluss.metadata.DatabaseDescriptor; import org.apache.fluss.metadata.DatabaseInfo; @@ -41,11 +43,13 @@ import org.apache.fluss.rpc.RpcClient; import org.apache.fluss.rpc.gateway.AdminGateway; import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway; import org.apache.fluss.rpc.gateway.TabletServerGateway; +import org.apache.fluss.rpc.messages.AlterConfigsRequest; import org.apache.fluss.rpc.messages.CreateAclsRequest; import org.apache.fluss.rpc.messages.CreateDatabaseRequest; import org.apache.fluss.rpc.messages.CreateTableRequest; import org.apache.fluss.rpc.messages.DatabaseExistsRequest; import org.apache.fluss.rpc.messages.DatabaseExistsResponse; +import org.apache.fluss.rpc.messages.DescribeConfigsRequest; import org.apache.fluss.rpc.messages.DropAclsRequest; import org.apache.fluss.rpc.messages.DropDatabaseRequest; import org.apache.fluss.rpc.messages.DropTableRequest; @@ -62,6 +66,8 @@ import org.apache.fluss.rpc.messages.ListOffsetsRequest; import org.apache.fluss.rpc.messages.ListPartitionInfosRequest; import org.apache.fluss.rpc.messages.ListTablesRequest; import org.apache.fluss.rpc.messages.ListTablesResponse; +import 
org.apache.fluss.rpc.messages.PbAlterConfigsRequestInfo; +import org.apache.fluss.rpc.messages.PbDescribeConfigsResponseInfo; import org.apache.fluss.rpc.messages.PbListOffsetsRespForBucket; import org.apache.fluss.rpc.messages.PbPartitionSpec; import org.apache.fluss.rpc.messages.PbTablePath; @@ -81,6 +87,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeCreatePartitionRequest; import static org.apache.fluss.client.utils.ClientRpcMessageUtils.makeDropPartitionRequest; @@ -464,6 +471,65 @@ public class FlussAdmin implements Admin { return result; } + @Override + public CompletableFuture<Collection<ConfigEntry>> describeConfigs() { + CompletableFuture<Collection<ConfigEntry>> future = new CompletableFuture<>(); + DescribeConfigsRequest request = new DescribeConfigsRequest(); + gateway.describeConfigs(request) + .whenComplete( + (r, t) -> { + if (t != null) { + future.completeExceptionally(t); + return; // r is null on failure, nothing to convert + } + List<PbDescribeConfigsResponseInfo> responseInfos = r.getInfosList(); + List<ConfigEntry> configEntries = + responseInfos.stream() + .map( + responseInfo -> + new ConfigEntry( + responseInfo.getConfigKey(), + responseInfo.hasConfigValue() + ?
responseInfo + .getConfigValue() + : null, + ConfigEntry.ConfigSource + .valueOf( + responseInfo + .getConfigSource()))) + .collect(Collectors.toList()); + future.complete(configEntries); + }); + return future; + } + + @Override + public CompletableFuture<Void> alterConfigs(Collection<AlterConfigOp> configs) { + CompletableFuture<Void> future = new CompletableFuture<>(); + + AlterConfigsRequest request = new AlterConfigsRequest(); + for (AlterConfigOp alterConfigOp : configs) { + PbAlterConfigsRequestInfo requestInfo = + request.addInfo() + .setConfigKey(alterConfigOp.key()) + .setOpType(alterConfigOp.opType().id()); + if (alterConfigOp.value() != null) { + requestInfo.setConfigValue(alterConfigOp.value()); + } + } + gateway.alterConfigs(request) + .whenComplete( + (r, t) -> { + if (t != null) { + future.completeExceptionally(t); + return; // already failed, do not complete again + } + future.complete(null); + }); + + return future; + } + @Override public void close() { // nothing to do yet diff --git a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java index bf45d5974..7f9ebba46 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/admin/FlussAdminITCase.java @@ -27,6 +27,8 @@ import org.apache.fluss.cluster.ServerNode; import org.apache.fluss.config.AutoPartitionTimeUnit; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; +import org.apache.fluss.config.dynamic.AlterConfigOp; +import org.apache.fluss.config.dynamic.ConfigEntry; import org.apache.fluss.exception.DatabaseAlreadyExistException; import org.apache.fluss.exception.DatabaseNotEmptyException; import org.apache.fluss.exception.DatabaseNotExistException; @@ -45,7 +47,6 @@ import org.apache.fluss.exception.TooManyBucketsException; import org.apache.fluss.exception.TooManyPartitionsException; import
org.apache.fluss.fs.FsPath; import org.apache.fluss.fs.FsPathAndFileName; -import org.apache.fluss.metadata.DataLakeFormat; import org.apache.fluss.metadata.DatabaseDescriptor; import org.apache.fluss.metadata.DatabaseInfo; import org.apache.fluss.metadata.KvFormat; @@ -78,6 +79,8 @@ import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; import java.util.stream.Stream; +import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT; +import static org.apache.fluss.metadata.DataLakeFormat.PAIMON; import static org.apache.fluss.testutils.DataTestUtils.row; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -165,7 +168,7 @@ class FlussAdminITCase extends ClientToServerITCaseBase { .isEqualTo( DEFAULT_TABLE_DESCRIPTOR .withReplicationFactor(3) - .withDataLakeFormat(DataLakeFormat.PAIMON)); + .withDataLakeFormat(PAIMON)); assertThat(schemaInfo2).isEqualTo(schemaInfo); assertThat(tableInfo.getCreatedTime()).isEqualTo(tableInfo.getModifiedTime()); assertThat(tableInfo.getCreatedTime()).isLessThan(timestampAfterCreate); @@ -190,7 +193,7 @@ class FlussAdminITCase extends ClientToServerITCaseBase { .isEqualTo( DEFAULT_TABLE_DESCRIPTOR .withReplicationFactor(3) - .withDataLakeFormat(DataLakeFormat.PAIMON)); + .withDataLakeFormat(PAIMON)); assertThat(schemaInfo2).isEqualTo(schemaInfo); // assert created time assertThat(tableInfo.getCreatedTime()) @@ -390,7 +393,7 @@ class FlussAdminITCase extends ClientToServerITCaseBase { .isEqualTo( DEFAULT_TABLE_DESCRIPTOR .withReplicationFactor(3) - .withDataLakeFormat(DataLakeFormat.PAIMON)); + .withDataLakeFormat(PAIMON)); } } @@ -886,6 +889,53 @@ class FlussAdminITCase extends ClientToServerITCaseBase { .isInstanceOf(TooManyPartitionsException.class); } + @Test + void testDynamicConfigs() throws ExecutionException, InterruptedException { + assertThat( + FLUSS_CLUSTER_EXTENSION + .getCoordinatorServer() + .getCoordinatorService() 
+ .getDataLakeFormat()) + .isEqualTo(PAIMON); + + admin.alterConfigs( + Collections.singletonList( + new AlterConfigOp( + DATALAKE_FORMAT.key(), null, AlterConfigOp.OpType.SET))) + .get(); + assertThat( + FLUSS_CLUSTER_EXTENSION + .getCoordinatorServer() + .getCoordinatorService() + .getDataLakeFormat()) + .isNull(); + assertThat(admin.describeConfigs().get()) + .contains( + new ConfigEntry( + DATALAKE_FORMAT.key(), + null, + ConfigEntry.ConfigSource.DYNAMIC_SERVER_CONFIG)); + + // Delete dynamic configs to use the initial value(from server.yaml) + admin.alterConfigs( + Collections.singletonList( + new AlterConfigOp( + DATALAKE_FORMAT.key(), null, AlterConfigOp.OpType.DELETE))) + .get(); + assertThat( + FLUSS_CLUSTER_EXTENSION + .getCoordinatorServer() + .getCoordinatorService() + .getDataLakeFormat()) + .isEqualTo(PAIMON); + assertThat(admin.describeConfigs().get()) + .contains( + new ConfigEntry( + DATALAKE_FORMAT.key(), + "paimon", + ConfigEntry.ConfigSource.INITIAL_SERVER_CONFIG)); + } + private void assertNoBucketSnapshot(KvSnapshots snapshots, int expectBucketNum) { assertThat(snapshots.getBucketIds()).hasSize(expectBucketNum); for (int i = 0; i < expectBucketNum; i++) { diff --git a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java index 2bfb6f8dc..ab2706233 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java @@ -28,6 +28,8 @@ import org.apache.fluss.client.utils.ClientRpcMessageUtils; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import org.apache.fluss.config.MemorySize; +import org.apache.fluss.config.dynamic.AlterConfigOp; +import org.apache.fluss.config.dynamic.ConfigEntry; import 
org.apache.fluss.exception.AuthorizationException; import org.apache.fluss.metadata.DataLakeFormat; import org.apache.fluss.metadata.DatabaseDescriptor; @@ -69,6 +71,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.ExecutionException; +import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT; import static org.apache.fluss.record.TestData.DATA1_SCHEMA; import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR; import static org.apache.fluss.record.TestData.DATA1_TABLE_DESCRIPTOR_PK; @@ -653,6 +656,70 @@ public class FlussAuthorizationITCase { } } + @Test + void testDynamicConfigs() throws ExecutionException, InterruptedException { + assertThatThrownBy( + () -> + guestAdmin + .alterConfigs( + Collections.singletonList( + new AlterConfigOp( + DATALAKE_FORMAT.key(), + null, + AlterConfigOp.OpType.SET))) + .get()) + .rootCause() + .hasMessageContaining( + String.format( + "Principal %s have no authorization to operate ALTER_CONFIGS on resource Resource{type=CLUSTER, name='fluss-cluster'}", + guestPrincipal)); + + rootAdmin + .createAcls( + Collections.singletonList( + new AclBinding( + Resource.cluster(), + new AccessControlEntry( + guestPrincipal, + "*", + OperationType.ALTER_CONFIGS, + PermissionType.ALLOW)))) + .all() + .get(); + guestAdmin + .alterConfigs( + Collections.singletonList( + new AlterConfigOp( + DATALAKE_FORMAT.key(), null, AlterConfigOp.OpType.SET))) + .get(); + + assertThatThrownBy(() -> guestAdmin.describeConfigs().get()) + .rootCause() + .hasMessageContaining( + String.format( + "Principal %s have no authorization to operate DESCRIBE_CONFIGS on resource Resource{type=CLUSTER, name='fluss-cluster'}", + guestPrincipal)); + rootAdmin + .createAcls( + Collections.singletonList( + new AclBinding( + Resource.cluster(), + new AccessControlEntry( + guestPrincipal, + "*", + OperationType.DESCRIBE_CONFIGS, + PermissionType.ALLOW)))) + .all() + .get(); + Collection<ConfigEntry> configToResourceConfigs 
= guestAdmin.describeConfigs().get(); + assertThat(configToResourceConfigs) + .contains( + new ConfigEntry( + DATALAKE_FORMAT.key(), + null, + ConfigEntry.ConfigSource.DYNAMIC_SERVER_CONFIG)); + } + private static Configuration initConfig() { Configuration conf = new Configuration(); conf.setInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 3); @@ -661,7 +728,7 @@ public class FlussAuthorizationITCase { // set a shorter max lag time to make tests in FlussFailServerTableITCase faster conf.set(ConfigOptions.LOG_REPLICA_MAX_LAG_TIME, Duration.ofSeconds(10)); // set default datalake format for the cluster and enable datalake tables - conf.set(ConfigOptions.DATALAKE_FORMAT, DataLakeFormat.PAIMON); + conf.set(DATALAKE_FORMAT, DataLakeFormat.PAIMON); conf.set(ConfigOptions.CLIENT_WRITER_BUFFER_MEMORY_SIZE, MemorySize.parse("1mb")); conf.set(ConfigOptions.CLIENT_WRITER_BATCH_SIZE, MemorySize.parse("1kb")); diff --git a/fluss-common/src/main/java/org/apache/fluss/config/dynamic/AlterConfigOp.java b/fluss-common/src/main/java/org/apache/fluss/config/dynamic/AlterConfigOp.java new file mode 100644 index 000000000..970180c0c --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/config/dynamic/AlterConfigOp.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.config.dynamic; + +import javax.annotation.Nullable; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import java.util.function.Function; +import java.util.stream.Collectors; + +/** Configuration change operation. */ +public class AlterConfigOp { + + /** The type of configuration change operation. */ + public enum OpType { + /** Set the value of the configuration entry. */ + SET((byte) 0), + /** Revert the configuration entry to the default value (possibly null). */ + DELETE((byte) 1), + /** + * (For list-type configuration entries only.) Add the specified values to the current value + * of the configuration entry. If the configuration value has not been set, adds to the + * default value. + */ + APPEND((byte) 2), + /** + * (For list-type configuration entries only.) Removes the specified values from the current + * value of the configuration entry. It is legal to remove values that are not currently in + * the configuration entry. Removing all entries from the current configuration value leaves + * an empty list and does NOT revert to the default value of the entry. 
+ */ + SUBTRACT((byte) 3), + + UNKNOWN((byte) -1); + + private static final Map<Byte, OpType> OP_TYPES = + Collections.unmodifiableMap( + Arrays.stream(values()) + .collect(Collectors.toMap(OpType::id, Function.identity()))); + + private final byte id; + + OpType(final byte id) { + this.id = id; + } + + public byte id() { + return id; + } + + public static OpType forId(final byte id) { + return OP_TYPES.getOrDefault(id, UNKNOWN); + } + } + + private final String key; + @Nullable private final String value; + private final OpType opType; + + public AlterConfigOp(String key, @Nullable String value, OpType operationType) { + this.key = key; + this.value = value; + this.opType = operationType; + } + + public String key() { + return key; + } + + @Nullable + public String value() { + return value; + } + + public OpType opType() { + return opType; + } + + @Override + public boolean equals(final Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + final AlterConfigOp that = (AlterConfigOp) o; + return opType == that.opType + && Objects.equals(key, that.key) + && Objects.equals(value, that.value); + } + + @Override + public int hashCode() { + return Objects.hash(opType, key, value); + } + + @Override + public String toString() { + return "AlterConfigOp{" + + "name='" + + key + + '\'' + + ", value='" + + value + + '\'' + + ", opType=" + + opType + + '}'; + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/config/dynamic/ConfigEntry.java b/fluss-common/src/main/java/org/apache/fluss/config/dynamic/ConfigEntry.java new file mode 100644 index 000000000..737002dc6 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/config/dynamic/ConfigEntry.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.config.dynamic; + +import javax.annotation.Nullable; + +import java.util.Objects; + +/** Configuration entry. */ +public class ConfigEntry { + private final String key; + @Nullable private final String value; + private final ConfigSource source; + + /** + * Create a configuration with the provided values. + * + * @param key the non-null config name + * @param value the config value or null + * @param source the source of this config entry + */ + public ConfigEntry(String key, @Nullable String value, ConfigSource source) { + Objects.requireNonNull(key, "name should not be null"); + this.key = key; + this.value = value; + this.source = source; + } + + /** Return the config key. */ + public String key() { + return key; + } + + /** + * Return the config value, or null if this entry carries no value. + */ + public @Nullable String value() { + return value; + } + + /** Return the source of this configuration entry. */ + public ConfigSource source() { + return source; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + ConfigEntry that = (ConfigEntry) o; + // key, value (nullable) and source must all match, consistent with hashCode() + return this.key.equals(that.key) + && Objects.equals(this.value, that.value) + &&
this.source == that.source; + } + + @Override + public int hashCode() { + return Objects.hash(key, value, source); + } + + @Override + public String toString() { + return "ConfigEntry(" + "name=" + key + ", value=" + value + ", source=" + source + ")"; + } + + /** Source of configuration entries. */ + public enum ConfigSource { + DYNAMIC_SERVER_CONFIG, // dynamic server config that is configured for all servers in the + // cluster + INITIAL_SERVER_CONFIG, // initial server config provided as server properties at start + // up(e.g. server.yaml file) + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/config/dynamic/ServerReconfigurable.java b/fluss-common/src/main/java/org/apache/fluss/config/dynamic/ServerReconfigurable.java new file mode 100644 index 000000000..b5d5e39b4 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/config/dynamic/ServerReconfigurable.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.fluss.config.dynamic; + +import org.apache.fluss.config.Configuration; +import org.apache.fluss.exception.ConfigException; + +/** Server Reconfigurable Interface which can dynamically respond to configuration changes. */ +public interface ServerReconfigurable { + + /** + * Validates the provided configuration. The provided map contains all configs including any + * reconfigurable configs that may be different from the initial configuration. Reconfiguration + * will not be performed if this method throws any exception. + * + * <p>This method should check that the new configuration values are valid and will not cause + * issues when applied. It should throw a ConfigException with a descriptive message if any + * validation fails. + * + * <p>This validation step is crucial as it prevents applying invalid configurations that could + * potentially cause the component to malfunction or behave unexpectedly. + * + * @param newConfig the new configuration that would be applied if validation succeeds. This + * contains all configuration properties, not just the ones that changed or are + * reconfigurable. + * @throws ConfigException if the configuration is invalid or cannot be applied to this + * component + */ + void validate(Configuration newConfig) throws ConfigException; + + /** + * Reconfigures the component with the provided configuration. + * + * <p>This method is called after validation succeeds and should apply the new configuration to + * the component. The implementation should update internal state and make necessary adjustments + * to adapt to the new configuration values. + * + * <p>This method should be designed to be thread-safe as it may be called concurrently with + * other operations on the component. It should also be prepared to handle being called multiple + * times with different configurations.
+ * + * @param newConfig the validated configuration to apply to this component + * @throws ConfigException if there is an error applying the configuration, though this should + * generally be avoided as the validation step should catch most issues + */ + void reconfigure(Configuration newConfig) throws ConfigException; +} diff --git a/fluss-common/src/main/java/org/apache/fluss/exception/ConfigException.java b/fluss-common/src/main/java/org/apache/fluss/exception/ConfigException.java new file mode 100644 index 000000000..5378ff463 --- /dev/null +++ b/fluss-common/src/main/java/org/apache/fluss/exception/ConfigException.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.exception; + +/** An exception that indicates a configuration error. 
*/ +public class ConfigException extends ApiException { + + private static final long serialVersionUID = 1L; + + public ConfigException(String message) { + super(message); + } +} diff --git a/fluss-common/src/main/java/org/apache/fluss/security/acl/OperationType.java b/fluss-common/src/main/java/org/apache/fluss/security/acl/OperationType.java index 40fb95c3a..88b1fbd60 100644 --- a/fluss-common/src/main/java/org/apache/fluss/security/acl/OperationType.java +++ b/fluss-common/src/main/java/org/apache/fluss/security/acl/OperationType.java @@ -41,7 +41,9 @@ public enum OperationType { CREATE((byte) 5), DROP((byte) 6), ALTER((byte) 7), - DESCRIBE((byte) 8); + DESCRIBE((byte) 8), + DESCRIBE_CONFIGS((byte) 9), + ALTER_CONFIGS((byte) 10); private final byte code; diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java index b8cbd2512..00990c081 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminGateway.java @@ -17,6 +17,8 @@ package org.apache.fluss.rpc.gateway; +import org.apache.fluss.rpc.messages.AlterConfigsRequest; +import org.apache.fluss.rpc.messages.AlterConfigsResponse; import org.apache.fluss.rpc.messages.CreateAclsRequest; import org.apache.fluss.rpc.messages.CreateAclsResponse; import org.apache.fluss.rpc.messages.CreateDatabaseRequest; @@ -104,6 +106,9 @@ public interface AdminGateway extends AdminReadOnlyGateway { @RPC(api = ApiKeys.DROP_ACLS) CompletableFuture<DropAclsResponse> dropAcls(DropAclsRequest request); + @RPC(api = ApiKeys.ALTER_CONFIGS) + CompletableFuture<AlterConfigsResponse> alterConfigs(AlterConfigsRequest request); + // todo: rename table & alter table } diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminReadOnlyGateway.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminReadOnlyGateway.java index 
958654caa..da42b3819 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminReadOnlyGateway.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/gateway/AdminReadOnlyGateway.java @@ -20,6 +20,8 @@ package org.apache.fluss.rpc.gateway; import org.apache.fluss.rpc.RpcGateway; import org.apache.fluss.rpc.messages.DatabaseExistsRequest; import org.apache.fluss.rpc.messages.DatabaseExistsResponse; +import org.apache.fluss.rpc.messages.DescribeConfigsRequest; +import org.apache.fluss.rpc.messages.DescribeConfigsResponse; import org.apache.fluss.rpc.messages.GetDatabaseInfoRequest; import org.apache.fluss.rpc.messages.GetDatabaseInfoResponse; import org.apache.fluss.rpc.messages.GetFileSystemSecurityTokenRequest; @@ -187,4 +189,7 @@ public interface AdminReadOnlyGateway extends RpcGateway { */ @RPC(api = ApiKeys.LIST_ACLS) CompletableFuture<ListAclsResponse> listAcls(ListAclsRequest request); + + @RPC(api = ApiKeys.DESCRIBE_CONFIGS) + CompletableFuture<DescribeConfigsResponse> describeConfigs(DescribeConfigsRequest request); } diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/NettyServerHandler.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/NettyServerHandler.java index 3080c87e0..d00ee1a8a 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/NettyServerHandler.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/netty/server/NettyServerHandler.java @@ -170,7 +170,6 @@ public final class NettyServerHandler extends ChannelInboundHandlerAdapter { authenticator.isCompleted() ? ConnectionState.READY : ConnectionState.AUTHENTICATING); - // TODO: connection metrics (count, client tags, receive request avg idle time, etc.) 
} diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java index 0bc2d494a..a1261008d 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/ApiKeys.java @@ -70,7 +70,9 @@ public enum ApiKeys { CREATE_ACLS(1039, 0, 0, PUBLIC), LIST_ACLS(1040, 0, 0, PUBLIC), DROP_ACLS(1041, 0, 0, PUBLIC), - LAKE_TIERING_HEARTBEAT(1042, 0, 0, PRIVATE); + LAKE_TIERING_HEARTBEAT(1042, 0, 0, PRIVATE), + DESCRIBE_CONFIGS(1043, 0, 0, PUBLIC), + ALTER_CONFIGS(1044, 0, 0, PUBLIC); private static final Map<Integer, ApiKeys> ID_TO_TYPE = Arrays.stream(ApiKeys.values()) diff --git a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java index 5edb3ac83..708b590af 100644 --- a/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java +++ b/fluss-rpc/src/main/java/org/apache/fluss/rpc/protocol/Errors.java @@ -214,7 +214,8 @@ public enum Errors { INVALID_SERVER_RACK_INFO_EXCEPTION( 52, "The server rack info is invalid.", InvalidServerRackInfoException::new), LAKE_SNAPSHOT_NOT_EXIST( - 53, "The lake snapshot is not exist.", LakeTableSnapshotNotExistException::new); + 53, "The lake snapshot is not exist.", LakeTableSnapshotNotExistException::new), + ; private static final Logger LOG = LoggerFactory.getLogger(Errors.class); diff --git a/fluss-rpc/src/main/proto/FlussApi.proto b/fluss-rpc/src/main/proto/FlussApi.proto index 2ddaa44e8..110b69a3b 100644 --- a/fluss-rpc/src/main/proto/FlussApi.proto +++ b/fluss-rpc/src/main/proto/FlussApi.proto @@ -530,6 +530,23 @@ message LakeTieringHeartbeatResponse { repeated PbHeartbeatRespForTable failed_table_resp = 5; } +message DescribeConfigsRequest{ +} + +message DescribeConfigsResponse{ + repeated PbDescribeConfigsResponseInfo infos = 1; +} + + +message AlterConfigsRequest{ + repeated 
PbAlterConfigsRequestInfo infos = 1; +} + + +message AlterConfigsResponse{ +} + + // --------------- Inner classes ---------------- message PbApiVersion { @@ -867,4 +884,17 @@ message PbHeartbeatReqForTable { message PbHeartbeatRespForTable { required int64 table_id = 1; optional ErrorResponse error = 2; -} \ No newline at end of file +} + +message PbDescribeConfigsResponseInfo{ + required string config_key = 1; + optional string config_value = 2; + required string config_source = 3; +} + +message PbAlterConfigsRequestInfo{ + required string config_key = 1; + optional string config_value = 2; + required int32 op_type = 3; +} + diff --git a/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java b/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java index 2a60bef37..dc9512e73 100644 --- a/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java +++ b/fluss-rpc/src/test/java/org/apache/fluss/rpc/TestingTabletGatewayService.java @@ -21,6 +21,8 @@ import org.apache.fluss.cluster.ServerType; import org.apache.fluss.rpc.gateway.TabletServerGateway; import org.apache.fluss.rpc.messages.DatabaseExistsRequest; import org.apache.fluss.rpc.messages.DatabaseExistsResponse; +import org.apache.fluss.rpc.messages.DescribeConfigsRequest; +import org.apache.fluss.rpc.messages.DescribeConfigsResponse; import org.apache.fluss.rpc.messages.FetchLogRequest; import org.apache.fluss.rpc.messages.FetchLogResponse; import org.apache.fluss.rpc.messages.GetDatabaseInfoRequest; @@ -241,4 +243,10 @@ public class TestingTabletGatewayService extends TestingGatewayService public CompletableFuture<ListAclsResponse> listAcls(ListAclsRequest request) { return null; } + + @Override + public CompletableFuture<DescribeConfigsResponse> describeConfigs( + DescribeConfigsRequest request) { + return null; + } } diff --git a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/NettyClientTest.java 
b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/NettyClientTest.java index 72699f2d3..41caccef0 100644 --- a/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/NettyClientTest.java +++ b/fluss-rpc/src/test/java/org/apache/fluss/rpc/netty/client/NettyClientTest.java @@ -200,7 +200,7 @@ final class NettyClientTest { service, metricGroup, RequestsMetrics.createCoordinatorServerRequestMetrics( - metricGroup)); ) { + metricGroup))) { multipleEndpointsServer.start(); ApiVersionsRequest request = new ApiVersionsRequest() diff --git a/fluss-server/src/main/java/org/apache/fluss/server/DynamicConfigManager.java b/fluss-server/src/main/java/org/apache/fluss/server/DynamicConfigManager.java new file mode 100644 index 000000000..3ffbb359e --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/DynamicConfigManager.java @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server; + +import org.apache.fluss.annotation.VisibleForTesting; +import org.apache.fluss.config.dynamic.AlterConfigOp; +import org.apache.fluss.config.dynamic.ConfigEntry; +import org.apache.fluss.exception.ConfigException; +import org.apache.fluss.server.authorizer.ZkNodeChangeNotificationWatcher; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.data.ZkData.ConfigEntityChangeNotificationSequenceZNode; +import org.apache.fluss.server.zk.data.ZkData.ConfigEntityChangeNotificationZNode; +import org.apache.fluss.utils.clock.SystemClock; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +/** Manager for dynamic configurations. */ +public class DynamicConfigManager { + private static final Logger LOG = LoggerFactory.getLogger(DynamicConfigManager.class); + private static final long CHANGE_NOTIFICATION_EXPIRATION_MS = 15 * 60 * 1000L; + + private final DynamicServerConfig dynamicServerConfig; + private final ZooKeeperClient zooKeeperClient; + private final ZkNodeChangeNotificationWatcher configChangeListener; + private final boolean isCoordinator; + + public DynamicConfigManager( + ZooKeeperClient zooKeeperClient, + DynamicServerConfig dynamicServerConfig, + boolean isCoordinator) { + this.dynamicServerConfig = dynamicServerConfig; + this.zooKeeperClient = zooKeeperClient; + this.isCoordinator = isCoordinator; + this.configChangeListener = + new ZkNodeChangeNotificationWatcher( + zooKeeperClient, + ConfigEntityChangeNotificationZNode.path(), + ConfigEntityChangeNotificationSequenceZNode.prefix(), + CHANGE_NOTIFICATION_EXPIRATION_MS, + new ConfigChangedNotificationHandler(), + SystemClock.getInstance()); + } + + public void startup() throws Exception { + try { + configChangeListener.start(); + Map<String, String> entityConfigs = zooKeeperClient.fetchEntityConfig(); + 
dynamicServerConfig.updateDynamicConfig(entityConfigs, true); + } catch (Exception e) { + LOG.error("Failed to update dynamic configs from zookeeper", e); + } + } + + public void close() { + configChangeListener.stop(); + } + + public List<ConfigEntry> describeConfigs() { + Map<String, String> dynamicDefaultConfigs = dynamicServerConfig.getDynamicConfigs(); + Map<String, String> staticServerConfigs = dynamicServerConfig.getInitialServerConfigs(); + + List<ConfigEntry> configEntries = new ArrayList<>(); + staticServerConfigs.forEach( + (key, value) -> { + ConfigEntry configEntry = + new ConfigEntry( + key, value, ConfigEntry.ConfigSource.INITIAL_SERVER_CONFIG); + configEntries.add(configEntry); + }); + dynamicDefaultConfigs.forEach( + (key, value) -> { + ConfigEntry configEntry = + new ConfigEntry( + key, value, ConfigEntry.ConfigSource.DYNAMIC_SERVER_CONFIG); + configEntries.add(configEntry); + }); + + return configEntries; + } + + public void alterConfigs(List<AlterConfigOp> serverConfigChanges) throws Exception { + Map<String, String> persistentProps = zooKeeperClient.fetchEntityConfig(); + prepareIncrementalConfigs(serverConfigChanges, persistentProps); + alterServerConfigs(persistentProps); + } + + private void prepareIncrementalConfigs( + List<AlterConfigOp> alterConfigOps, Map<String, String> configsProps) { + alterConfigOps.forEach( + alterConfigOp -> { + String configPropName = alterConfigOp.key(); + if (!dynamicServerConfig.isAllowedConfig(configPropName)) { + throw new ConfigException( + String.format( + "The config key %s is not allowed to be changed dynamically.", + configPropName)); + } + + String configPropValue = alterConfigOp.value(); + switch (alterConfigOp.opType()) { + case SET: + configsProps.put(configPropName, configPropValue); + break; + case DELETE: + configsProps.remove(configPropName); + break; + default: + throw new ConfigException( + "Unsupported config operation type " + alterConfigOp.opType()); + } + }); + } + + @VisibleForTesting + 
protected void alterServerConfigs(Map<String, String> configsProps) throws Exception { + dynamicServerConfig.updateDynamicConfig(configsProps, false); + + // Apply to zookeeper only after verification. + zooKeeperClient.upsertServerEntityConfig(configsProps); + } + + private class ConfigChangedNotificationHandler + implements ZkNodeChangeNotificationWatcher.NotificationHandler { + + @Override + public void processNotification(byte[] notification) throws Exception { + if (isCoordinator) { + return; + } + + if (notification.length != 0) { + throw new ConfigException( + "Config change notification of this version is only empty"); + } + + Map<String, String> entityConfig = zooKeeperClient.fetchEntityConfig(); + dynamicServerConfig.updateDynamicConfig(entityConfig, true); + } + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java b/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java new file mode 100644 index 000000000..876b97af4 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server; + +import org.apache.fluss.annotation.Internal; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.config.dynamic.ServerReconfigurable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT; +import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock; +import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock; + +/** + * The dynamic configuration for the server. A {@link ServerReconfigurable} implementation that wants + * to listen for configuration changes registers itself via {@link #register(ServerReconfigurable)}. + * When {@link DynamicConfigManager} detects changes, it updates the configuration items and pushes + * them to the registered {@link ServerReconfigurable} instances. + */ +@Internal +public class DynamicServerConfig { + + private static final Logger LOG = LoggerFactory.getLogger(DynamicServerConfig.class); + private static final Set<String> ALLOWED_CONFIG_KEYS = + Collections.singleton(DATALAKE_FORMAT.key()); + private static final Set<String> ALLOWED_CONFIG_PREFIXES = Collections.singleton("datalake."); + + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + private final Set<ServerReconfigurable> serverReconfigurableSet = ConcurrentHashMap.newKeySet(); + + /** The initial configuration items loaded from server.yaml when the server starts. */ + private final Map<String, String> initialConfigMap; + + /** The dynamic configuration items that are added at runtime (stored in ZooKeeper).
*/ + private final Map<String, String> dynamicConfigs = new HashMap<>(); + + /** + * The current configuration map, which is a combination of initial configuration and dynamic. + */ + private final Map<String, String> currentConfigMap; + + /** + * The current configuration, which is a combination of initial configuration and dynamic + * configuration. + */ + private volatile Configuration currentConfig; + + public DynamicServerConfig(Configuration flussConfig) { + this.currentConfig = flussConfig; + this.initialConfigMap = flussConfig.toMap(); + this.currentConfigMap = flussConfig.toMap(); + } + + /** Register a ServerReconfigurable which listens to configuration changes. */ + public void register(ServerReconfigurable serverReconfigurable) { + serverReconfigurableSet.add(serverReconfigurable); + } + + /** Update the dynamic configuration and apply to registered ServerReconfigurables. */ + public void updateDynamicConfig(Map<String, String> newDynamicConfigs, boolean skipErrorConfig) + throws Exception { + inWriteLock(lock, () -> updateCurrentConfig(newDynamicConfigs, skipErrorConfig)); + } + + public Configuration getCurrentConfig() { + return inReadLock(lock, () -> currentConfig); + } + + public Map<String, String> getDynamicConfigs() { + return inReadLock(lock, () -> new HashMap<>(dynamicConfigs)); + } + + public Map<String, String> getInitialServerConfigs() { + return inReadLock(lock, () -> new HashMap<>(initialConfigMap)); + } + + public boolean isAllowedConfig(String key) { + if (ALLOWED_CONFIG_KEYS.contains(key)) { + return true; + } + + for (String prefix : ALLOWED_CONFIG_PREFIXES) { + if (key.startsWith(prefix)) { + return true; + } + } + return false; + } + + private void updateCurrentConfig(Map<String, String> newDynamicConfigs, boolean skipErrorConfig) + throws Exception { + Map<String, String> newProps = new HashMap<>(initialConfigMap); + overrideProps(newProps, newDynamicConfigs); + Configuration newConfig = Configuration.fromMap(newProps); + 
Configuration oldConfig = currentConfig; + Set<ServerReconfigurable> appliedServerReconfigurableSet = new HashSet<>(); + if (!newProps.equals(currentConfigMap)) { + serverReconfigurableSet.forEach( + serverReconfigurable -> { + try { + serverReconfigurable.validate(newConfig); + } catch (Exception e) { + LOG.error( + "Validate new dynamic config error and will roll back all the applied config.", + e); + if (!skipErrorConfig) { + throw e; + } + } + }); + + Exception throwable = null; + for (ServerReconfigurable serverReconfigurable : serverReconfigurableSet) { + try { + serverReconfigurable.reconfigure(newConfig); + appliedServerReconfigurableSet.add(serverReconfigurable); + } catch (Exception e) { + LOG.error( + "Apply new dynamic error and will roll back all the applied config.", + e); + if (!skipErrorConfig) { + throwable = e; + break; + } + } + } + + // Roll back to the old config if there is an error. A failing rollback must neither stop + // the remaining rollbacks nor mask the original failure, so it is logged and suppressed. + if (throwable != null) { + for (ServerReconfigurable applied : appliedServerReconfigurableSet) { + try { + applied.reconfigure(oldConfig); + } catch (Exception rollbackError) { + LOG.error("Failed to roll back dynamic config.", rollbackError); + throwable.addSuppressed(rollbackError); + } + } + throw throwable; + } + + currentConfig = newConfig; + currentConfigMap.clear(); + dynamicConfigs.clear(); + currentConfigMap.putAll(newProps); + dynamicConfigs.putAll(newDynamicConfigs); + LOG.info("Dynamic configs changed: {}", newDynamicConfigs); + } + } + + private void overrideProps(Map<String, String> props, Map<String, String> propsOverride) { + propsOverride.forEach( + (key, value) -> { + if (value == null) { + props.remove(key); + } else { + props.put(key, value); + } + }); + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java index bbf135bf4..f9c1bc1fa 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/RpcServiceBase.java @@ -18,6 +18,7 @@ package org.apache.fluss.server; import org.apache.fluss.cluster.ServerType; +import
org.apache.fluss.config.dynamic.ConfigEntry; import org.apache.fluss.exception.FlussRuntimeException; import org.apache.fluss.exception.KvSnapshotNotExistException; import org.apache.fluss.exception.LakeTableSnapshotNotExistException; @@ -42,6 +43,8 @@ import org.apache.fluss.rpc.messages.ApiVersionsRequest; import org.apache.fluss.rpc.messages.ApiVersionsResponse; import org.apache.fluss.rpc.messages.DatabaseExistsRequest; import org.apache.fluss.rpc.messages.DatabaseExistsResponse; +import org.apache.fluss.rpc.messages.DescribeConfigsRequest; +import org.apache.fluss.rpc.messages.DescribeConfigsResponse; import org.apache.fluss.rpc.messages.GetDatabaseInfoRequest; import org.apache.fluss.rpc.messages.GetDatabaseInfoResponse; import org.apache.fluss.rpc.messages.GetFileSystemSecurityTokenRequest; @@ -67,6 +70,7 @@ import org.apache.fluss.rpc.messages.ListTablesResponse; import org.apache.fluss.rpc.messages.MetadataRequest; import org.apache.fluss.rpc.messages.MetadataResponse; import org.apache.fluss.rpc.messages.PbApiVersion; +import org.apache.fluss.rpc.messages.PbDescribeConfigsResponseInfo; import org.apache.fluss.rpc.messages.PbTablePath; import org.apache.fluss.rpc.messages.TableExistsRequest; import org.apache.fluss.rpc.messages.TableExistsResponse; @@ -139,6 +143,7 @@ public abstract class RpcServiceBase extends RpcGatewayService implements AdminR protected final ZooKeeperClient zkClient; protected final MetadataManager metadataManager; protected final @Nullable Authorizer authorizer; + protected final DynamicConfigManager dynamicConfigManager; private long tokenLastUpdateTimeMs = 0; private ObtainedSecurityToken securityToken = null; @@ -148,13 +153,15 @@ public abstract class RpcServiceBase extends RpcGatewayService implements AdminR ServerType provider, ZooKeeperClient zkClient, MetadataManager metadataManager, - @Nullable Authorizer authorizer) { + @Nullable Authorizer authorizer, + DynamicConfigManager dynamicConfigManager) { this.remoteFileSystem = 
remoteFileSystem; this.provider = provider; this.apiManager = new ApiManager(provider); this.zkClient = zkClient; this.metadataManager = metadataManager; this.authorizer = authorizer; + this.dynamicConfigManager = dynamicConfigManager; } @Override @@ -466,6 +473,35 @@ public abstract class RpcServiceBase extends RpcGatewayService implements AdminR } } + @Override + public CompletableFuture<DescribeConfigsResponse> describeConfigs( + DescribeConfigsRequest request) { + if (authorizer != null) { + authorizer.authorize( + currentSession(), OperationType.DESCRIBE_CONFIGS, Resource.cluster()); + } + + List<ConfigEntry> configs = dynamicConfigManager.describeConfigs(); + List<PbDescribeConfigsResponseInfo> pbConfigsInfos = + configs.stream() + .map( + configEntry -> { + PbDescribeConfigsResponseInfo pbDescribeConfigsResponseInfo = + new PbDescribeConfigsResponseInfo() + .setConfigKey(configEntry.key()) + .setConfigSource(configEntry.source().name()); + if (configEntry.value() != null) { + pbDescribeConfigsResponseInfo.setConfigValue( + configEntry.value()); + } + return pbDescribeConfigsResponseInfo; + }) + .collect(Collectors.toList()); + + return CompletableFuture.completedFuture( + new DescribeConfigsResponse().addAllInfos(pbConfigsInfos)); + } + protected MetadataResponse makeMetadataResponse( MetadataRequest request, String listenerName, diff --git a/fluss-server/src/main/java/org/apache/fluss/server/ServerBase.java b/fluss-server/src/main/java/org/apache/fluss/server/ServerBase.java index 99c1e105f..cd46ca1b5 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/ServerBase.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/ServerBase.java @@ -69,12 +69,14 @@ public abstract class ServerBase implements AutoCloseableAsync, FatalErrorHandle protected static final long ZOOKEEPER_REGISTER_RETRY_INTERVAL_MS = 3 * 1000L; protected final Configuration conf; + protected final DynamicServerConfig dynamicServerConfig; protected FileSystem 
remoteFileSystem; protected PluginManager pluginManager; protected ServerBase(Configuration conf) { this.conf = conf; + this.dynamicServerConfig = new DynamicServerConfig(conf); } private Thread shutDownHook; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java index ef393aeb5..e06e1e0d7 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java @@ -23,17 +23,13 @@ import org.apache.fluss.cluster.ServerType; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; import org.apache.fluss.exception.IllegalConfigurationException; -import org.apache.fluss.lake.lakestorage.LakeCatalog; -import org.apache.fluss.lake.lakestorage.LakeStorage; -import org.apache.fluss.lake.lakestorage.LakeStoragePlugin; -import org.apache.fluss.lake.lakestorage.LakeStoragePluginSetUp; -import org.apache.fluss.metadata.DataLakeFormat; import org.apache.fluss.metadata.DatabaseDescriptor; import org.apache.fluss.metrics.registry.MetricRegistry; import org.apache.fluss.rpc.RpcClient; import org.apache.fluss.rpc.RpcServer; import org.apache.fluss.rpc.metrics.ClientMetricGroup; import org.apache.fluss.rpc.netty.server.RequestsMetrics; +import org.apache.fluss.server.DynamicConfigManager; import org.apache.fluss.server.ServerBase; import org.apache.fluss.server.authorizer.Authorizer; import org.apache.fluss.server.authorizer.AuthorizerLoader; @@ -59,7 +55,6 @@ import javax.annotation.concurrent.GuardedBy; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; @@ -67,9 +62,6 @@ import java.util.concurrent.Executors; import 
java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import static org.apache.fluss.server.utils.LakeStorageUtils.extractLakeProperties; -import static org.apache.fluss.utils.Preconditions.checkNotNull; - /** * Coordinator server implementation. The coordinator server is responsible to: * @@ -142,6 +134,9 @@ public class CoordinatorServer extends ServerBase { @GuardedBy("lock") private CoordinatorContext coordinatorContext; + @GuardedBy("lock") + private DynamicConfigManager dynamicConfigManager; + public CoordinatorServer(Configuration conf) { super(conf); validateConfigs(conf); @@ -173,6 +168,10 @@ public class CoordinatorServer extends ServerBase { this.zkClient = ZooKeeperUtils.startZookeeperClient(conf, this); + this.dynamicConfigManager = + new DynamicConfigManager(zkClient, dynamicServerConfig, true); + dynamicConfigManager.startup(); + this.coordinatorContext = new CoordinatorContext(); this.metadataCache = new CoordinatorMetadataCache(); @@ -193,8 +192,9 @@ public class CoordinatorServer extends ServerBase { metadataCache, metadataManager, authorizer, - createLakeCatalog(), - lakeTableTieringManager); + new LakeCatalogDynamicLoader(dynamicServerConfig, pluginManager), + lakeTableTieringManager, + dynamicConfigManager); this.rpcServer = RpcServer.create( @@ -247,21 +247,6 @@ public class CoordinatorServer extends ServerBase { } } - @Nullable - private LakeCatalog createLakeCatalog() { - DataLakeFormat dataLakeFormat = conf.get(ConfigOptions.DATALAKE_FORMAT); - if (dataLakeFormat == null) { - return null; - } - LakeStoragePlugin lakeStoragePlugin = - LakeStoragePluginSetUp.fromDataLakeFormat(dataLakeFormat.toString(), pluginManager); - Map<String, String> lakeProperties = extractLakeProperties(conf); - LakeStorage lakeStorage = - lakeStoragePlugin.createLakeStorage( - Configuration.fromMap(checkNotNull(lakeProperties))); - return lakeStorage.createLakeCatalog(); - } - @Override protected CompletableFuture<Result> 
closeAsync(Result result) { if (isShutDown.compareAndSet(false, true)) { @@ -448,6 +433,14 @@ public class CoordinatorServer extends ServerBase { exception = ExceptionUtils.firstOrSuppressed(t, exception); } + try { + if (dynamicConfigManager != null) { + dynamicConfigManager.close(); + } + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + try { if (rpcClient != null) { rpcClient.close(); @@ -497,6 +490,10 @@ public class CoordinatorServer extends ServerBase { return authorizer; } + public DynamicConfigManager getDynamicConfigManager() { + return dynamicConfigManager; + } + private static void validateConfigs(Configuration conf) { if (conf.get(ConfigOptions.DEFAULT_REPLICATION_FACTOR) < 1) { throw new IllegalConfigurationException( diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java index 7253ef962..a5f20bcec 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java @@ -17,10 +17,12 @@ package org.apache.fluss.server.coordinator; +import org.apache.fluss.annotation.VisibleForTesting; import org.apache.fluss.cluster.ServerType; import org.apache.fluss.cluster.TabletServerInfo; import org.apache.fluss.config.ConfigOptions; import org.apache.fluss.config.Configuration; +import org.apache.fluss.config.dynamic.AlterConfigOp; import org.apache.fluss.exception.InvalidCoordinatorException; import org.apache.fluss.exception.InvalidDatabaseException; import org.apache.fluss.exception.InvalidTableException; @@ -28,7 +30,6 @@ import org.apache.fluss.exception.SecurityDisabledException; import org.apache.fluss.exception.TableAlreadyExistException; import org.apache.fluss.exception.TableNotPartitionedException; import org.apache.fluss.fs.FileSystem; -import 
org.apache.fluss.lake.lakestorage.LakeCatalog; import org.apache.fluss.metadata.DataLakeFormat; import org.apache.fluss.metadata.DatabaseDescriptor; import org.apache.fluss.metadata.PartitionSpec; @@ -42,6 +43,8 @@ import org.apache.fluss.metadata.TablePath; import org.apache.fluss.rpc.gateway.CoordinatorGateway; import org.apache.fluss.rpc.messages.AdjustIsrRequest; import org.apache.fluss.rpc.messages.AdjustIsrResponse; +import org.apache.fluss.rpc.messages.AlterConfigsRequest; +import org.apache.fluss.rpc.messages.AlterConfigsResponse; import org.apache.fluss.rpc.messages.CommitKvSnapshotRequest; import org.apache.fluss.rpc.messages.CommitKvSnapshotResponse; import org.apache.fluss.rpc.messages.CommitLakeTableSnapshotRequest; @@ -68,6 +71,7 @@ import org.apache.fluss.rpc.messages.LakeTieringHeartbeatRequest; import org.apache.fluss.rpc.messages.LakeTieringHeartbeatResponse; import org.apache.fluss.rpc.messages.MetadataRequest; import org.apache.fluss.rpc.messages.MetadataResponse; +import org.apache.fluss.rpc.messages.PbAlterConfigsRequestInfo; import org.apache.fluss.rpc.messages.PbHeartbeatReqForTable; import org.apache.fluss.rpc.messages.PbHeartbeatRespForTable; import org.apache.fluss.rpc.netty.server.Session; @@ -76,6 +80,7 @@ import org.apache.fluss.security.acl.AclBinding; import org.apache.fluss.security.acl.AclBindingFilter; import org.apache.fluss.security.acl.OperationType; import org.apache.fluss.security.acl.Resource; +import org.apache.fluss.server.DynamicConfigManager; import org.apache.fluss.server.RpcServiceBase; import org.apache.fluss.server.authorizer.AclCreateResult; import org.apache.fluss.server.authorizer.AclDeleteResult; @@ -113,6 +118,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; +import java.util.stream.Collectors; import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindingFilters; import static 
org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindings; @@ -127,7 +133,6 @@ import static org.apache.fluss.server.utils.ServerRpcMessageUtils.toTablePath; import static org.apache.fluss.server.utils.TableAssignmentUtils.generateAssignment; import static org.apache.fluss.utils.PartitionUtils.validatePartitionSpec; import static org.apache.fluss.utils.Preconditions.checkNotNull; -import static org.apache.fluss.utils.Preconditions.checkState; /** An RPC Gateway service for coordinator server. */ public final class CoordinatorService extends RpcServiceBase implements CoordinatorGateway { @@ -138,10 +143,8 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina private final Supplier<Integer> coordinatorEpochSupplier; private final ServerMetadataCache metadataCache; - // null if the cluster hasn't configured datalake format - private final @Nullable DataLakeFormat dataLakeFormat; - private final @Nullable LakeCatalog lakeCatalog; private final LakeTableTieringManager lakeTableTieringManager; + private final LakeCatalogDynamicLoader lakeCatalogDynamicLoader; public CoordinatorService( Configuration conf, @@ -151,24 +154,25 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina ServerMetadataCache metadataCache, MetadataManager metadataManager, @Nullable Authorizer authorizer, - @Nullable LakeCatalog lakeCatalog, - LakeTableTieringManager lakeTableTieringManager) { - super(remoteFileSystem, ServerType.COORDINATOR, zkClient, metadataManager, authorizer); + LakeCatalogDynamicLoader lakeCatalogDynamicLoader, + LakeTableTieringManager lakeTableTieringManager, + DynamicConfigManager dynamicConfigManager) { + super( + remoteFileSystem, + ServerType.COORDINATOR, + zkClient, + metadataManager, + authorizer, + dynamicConfigManager); this.defaultBucketNumber = conf.getInt(ConfigOptions.DEFAULT_BUCKET_NUMBER); this.defaultReplicationFactor = conf.getInt(ConfigOptions.DEFAULT_REPLICATION_FACTOR); 
this.eventManagerSupplier = () -> coordinatorEventProcessorSupplier.get().getCoordinatorEventManager(); this.coordinatorEpochSupplier = () -> coordinatorEventProcessorSupplier.get().getCoordinatorEpoch(); - this.dataLakeFormat = conf.getOptional(ConfigOptions.DATALAKE_FORMAT).orElse(null); - this.lakeCatalog = lakeCatalog; this.lakeTableTieringManager = lakeTableTieringManager; this.metadataCache = metadataCache; - checkState( - (dataLakeFormat == null) == (lakeCatalog == null), - "dataLakeFormat and lakeCatalog must both be null or both non-null, but dataLakeFormat is %s, lakeCatalog is %s.", - dataLakeFormat, - lakeCatalog); + this.lakeCatalogDynamicLoader = lakeCatalogDynamicLoader; } @Override @@ -178,7 +182,7 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina @Override public void shutdown() { - IOUtils.closeQuietly(lakeCatalog, "lake catalog"); + IOUtils.closeQuietly(lakeCatalogDynamicLoader, "lake catalog"); } @Override @@ -266,13 +270,16 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina // before create table in fluss, we may create in lake if (isDataLakeEnabled(tableDescriptor)) { try { - checkNotNull(lakeCatalog).createTable(tablePath, tableDescriptor); + checkNotNull(lakeCatalogDynamicLoader.getLakeCatalog()) + .createTable(tablePath, tableDescriptor); } catch (TableAlreadyExistException e) { throw new TableAlreadyExistException( String.format( "The table %s already exists in %s catalog, please " + "first drop the table in %s catalog or use a new table name.", - tablePath, dataLakeFormat, dataLakeFormat)); + tablePath, + lakeCatalogDynamicLoader.getDataLakeFormat(), + lakeCatalogDynamicLoader.getDataLakeFormat())); } } @@ -285,6 +292,7 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina private TableDescriptor applySystemDefaults(TableDescriptor tableDescriptor) { TableDescriptor newDescriptor = tableDescriptor; + DataLakeFormat dataLakeFormat = 
lakeCatalogDynamicLoader.getDataLakeFormat(); // not set bucket num if (!newDescriptor.getTableDistribution().isPresent() @@ -650,4 +658,49 @@ public final class CoordinatorService extends RpcServiceBase implements Coordina }); return bucketMetadataList; } + + @Override + public CompletableFuture<AlterConfigsResponse> alterConfigs(AlterConfigsRequest request) { + // Authorize first so that even an empty request is subject to the ALTER_CONFIGS check. + if (authorizer != null) { + authorizer.authorize(currentSession(), OperationType.ALTER_CONFIGS, Resource.cluster()); + } + + List<PbAlterConfigsRequestInfo> infos = request.getInfosList(); + if (infos.isEmpty()) { + return CompletableFuture.completedFuture(new AlterConfigsResponse()); + } + + CompletableFuture<AlterConfigsResponse> future = new CompletableFuture<>(); + List<AlterConfigOp> serverConfigChanges = + infos.stream() + .map( + info -> + new AlterConfigOp( + info.getConfigKey(), + info.hasConfigValue() + ? info.getConfigValue() + : null, + AlterConfigOp.OpType.forId( + (byte) info.getOpType()))) + .collect(Collectors.toList()); + AccessContextEvent<Void> accessContextEvent = + new AccessContextEvent<>( + (context) -> { + try { + dynamicConfigManager.alterConfigs(serverConfigChanges); + future.complete(new AlterConfigsResponse()); + } catch (Exception e) { + future.completeExceptionally(e); + } + return null; + }); + eventManagerSupplier.get().put(accessContextEvent); + return future; + } + + @VisibleForTesting + public DataLakeFormat getDataLakeFormat() { + return lakeCatalogDynamicLoader.getDataLakeFormat(); + } } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeCatalogDynamicLoader.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeCatalogDynamicLoader.java new file mode 100644 index 000000000..aee7151a8 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeCatalogDynamicLoader.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.coordinator; + +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.config.dynamic.ServerReconfigurable; +import org.apache.fluss.exception.ConfigException; +import org.apache.fluss.lake.lakestorage.LakeCatalog; +import org.apache.fluss.lake.lakestorage.LakeStorage; +import org.apache.fluss.lake.lakestorage.LakeStoragePlugin; +import org.apache.fluss.lake.lakestorage.LakeStoragePluginSetUp; +import org.apache.fluss.metadata.DataLakeFormat; +import org.apache.fluss.plugin.PluginManager; +import org.apache.fluss.server.DynamicServerConfig; +import org.apache.fluss.utils.IOUtils; + +import javax.annotation.Nullable; + +import java.util.Map; + +import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT; +import static org.apache.fluss.server.utils.LakeStorageUtils.extractLakeProperties; +import static org.apache.fluss.utils.Preconditions.checkNotNull; +import static org.apache.fluss.utils.Preconditions.checkState; + +/** + * A dynamic loader for the lake catalog. Whenever the datalake format changes, the current + * lake catalog is closed and replaced by one for the new format (none if the format is unset).
+ */ +public class LakeCatalogDynamicLoader implements ServerReconfigurable, AutoCloseable { + // null if the cluster hasn't configured datalake format + private @Nullable DataLakeFormat dataLakeFormat; + private @Nullable LakeCatalog lakeCatalog; + private Configuration currentConfiguration; + private final PluginManager pluginManager; + + public LakeCatalogDynamicLoader( + DynamicServerConfig dynamicServerConfig, PluginManager pluginManager) { + Configuration currentConfig = dynamicServerConfig.getCurrentConfig(); + this.currentConfiguration = currentConfig; + this.dataLakeFormat = currentConfig.getOptional(DATALAKE_FORMAT).orElse(null); + this.lakeCatalog = createLakeCatalog(currentConfig, pluginManager); + this.pluginManager = pluginManager; + checkState( + (dataLakeFormat == null) == (lakeCatalog == null), + "dataLakeFormat and lakeCatalog must both be null or both non-null, but dataLakeFormat is %s, lakeCatalog is %s.", + dataLakeFormat, + lakeCatalog); + dynamicServerConfig.register(this); + } + + @Override + public void validate(Configuration newConfig) throws ConfigException { + DataLakeFormat newDatalakeFormat = null; + try { + if (newConfig.getOptional(DATALAKE_FORMAT).isPresent()) { + newDatalakeFormat = newConfig.get(DATALAKE_FORMAT); + } else { + newDatalakeFormat = currentConfiguration.get(DATALAKE_FORMAT); + } + + if (newDatalakeFormat == null) { + return; + } + } catch (Exception e) { + throw new ConfigException( + "Invalid configuration for datalake format " + + newDatalakeFormat + + ": " + + e.getMessage()); + } + + Map<String, String> configMap = newConfig.toMap(); + String datalakePrefix = "datalake." 
+ newDatalakeFormat + "."; + DataLakeFormat finalNewDatalakeFormat = newDatalakeFormat; + configMap.forEach( + (key, value) -> { + if (!key.equals(DATALAKE_FORMAT.key()) + && key.startsWith("datalake.") + && !key.startsWith(datalakePrefix)) { + throw new ConfigException( + "Invalid configuration for datalake format " + + finalNewDatalakeFormat + + ": " + + key); + } + }); + } + + @Override + public void reconfigure(Configuration newConfig) throws ConfigException { + DataLakeFormat newLakeFormat = newConfig.getOptional(DATALAKE_FORMAT).orElse(null); + if (newLakeFormat != dataLakeFormat) { + IOUtils.closeQuietly(lakeCatalog, "Lake catalog because config changes"); + this.dataLakeFormat = newLakeFormat; + this.lakeCatalog = createLakeCatalog(newConfig, pluginManager); + this.currentConfiguration = newConfig; + } + } + + @Nullable + private LakeCatalog createLakeCatalog(Configuration conf, PluginManager pluginManager) { + DataLakeFormat dataLakeFormat = conf.get(ConfigOptions.DATALAKE_FORMAT); + if (dataLakeFormat == null) { + return null; + } + LakeStoragePlugin lakeStoragePlugin = + LakeStoragePluginSetUp.fromDataLakeFormat(dataLakeFormat.toString(), pluginManager); + Map<String, String> lakeProperties = extractLakeProperties(conf); + LakeStorage lakeStorage = + lakeStoragePlugin.createLakeStorage( + Configuration.fromMap(checkNotNull(lakeProperties))); + return lakeStorage.createLakeCatalog(); + } + + public @Nullable DataLakeFormat getDataLakeFormat() { + return dataLakeFormat; + } + + public @Nullable LakeCatalog getLakeCatalog() { + return lakeCatalog; + } + + @Override + public void close() throws Exception { + if (lakeCatalog != null) { + lakeCatalog.close(); + } + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java index 9f8db8000..5b612bdef 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java +++ 
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java @@ -31,6 +31,7 @@ import org.apache.fluss.rpc.RpcServer; import org.apache.fluss.rpc.gateway.CoordinatorGateway; import org.apache.fluss.rpc.metrics.ClientMetricGroup; import org.apache.fluss.rpc.netty.server.RequestsMetrics; +import org.apache.fluss.server.DynamicConfigManager; import org.apache.fluss.server.ServerBase; import org.apache.fluss.server.authorizer.Authorizer; import org.apache.fluss.server.authorizer.AuthorizerLoader; @@ -144,6 +145,9 @@ public class TabletServer extends ServerBase { @Nullable private Authorizer authorizer; + @GuardedBy("lock") + private DynamicConfigManager dynamicConfigManager; + public TabletServer(Configuration conf) { this(conf, SystemClock.getInstance()); } @@ -182,7 +186,6 @@ public class TabletServer extends ServerBase { serverId); this.zkClient = ZooKeeperUtils.startZookeeperClient(conf, this); - MetadataManager metadataManager = new MetadataManager(zkClient, conf); this.metadataCache = new TabletServerMetadataCache(metadataManager, zkClient); @@ -199,6 +202,7 @@ public class TabletServer extends ServerBase { if (authorizer != null) { authorizer.startup(); } + // rpc client to sent request to the tablet server where the leader replica is located // to fetch log. 
this.clientMetricGroup = @@ -229,6 +233,10 @@ public class TabletServer extends ServerBase { clock); replicaManager.startup(); + this.dynamicConfigManager = + new DynamicConfigManager(zkClient, dynamicServerConfig, false); + dynamicConfigManager.startup(); + this.tabletService = new TabletService( serverId, @@ -237,7 +245,8 @@ public class TabletServer extends ServerBase { replicaManager, metadataCache, metadataManager, - authorizer); + authorizer, + dynamicConfigManager); RequestsMetrics requestsMetrics = RequestsMetrics.createTabletServerRequestMetrics(tabletServerMetricGroup); @@ -396,6 +405,10 @@ public class TabletServer extends ServerBase { authorizer.close(); } + if (dynamicConfigManager != null) { + dynamicConfigManager.close(); + } + } catch (Throwable t) { exception = ExceptionUtils.firstOrSuppressed(t, exception); } diff --git a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java index 3bf13aa02..b4bd0419c 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java @@ -64,6 +64,7 @@ import org.apache.fluss.rpc.protocol.ApiError; import org.apache.fluss.rpc.protocol.Errors; import org.apache.fluss.security.acl.OperationType; import org.apache.fluss.security.acl.Resource; +import org.apache.fluss.server.DynamicConfigManager; import org.apache.fluss.server.RpcServiceBase; import org.apache.fluss.server.authorizer.Authorizer; import org.apache.fluss.server.coordinator.MetadataManager; @@ -130,8 +131,15 @@ public final class TabletService extends RpcServiceBase implements TabletServerG ReplicaManager replicaManager, TabletServerMetadataCache metadataCache, MetadataManager metadataManager, - @Nullable Authorizer authorizer) { - super(remoteFileSystem, ServerType.TABLET_SERVER, zkClient, metadataManager, authorizer); + @Nullable Authorizer 
authorizer, + DynamicConfigManager dynamicConfigManager) { + super( + remoteFileSystem, + ServerType.TABLET_SERVER, + zkClient, + metadataManager, + authorizer, + dynamicConfigManager); this.serviceName = "server-" + serverId; this.replicaManager = replicaManager; this.metadataCache = metadataCache; diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java index abdc5e1fd..39904117d 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/ZooKeeperClient.java @@ -46,6 +46,7 @@ import org.apache.fluss.server.zk.data.ZkData.BucketIdsZNode; import org.apache.fluss.server.zk.data.ZkData.BucketRemoteLogsZNode; import org.apache.fluss.server.zk.data.ZkData.BucketSnapshotIdZNode; import org.apache.fluss.server.zk.data.ZkData.BucketSnapshotsZNode; +import org.apache.fluss.server.zk.data.ZkData.ConfigEntityZNode; import org.apache.fluss.server.zk.data.ZkData.CoordinatorZNode; import org.apache.fluss.server.zk.data.ZkData.DatabaseZNode; import org.apache.fluss.server.zk.data.ZkData.DatabasesZNode; @@ -902,6 +903,37 @@ public class ZooKeeperClient implements AutoCloseable { LOG.info("add acl change notification for resource {} ", resource); } + public Map<String, String> fetchEntityConfig() throws Exception { + String path = ConfigEntityZNode.path(); + return getOrEmpty(path).map(ConfigEntityZNode::decode).orElse(new HashMap<>()); + } + + public void upsertServerEntityConfig(Map<String, String> configs) throws Exception { + upsertEntityConfigs(configs); + } + + public void upsertEntityConfigs(Map<String, String> configs) throws Exception { + String path = ConfigEntityZNode.path(); + if (zkClient.checkExists().forPath(path) != null) { + zkClient.setData().forPath(path, ConfigEntityZNode.encode(configs)); + } else { + zkClient.create() + .creatingParentsIfNeeded() + .forPath(path, 
ConfigEntityZNode.encode(configs)); + } + + insertConfigChangeNotification(); + } + + public void insertConfigChangeNotification() throws Exception { + zkClient.create() + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT_SEQUENTIAL) + .forPath( + ZkData.ConfigEntityChangeNotificationSequenceZNode.pathPrefix(), + ZkData.ConfigEntityChangeNotificationSequenceZNode.encode()); + } + // -------------------------------------------------------------------------------------------- // Utils // -------------------------------------------------------------------------------------------- diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ConfigEntityChangeNotificationJsonSerde.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ConfigEntityChangeNotificationJsonSerde.java new file mode 100644 index 000000000..0d0fbc169 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ConfigEntityChangeNotificationJsonSerde.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.fluss.server.zk.data; + +import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; +import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.fluss.utils.json.JsonDeserializer; +import org.apache.fluss.utils.json.JsonSerializer; + +import java.io.IOException; + +/** Json serializer and deserializer for config entity change notification. */ +public class ConfigEntityChangeNotificationJsonSerde + implements JsonSerializer<String>, JsonDeserializer<String> { + + public static final ConfigEntityChangeNotificationJsonSerde INSTANCE = + new ConfigEntityChangeNotificationJsonSerde(); + + private static final String VERSION_KEY = "version"; + private static final String ENTITY_PATH = "entity_path"; + private static final int VERSION = 1; + + @Override + public void serialize(String entityPath, JsonGenerator generator) throws IOException { + generator.writeStartObject(); + generator.writeNumberField(VERSION_KEY, VERSION); + generator.writeStringField(ENTITY_PATH, entityPath); + generator.writeEndObject(); + } + + @Override + public String deserialize(JsonNode node) { + return node.get(ENTITY_PATH).asText(); + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ConfigJsonSerde.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ConfigJsonSerde.java new file mode 100644 index 000000000..5d5faf5b8 --- /dev/null +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ConfigJsonSerde.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.zk.data; + +import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator; +import org.apache.fluss.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; +import org.apache.fluss.utils.json.JsonDeserializer; +import org.apache.fluss.utils.json.JsonSerializer; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; + +/** Json serializer and deserializer for config properties. */ +public class ConfigJsonSerde + implements JsonSerializer<Map<String, String>>, JsonDeserializer<Map<String, String>> { + + public static final ConfigJsonSerde INSTANCE = new ConfigJsonSerde(); + + private static final String VERSION_KEY = "version"; + private static final String CONFIG = "config"; + private static final int VERSION = 1; + + @Override + public void serialize(Map<String, String> properties, JsonGenerator generator) + throws IOException { + generator.writeStartObject(); + generator.writeNumberField(VERSION_KEY, VERSION); + generator.writeObjectFieldStart(CONFIG); + for (Map.Entry<String, String> property : properties.entrySet()) { + if (property.getValue() != null) { + generator.writeStringField(property.getKey(), property.getValue()); + } else { + generator.writeNullField(property.getKey()); + } + } + generator.writeEndObject(); + + generator.writeEndObject(); + } + + @Override + public Map<String, String> deserialize(JsonNode node) { + Map<String, String> properties = new HashMap<>(); + JsonNode bucketsNode = node.get(CONFIG); + 
Iterator<Map.Entry<String, JsonNode>> fields = bucketsNode.fields(); + while (fields.hasNext()) { + Map.Entry<String, JsonNode> field = fields.next(); + properties.put( + field.getKey(), field.getValue().isNull() ? null : field.getValue().asText()); + } + + return properties; + } +} diff --git a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java index 9fa256d1c..bb0c7325f 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/zk/data/ZkData.java @@ -29,6 +29,7 @@ import org.apache.fluss.utils.json.JsonSerdeUtils; import javax.annotation.Nullable; import java.nio.charset.StandardCharsets; +import java.util.Map; /** The data and path stored in ZooKeeper nodes (znodes). */ public final class ZkData { @@ -649,4 +650,72 @@ public final class ZkData { } } } + + /** + * The znode for the dynamic configs. The znode path is: + * + * <p>/config + */ + public static final class ConfigZNode { + public static String path() { + return "/config"; + } + } + + /** + * The znode for a specific config entity. The znode path is: + * + * <p>/config/[entityType]/[entityName] + */ + public static final class ConfigEntityZNode { + public static final String ENTITY_TYPE = "server"; + public static final String ENTITY_NAME = "global"; + + public static String path() { + return ConfigZNode.path() + "/" + ENTITY_TYPE + "/" + ENTITY_NAME; + } + + public static byte[] encode(Map<String, String> properties) { + return JsonSerdeUtils.writeValueAsBytes(properties, ConfigJsonSerde.INSTANCE); + } + + public static Map<String, String> decode(byte[] json) { + return JsonSerdeUtils.readValue(json, ConfigJsonSerde.INSTANCE); + } + } + + /** + * The znode for tracking dynamic config entity changes. This znode serves as a root node for + * all config entity change notifications. 
The znode path is: + * + * <p>/config/changes + */ + public static final class ConfigEntityChangeNotificationZNode { + public static String path() { + return ConfigZNode.path() + "/changes"; + } + } + + /** + * The znode for individual config entity change notifications. Each notification is stored as + * a sequential child node under the {@link ConfigEntityChangeNotificationZNode} with a prefix. + * The znode path follows this structure: + * + * <p>/config/changes/config_change_[sequenceNumber] + */ + public static final class ConfigEntityChangeNotificationSequenceZNode { + private static final String SEQUENT_NUMBER_PREFIX = "config_change_"; + + public static String pathPrefix() { + return ConfigEntityChangeNotificationZNode.path() + "/" + SEQUENT_NUMBER_PREFIX; + } + + public static String prefix() { + return SEQUENT_NUMBER_PREFIX; + } + + public static byte[] encode() { + return new byte[0]; + } + } } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java b/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java new file mode 100644 index 000000000..8ea73e935 --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server; + +import org.apache.fluss.config.ConfigOptions; +import org.apache.fluss.config.Configuration; +import org.apache.fluss.config.dynamic.AlterConfigOp; +import org.apache.fluss.exception.ConfigException; +import org.apache.fluss.server.coordinator.LakeCatalogDynamicLoader; +import org.apache.fluss.server.zk.NOPErrorHandler; +import org.apache.fluss.server.zk.ZooKeeperClient; +import org.apache.fluss.server.zk.ZooKeeperExtension; +import org.apache.fluss.server.zk.ZooKeeperUtils; +import org.apache.fluss.server.zk.data.ZkData.ConfigEntityZNode; +import org.apache.fluss.testutils.common.AllCallbackWrapper; + +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT; +import static org.apache.fluss.metadata.DataLakeFormat.PAIMON; +import static org.apache.fluss.testutils.common.CommonTestUtils.retry; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +/** Test for {@link DynamicConfigManager}. 
*/ +public class DynamicConfigChangeTest { + + @RegisterExtension + public static AllCallbackWrapper<ZooKeeperExtension> zooKeeperExtensionWrapper = + new AllCallbackWrapper<>(new ZooKeeperExtension()); + + protected static ZooKeeperClient zookeeperClient; + + @BeforeAll + static void beforeAll() { + final Configuration configuration = new Configuration(); + configuration.setString( + ConfigOptions.ZOOKEEPER_ADDRESS, + zooKeeperExtensionWrapper.getCustomExtension().getConnectString()); + zookeeperClient = + ZooKeeperUtils.startZookeeperClient(configuration, NOPErrorHandler.INSTANCE); + } + + @AfterAll + static void afterAll() { + if (zookeeperClient != null) { + zookeeperClient.close(); + } + } + + @AfterEach + void after() throws Exception { + zookeeperClient.deletePath(ConfigEntityZNode.path()); + } + + @Test + void testAlterConfigs() throws Exception { + DynamicServerConfig dynamicServerConfig = new DynamicServerConfig(new Configuration()); + DynamicConfigManager dynamicConfigManager = + new DynamicConfigManager(zookeeperClient, dynamicServerConfig, true); + dynamicConfigManager.startup(); + try (LakeCatalogDynamicLoader lakeCatalogDynamicLoader = + new LakeCatalogDynamicLoader(dynamicServerConfig, null)) { + assertThatThrownBy( + () -> + dynamicConfigManager.alterConfigs( + Collections.singletonList( + new AlterConfigOp( + "un_support_key", + "value", + AlterConfigOp.OpType.SET)))) + .isExactlyInstanceOf(ConfigException.class) + .hasMessageContaining( + "The config key un_support_key is not allowed to be changed dynamically."); + + dynamicConfigManager.alterConfigs( + Collections.singletonList( + new AlterConfigOp( + DATALAKE_FORMAT.key(), "paimon", AlterConfigOp.OpType.SET))); + assertThat(lakeCatalogDynamicLoader.getDataLakeFormat()).isEqualTo(PAIMON); + dynamicConfigManager.alterConfigs( + Collections.singletonList( + new AlterConfigOp( + DATALAKE_FORMAT.key(), null, AlterConfigOp.OpType.DELETE))); + 
assertThat(lakeCatalogDynamicLoader.getDataLakeFormat()).isEqualTo(null); + } + } + + @Test + void testOverrideConfigs() throws Exception { + Configuration configuration = new Configuration(); + configuration.setString(DATALAKE_FORMAT.key(), "paimon"); + DynamicServerConfig dynamicServerConfig = new DynamicServerConfig(configuration); + DynamicConfigManager dynamicConfigManager = + new DynamicConfigManager(zookeeperClient, dynamicServerConfig, true); + dynamicConfigManager.startup(); + try (LakeCatalogDynamicLoader lakeCatalogDynamicLoader = + new LakeCatalogDynamicLoader(dynamicServerConfig, null)) { + assertThat(lakeCatalogDynamicLoader.getDataLakeFormat()).isEqualTo(PAIMON); + dynamicConfigManager.alterConfigs( + Collections.singletonList( + new AlterConfigOp( + DATALAKE_FORMAT.key(), null, AlterConfigOp.OpType.SET))); + assertThat(lakeCatalogDynamicLoader.getDataLakeFormat()).isEqualTo(null); + dynamicConfigManager.alterConfigs( + Collections.singletonList( + new AlterConfigOp( + DATALAKE_FORMAT.key(), null, AlterConfigOp.OpType.DELETE))); + assertThat(lakeCatalogDynamicLoader.getDataLakeFormat()).isEqualTo(PAIMON); + } + } + + @Test + void testUnknownLakeHouse() throws Exception { + Configuration configuration = new Configuration(); + DynamicServerConfig dynamicServerConfig = new DynamicServerConfig(configuration); + DynamicConfigManager dynamicConfigManager = + new DynamicConfigManager(zookeeperClient, dynamicServerConfig, true); + dynamicConfigManager.startup(); + try (LakeCatalogDynamicLoader lakeCatalogDynamicLoader = + new LakeCatalogDynamicLoader(dynamicServerConfig, null)) { + assertThatThrownBy( + () -> + dynamicConfigManager.alterConfigs( + Collections.singletonList( + new AlterConfigOp( + DATALAKE_FORMAT.key(), + "unknown", + AlterConfigOp.OpType.SET)))) + .hasMessageContaining( + "Could not parse value 'unknown' for key 'datalake.format'"); + + assertThat(lakeCatalogDynamicLoader.getDataLakeFormat()).isNull(); + } + } + + @Test + void 
testWrongLakeFormatPrefix() throws Exception { + Configuration configuration = new Configuration(); + DynamicServerConfig dynamicServerConfig = new DynamicServerConfig(configuration); + DynamicConfigManager dynamicConfigManager = + new DynamicConfigManager(zookeeperClient, dynamicServerConfig, true); + dynamicConfigManager.startup(); + try (LakeCatalogDynamicLoader lakeCatalogDynamicLoader = + new LakeCatalogDynamicLoader(dynamicServerConfig, null)) { + assertThatThrownBy( + () -> + dynamicConfigManager.alterConfigs( + Arrays.asList( + new AlterConfigOp( + DATALAKE_FORMAT.key(), + "paimon", + AlterConfigOp.OpType.SET), + new AlterConfigOp( + "datalake.iceberg.metastore", + "filesystem", + AlterConfigOp.OpType.SET)))) + .hasMessage( + "Invalid configuration for datalake format paimon: datalake.iceberg.metastore"); + + assertThat(lakeCatalogDynamicLoader.getDataLakeFormat()).isNull(); + } + } + + @Test + void testListenUnMatchedDynamicConfigChanges() throws Exception { + DynamicServerConfig dynamicServerConfig = new DynamicServerConfig(new Configuration()); + DynamicConfigManager dynamicConfigManager = + new DynamicConfigManager(zookeeperClient, dynamicServerConfig, false); + dynamicConfigManager.startup(); + try (LakeCatalogDynamicLoader lakeCatalogDynamicLoader = + new LakeCatalogDynamicLoader(dynamicServerConfig, null)) { + assertThat(lakeCatalogDynamicLoader.getDataLakeFormat()).isEqualTo(null); + Map<String, String> config = new HashMap<>(); + config.put(DATALAKE_FORMAT.key(), "paimon"); + config.put("un_support_key", "value"); + zookeeperClient.upsertServerEntityConfig(config); + retry( + Duration.ofMinutes(1), + () -> + assertThat(lakeCatalogDynamicLoader.getDataLakeFormat()) + .isEqualTo(PAIMON)); + } + } + + @Test + void testReStartupContainsNoMatchedDynamicConfig() throws Exception { + DynamicServerConfig dynamicServerConfig = new DynamicServerConfig(new Configuration()); + Map<String, String> config = new HashMap<>(); + config.put(DATALAKE_FORMAT.key(), 
"paimon"); + config.put("un_support_key", "value"); + + // This often happens when upgrading with different allowed configs. + zookeeperClient.upsertServerEntityConfig(config); + DynamicConfigManager dynamicConfigManager = + new DynamicConfigManager(zookeeperClient, dynamicServerConfig, true); + // Startup dynamic manager even is not matched now. + dynamicConfigManager.startup(); + + try (LakeCatalogDynamicLoader lakeCatalogDynamicLoader = + new LakeCatalogDynamicLoader(dynamicServerConfig, null)) { + assertThat(lakeCatalogDynamicLoader.getDataLakeFormat()).isEqualTo(PAIMON); + } + } +} diff --git a/fluss-server/src/test/java/org/apache/fluss/server/authorizer/DefaultAuthorizerTest.java b/fluss-server/src/test/java/org/apache/fluss/server/authorizer/DefaultAuthorizerTest.java index 561374926..8f20b621f 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/authorizer/DefaultAuthorizerTest.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/authorizer/DefaultAuthorizerTest.java @@ -58,7 +58,11 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import static org.apache.fluss.security.acl.OperationType.ALTER; +import static org.apache.fluss.security.acl.OperationType.ALTER_CONFIGS; import static org.apache.fluss.security.acl.OperationType.CREATE; +import static org.apache.fluss.security.acl.OperationType.DESCRIBE; +import static org.apache.fluss.security.acl.OperationType.DESCRIBE_CONFIGS; import static org.apache.fluss.security.acl.OperationType.DROP; import static org.apache.fluss.security.acl.OperationType.READ; import static org.apache.fluss.security.acl.OperationType.WRITE; @@ -281,37 +285,29 @@ public class DefaultAuthorizerTest { Arrays.asList( READ, WRITE, - OperationType.CREATE, - OperationType.DROP, - OperationType.ALTER, - OperationType.DESCRIBE)); + CREATE, + DROP, + ALTER, + DESCRIBE, + ALTER_CONFIGS, + DESCRIBE_CONFIGS)); testOperationTypeImplicationsOfAllow( - 
Resource.cluster(), - OperationType.CREATE, - Collections.singleton(OperationType.DESCRIBE)); + Resource.cluster(), OperationType.CREATE, Collections.singleton(DESCRIBE)); testOperationTypeImplicationsOfAllow( - Resource.cluster(), - OperationType.DROP, - Collections.singleton(OperationType.DESCRIBE)); + Resource.cluster(), OperationType.DROP, Collections.singleton(DESCRIBE)); testOperationTypeImplicationsOfAllow( - Resource.cluster(), - OperationType.ALTER, - Collections.singleton(OperationType.DESCRIBE)); + Resource.cluster(), ALTER, Collections.singleton(DESCRIBE)); - // when we allow READ on any resource, we also allow DESCRIBE and FILESYSTEM_TOKEN on + // when we allow READ on any resource, we also allow to DESCRIBE and FILESYSTEM_TOKEN on // cluster. testOperationTypeImplicationsOfAllow( - Resource.cluster(), READ, Collections.singletonList(OperationType.DESCRIBE)); + Resource.cluster(), READ, Collections.singletonList(DESCRIBE)); testOperationTypeImplicationsOfAllow( - Resource.cluster(), WRITE, Collections.singletonList(OperationType.DESCRIBE)); + Resource.cluster(), WRITE, Collections.singletonList(DESCRIBE)); testOperationTypeImplicationsOfAllow( - Resource.database("database1"), - READ, - Collections.singletonList(OperationType.DESCRIBE)); + Resource.database("database1"), READ, Collections.singletonList(DESCRIBE)); testOperationTypeImplicationsOfAllow( - Resource.table("database2", "table1"), - WRITE, - Collections.singletonList(OperationType.DESCRIBE)); + Resource.table("database2", "table1"), WRITE, Collections.singletonList(DESCRIBE)); } private void testOperationTypeImplicationsOfAllow( diff --git a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java index 1f7509738..162b4a00d 100644 --- a/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java +++ 
b/fluss-server/src/test/java/org/apache/fluss/server/coordinator/TestCoordinatorGateway.java @@ -22,6 +22,8 @@ import org.apache.fluss.metadata.TableBucket; import org.apache.fluss.rpc.gateway.CoordinatorGateway; import org.apache.fluss.rpc.messages.AdjustIsrRequest; import org.apache.fluss.rpc.messages.AdjustIsrResponse; +import org.apache.fluss.rpc.messages.AlterConfigsRequest; +import org.apache.fluss.rpc.messages.AlterConfigsResponse; import org.apache.fluss.rpc.messages.ApiVersionsRequest; import org.apache.fluss.rpc.messages.ApiVersionsResponse; import org.apache.fluss.rpc.messages.CommitKvSnapshotRequest; @@ -40,6 +42,8 @@ import org.apache.fluss.rpc.messages.CreateTableRequest; import org.apache.fluss.rpc.messages.CreateTableResponse; import org.apache.fluss.rpc.messages.DatabaseExistsRequest; import org.apache.fluss.rpc.messages.DatabaseExistsResponse; +import org.apache.fluss.rpc.messages.DescribeConfigsRequest; +import org.apache.fluss.rpc.messages.DescribeConfigsResponse; import org.apache.fluss.rpc.messages.DropAclsRequest; import org.apache.fluss.rpc.messages.DropAclsResponse; import org.apache.fluss.rpc.messages.DropDatabaseRequest; @@ -311,6 +315,17 @@ public class TestCoordinatorGateway implements CoordinatorGateway { throw new UnsupportedOperationException(); } + @Override + public CompletableFuture<AlterConfigsResponse> alterConfigs(AlterConfigsRequest request) { + throw new UnsupportedOperationException(); + } + + @Override + public CompletableFuture<DescribeConfigsResponse> describeConfigs( + DescribeConfigsRequest request) { + throw new UnsupportedOperationException(); + } + public void setCurrentLeaderEpoch(TableBucket tableBucket, int leaderEpoch) { currentLeaderEpoch.put(tableBucket, leaderEpoch); } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java index 631165923..1f64327c8 100644 --- 
a/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java +++ b/fluss-server/src/test/java/org/apache/fluss/server/tablet/TestTabletServerGateway.java @@ -26,6 +26,8 @@ import org.apache.fluss.rpc.messages.ApiVersionsRequest; import org.apache.fluss.rpc.messages.ApiVersionsResponse; import org.apache.fluss.rpc.messages.DatabaseExistsRequest; import org.apache.fluss.rpc.messages.DatabaseExistsResponse; +import org.apache.fluss.rpc.messages.DescribeConfigsRequest; +import org.apache.fluss.rpc.messages.DescribeConfigsResponse; import org.apache.fluss.rpc.messages.FetchLogRequest; import org.apache.fluss.rpc.messages.FetchLogResponse; import org.apache.fluss.rpc.messages.GetDatabaseInfoRequest; @@ -318,6 +320,12 @@ public class TestTabletServerGateway implements TabletServerGateway { throw new UnsupportedOperationException(); } + @Override + public CompletableFuture<DescribeConfigsResponse> describeConfigs( + DescribeConfigsRequest request) { + throw new UnsupportedOperationException(); + } + public int pendingRequestSize() { return requests.size(); } diff --git a/fluss-server/src/test/java/org/apache/fluss/server/zk/data/ConfigJsonSerdeTest.java b/fluss-server/src/test/java/org/apache/fluss/server/zk/data/ConfigJsonSerdeTest.java new file mode 100644 index 000000000..a091b63b2 --- /dev/null +++ b/fluss-server/src/test/java/org/apache/fluss/server/zk/data/ConfigJsonSerdeTest.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.server.zk.data; + +import org.apache.fluss.utils.json.JsonSerdeTestBase; + +import java.util.HashMap; +import java.util.Map; + +/** Tests for {@link ConfigJsonSerde}. */ +public class ConfigJsonSerdeTest extends JsonSerdeTestBase<Map<String, String>> { + + public ConfigJsonSerdeTest() { + super(ConfigJsonSerde.INSTANCE); + } + + @Override + protected Map<String, String>[] createObjects() { + Map<String, String>[] maps = new Map[2]; + Map<String, String> map = new HashMap<>(); + map.put("datalake.format", "value1"); + map.put("key2", null); + map.put("key3", "value3"); + maps[0] = map; + maps[1] = new HashMap<>(); + return maps; + } + + @Override + protected String[] expectedJsons() { + return new String[] { + "{\"version\":1,\"config\":{\"datalake.format\":\"value1\",\"key2\":null,\"key3\":\"value3\"}}", + "{\"version\":1,\"config\":{}}" + }; + } +} diff --git a/fluss-test-coverage/pom.xml b/fluss-test-coverage/pom.xml index aaa08c07d..c96a948e1 100644 --- a/fluss-test-coverage/pom.xml +++ b/fluss-test-coverage/pom.xml @@ -324,6 +324,7 @@ org.apache.fluss.security.auth.sasl.plain.PlainSaslServer.PlainSaslServerFactory </exclude> <exclude>org.apache.fluss.security.auth.ServerAuthenticator</exclude> + <exclude>org.apache.fluss.config.dynamic.AlterConfigOp</exclude> <!-- start exclude for flink-connector --> <exclude>org.apache.fluss.flink.utils.*</exclude> <exclude>org.apache.fluss.flink.source.*
