This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new bafba4293 [server] Add configuration options for multiple remote data
directories (#2757)
bafba4293 is described below
commit bafba42932aa4ef0d3a7e026edb68f0383cfd7a4
Author: Liebing <[email protected]>
AuthorDate: Mon Mar 9 22:39:23 2026 +0800
[server] Add configuration options for multiple remote data directories
(#2757)
---
.../org/apache/fluss/config/ConfigOptions.java | 61 +++++++++-
.../org/apache/fluss/config/FlussConfigUtils.java | 117 +++++++++++++++++++
.../apache/fluss/config/FlussConfigUtilsTest.java | 126 +++++++++++++++++++++
.../server/coordinator/CoordinatorServer.java | 32 +-----
.../apache/fluss/server/tablet/TabletServer.java | 39 +------
website/docs/maintenance/configuration.md | 5 +-
6 files changed, 311 insertions(+), 69 deletions(-)
diff --git
a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
index 90f262b07..bcd00b130 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
@@ -96,8 +96,59 @@ public class ConfigOptions {
.stringType()
.noDefaultValue()
.withDescription(
- "The directory used for storing the kv snapshot
data files and remote log for log tiered storage "
- + " in a Fluss supported filesystem.");
+ "The directory used for storing the kv snapshot
data files and remote log for log tiered storage"
+ + " in a Fluss supported filesystem. "
+ + "When upgrading to `remote.data.dirs`,
please ensure this value is placed as the first entry in the new configuration. "
+ + "For new clusters, it is recommended to
use `remote.data.dirs` instead. "
+ + "If `remote.data.dirs` is configured,
this value will be ignored.");
+
+ public static final ConfigOption<List<String>> REMOTE_DATA_DIRS = // list-valued companion of REMOTE_DATA_DIR for several remote directories
+ key("remote.data.dirs")
+ .stringType()
+ .asList()
+ .defaultValues() // called without arguments, so no default entries are declared
+ .withDescription(
+ "A comma-separated list of directories in Fluss
supported filesystems "
+ + "for storing the kv snapshot data files
and remote log files of tables/partitions. "
+ + "If configured, when a new table or a
new partition is created, "
+ + "one of the directories from this list
will be selected according to the strategy "
+ + "specified by
`remote.data.dirs.strategy` (`ROUND_ROBIN` by default). "
+ + "If not configured, the system uses `"
+ + REMOTE_DATA_DIR.key()
+ + "` as the sole remote data directory for
all data.");
+
+ public static final ConfigOption<RemoteDataDirStrategy>
REMOTE_DATA_DIRS_STRATEGY = // enum-typed option naming how one entry of REMOTE_DATA_DIRS is picked
+ key("remote.data.dirs.strategy")
+ .enumType(RemoteDataDirStrategy.class)
+ .defaultValue(RemoteDataDirStrategy.ROUND_ROBIN)
+ .withDescription( // NOTE(review): the weights key is written as a literal in the text below, presumably because REMOTE_DATA_DIRS_WEIGHTS is declared after this field - confirm
+ String.format(
+ "The strategy for selecting the remote
data directory from `%s`. "
+ + "The candidate strategies are:
%s, the default strategy is %s.\n"
+ + "%s: this strategy employs a
round-robin approach to select one from the available remote directories.\n"
+ + "%s: this strategy selects one
of the available remote directories based on the weights configured in
`remote.data.dirs.weights`.",
+ REMOTE_DATA_DIRS.key(),
+
Arrays.toString(RemoteDataDirStrategy.values()),
+ RemoteDataDirStrategy.ROUND_ROBIN,
+ RemoteDataDirStrategy.ROUND_ROBIN,
+
RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN));
+
+ public static final ConfigOption<List<Integer>> REMOTE_DATA_DIRS_WEIGHTS = // integer weights, matched by position to the entries of REMOTE_DATA_DIRS
+ key("remote.data.dirs.weights")
+ .intType()
+ .asList()
+ .defaultValues() // called without arguments, so no default entries are declared
+ .withDescription(
+ "The weights of the remote data directories. "
+ + "This is a list of weights corresponding
to the `"
+ + REMOTE_DATA_DIRS.key()
+ + "` in the same order. When `"
+ + REMOTE_DATA_DIRS_STRATEGY.key()
+ + "` is set to `"
+ +
RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN
+ + "`, this must be configured, and its
size must be equal to `"
+ + REMOTE_DATA_DIRS.key()
+ + "`; otherwise, it will be ignored.");
public static final ConfigOption<MemorySize> REMOTE_FS_WRITE_BUFFER_SIZE =
key("remote.fs.write-buffer-size")
@@ -2076,4 +2127,10 @@ public class ConfigOptions {
public static ConfigOption<?> getConfigOption(String key) {
return ConfigOptionsHolder.CONFIG_OPTIONS_BY_KEY.get(key);
}
+
+ /**
+ * Remote data dir select strategy for Fluss.
+ *
+ * <p>Used as the value type of the {@code remote.data.dirs.strategy} option, whose declared
+ * default is ROUND_ROBIN.
+ */
+ public enum RemoteDataDirStrategy {
+ ROUND_ROBIN, // described by the option text as a round-robin pick among the available remote directories
+ WEIGHTED_ROUND_ROBIN // described by the option text as a pick based on the weights in remote.data.dirs.weights
+ }
}
diff --git
a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
index 08b97256c..b3a1d84f9 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
@@ -19,12 +19,15 @@ package org.apache.fluss.config;
import org.apache.fluss.annotation.Internal;
import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.exception.IllegalConfigurationException;
+import org.apache.fluss.fs.FsPath;
import java.lang.reflect.Field;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
/** Utilities of Fluss {@link ConfigOptions}. */
@Internal
@@ -77,4 +80,118 @@ public class FlussConfigUtils {
}
return options;
}
+
+ public static void validateCoordinatorConfigs(Configuration conf) { // invoked from the CoordinatorServer constructor; throws IllegalConfigurationException on the first violation
+ validateServerConfigs(conf); // the coordinator adds no checks beyond the shared server ones
+ }
+
+ public static void validateTabletConfigs(Configuration conf) {
+ validateServerConfigs(conf);
+
+ Optional<Integer> serverId =
conf.getOptional(ConfigOptions.TABLET_SERVER_ID);
+ if (!serverId.isPresent()) {
+ throw new IllegalConfigurationException(
+ String.format(
+ "Configuration %s must be set.",
ConfigOptions.TABLET_SERVER_ID.key()));
+ }
+ validMinValue(ConfigOptions.TABLET_SERVER_ID, serverId.get(), 0);
+ }
+
+ /** Validate common server configs. */
+ protected static void validateServerConfigs(Configuration conf) {
+ // Validate remote.data.dir and remote.data.dirs
+ String remoteDataDir = conf.get(ConfigOptions.REMOTE_DATA_DIR);
+ List<String> remoteDataDirs = conf.get(ConfigOptions.REMOTE_DATA_DIRS);
+ if (conf.get(ConfigOptions.REMOTE_DATA_DIR) == null
+ && conf.get(ConfigOptions.REMOTE_DATA_DIRS).isEmpty()) {
+ throw new IllegalConfigurationException(
+ String.format(
+ "Either %s or %s must be configured.",
+ ConfigOptions.REMOTE_DATA_DIR.key(),
+ ConfigOptions.REMOTE_DATA_DIRS.key()));
+ }
+
+ if (remoteDataDir != null) {
+ // Must validate that remote.data.dir is a valid FsPath
+ try {
+ new FsPath(conf.get(ConfigOptions.REMOTE_DATA_DIR));
+ } catch (Exception e) {
+ throw new IllegalConfigurationException(
+ String.format(
+ "Invalid configuration for %s.",
+ ConfigOptions.REMOTE_DATA_DIR.key()),
+ e);
+ }
+ }
+
+ // Validate remote.data.dirs
+ for (int i = 0; i < remoteDataDirs.size(); i++) {
+ String dir = remoteDataDirs.get(i);
+ try {
+ new FsPath(dir);
+ } catch (Exception e) {
+ throw new IllegalConfigurationException(
+ String.format(
+ "Invalid remote path for %s at index %d.",
+ ConfigOptions.REMOTE_DATA_DIRS.key(), i),
+ e);
+ }
+ }
+
+ // Validate remote.data.dirs.strategy
+ ConfigOptions.RemoteDataDirStrategy remoteDataDirStrategy =
+ conf.get(ConfigOptions.REMOTE_DATA_DIRS_STRATEGY);
+ if (remoteDataDirStrategy ==
ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN) {
+ List<Integer> weights =
conf.get(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS);
+ if (!remoteDataDirs.isEmpty()) {
+ if (remoteDataDirs.size() != weights.size()) {
+ throw new IllegalConfigurationException(
+ String.format(
+ "The size of '%s' (%d) must match the size
of '%s' (%d) when using WEIGHTED_ROUND_ROBIN strategy.",
+ ConfigOptions.REMOTE_DATA_DIRS.key(),
+ remoteDataDirs.size(),
+
ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key(),
+ weights.size()));
+ }
+
+ // Validate all weights are no less than 0
+ for (int i = 0; i < weights.size(); i++) {
+ if (weights.get(i) < 0) {
+ throw new IllegalConfigurationException(
+ String.format(
+ "All weights in '%s' must be no less
than 0, but found %d at index %d.",
+
ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key(),
+ weights.get(i),
+ i));
+ }
+ }
+ }
+ }
+
+ validMinValue(conf, ConfigOptions.DEFAULT_REPLICATION_FACTOR, 1);
+ validMinValue(conf, ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, 1);
+ validMinValue(conf, ConfigOptions.SERVER_IO_POOL_SIZE, 1);
+ validMinValue(conf, ConfigOptions.BACKGROUND_THREADS, 1);
+
+ if (conf.get(ConfigOptions.LOG_SEGMENT_FILE_SIZE).getBytes() >
Integer.MAX_VALUE) {
+ throw new IllegalConfigurationException(
+ String.format(
+ "Invalid configuration for %s, it must be less
than or equal %d bytes.",
+ ConfigOptions.LOG_SEGMENT_FILE_SIZE.key(),
Integer.MAX_VALUE));
+ }
+ }
+
+    /**
+     * Reads an integer option from the configuration and checks it against a lower bound.
+     *
+     * @param conf the configuration to read the value from
+     * @param option the integer option to check
+     * @param minValue the smallest accepted value (inclusive)
+     * @throws IllegalConfigurationException if the configured value is below {@code minValue}
+     */
+    private static void validMinValue(
+            Configuration conf, ConfigOption<Integer> option, int minValue) {
+        int configured = conf.get(option);
+        validMinValue(option, configured, minValue);
+    }
+
+    /**
+     * Checks an already resolved integer value against a lower bound.
+     *
+     * @param option the option the value belongs to; only its key is used, for the error message
+     * @param value the value to check
+     * @param minValue the smallest accepted value (inclusive)
+     * @throws IllegalConfigurationException if {@code value} is below {@code minValue}
+     */
+    private static void validMinValue(ConfigOption<Integer> option, int value, int minValue) {
+        if (value >= minValue) {
+            return;
+        }
+        throw new IllegalConfigurationException(
+                String.format(
+                        "Invalid configuration for %s, it must be greater than or equal %d.",
+                        option.key(), minValue));
+    }
}
diff --git
a/fluss-common/src/test/java/org/apache/fluss/config/FlussConfigUtilsTest.java
b/fluss-common/src/test/java/org/apache/fluss/config/FlussConfigUtilsTest.java
index e24bc121a..e2690c54b 100644
---
a/fluss-common/src/test/java/org/apache/fluss/config/FlussConfigUtilsTest.java
+++
b/fluss-common/src/test/java/org/apache/fluss/config/FlussConfigUtilsTest.java
@@ -17,14 +17,21 @@
package org.apache.fluss.config;
+import org.apache.fluss.exception.IllegalConfigurationException;
+
import org.junit.jupiter.api.Test;
+import java.util.Arrays;
+import java.util.Collections;
import java.util.Map;
import static org.apache.fluss.config.FlussConfigUtils.CLIENT_OPTIONS;
import static org.apache.fluss.config.FlussConfigUtils.TABLE_OPTIONS;
import static org.apache.fluss.config.FlussConfigUtils.extractConfigOptions;
+import static
org.apache.fluss.config.FlussConfigUtils.validateCoordinatorConfigs;
+import static org.apache.fluss.config.FlussConfigUtils.validateTabletConfigs;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Test for {@link FlussConfigUtils}. */
class FlussConfigUtilsTest {
@@ -49,4 +56,123 @@ class FlussConfigUtilsTest {
});
assertThat(clientOptions.size()).isEqualTo(CLIENT_OPTIONS.size());
}
+
+    /**
+     * Walks through the coordinator validation scenarios one by one: missing remote location,
+     * valid and invalid single directory, valid and invalid directory list, weight size mismatch,
+     * matching weights, negative weight, and the lower bounds of three integer options.
+     */
+    @Test
+    void testValidateCoordinatorConfigs() {
+        // Test empty configuration
+        Configuration emptyConf = new Configuration();
+        assertThatThrownBy(() -> validateCoordinatorConfigs(emptyConf))
+                .isInstanceOf(IllegalConfigurationException.class)
+                .hasMessageContaining(ConfigOptions.REMOTE_DATA_DIR.key())
+                .hasMessageContaining(ConfigOptions.REMOTE_DATA_DIRS.key())
+                .hasMessageContaining("must be configured");
+
+        // Test configuration with only REMOTE_DATA_DIR set
+        Configuration remoteDataDirConf = new Configuration();
+        remoteDataDirConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path");
+        validateCoordinatorConfigs(remoteDataDirConf);
+
+        // Test invalid REMOTE_DATA_DIR
+        Configuration invalidRemoteDirConf = new Configuration();
+        invalidRemoteDirConf.set(ConfigOptions.REMOTE_DATA_DIR, "123://invalid.com");
+        assertThatThrownBy(() -> validateCoordinatorConfigs(invalidRemoteDirConf))
+                .isInstanceOf(IllegalConfigurationException.class)
+                .hasMessageContaining(ConfigOptions.REMOTE_DATA_DIR.key())
+                .hasMessageContaining("Invalid configuration for remote.data.dir");
+
+        // Test configuration with only REMOTE_DATA_DIRS set
+        Configuration remoteDataDirsConf = new Configuration();
+        remoteDataDirsConf.set(
+                ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("s3://bucket1", "s3://bucket2"));
+        // Validate the list-only configuration built just above, not the single-dir one.
+        validateCoordinatorConfigs(remoteDataDirsConf);
+
+        // Test REMOTE_DATA_DIRS contains invalid path
+        Configuration invalidRemoteDirsConf = new Configuration();
+        invalidRemoteDirsConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path");
+        invalidRemoteDirsConf.set(
+                ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("s3://bucket1", "123://invalid.com"));
+        assertThatThrownBy(() -> validateCoordinatorConfigs(invalidRemoteDirsConf))
+                .isInstanceOf(IllegalConfigurationException.class)
+                .hasMessageContaining(ConfigOptions.REMOTE_DATA_DIRS.key())
+                .hasMessageContaining("Invalid remote path for");
+
+        // Test WEIGHTED_ROUND_ROBIN with mismatched sizes
+        Configuration mismatchedWeightsConf = new Configuration();
+        mismatchedWeightsConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path");
+        mismatchedWeightsConf.set(
+                ConfigOptions.REMOTE_DATA_DIRS_STRATEGY,
+                ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN);
+        mismatchedWeightsConf.set(
+                ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("s3://bucket1", "s3://bucket2"));
+        mismatchedWeightsConf.set(
+                ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS, Collections.singletonList(1));
+        assertThatThrownBy(() -> validateCoordinatorConfigs(mismatchedWeightsConf))
+                .isInstanceOf(IllegalConfigurationException.class)
+                .hasMessageContaining(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key())
+                .hasMessageContaining(ConfigOptions.REMOTE_DATA_DIRS.key());
+
+        // Test WEIGHTED_ROUND_ROBIN with matched sizes
+        Configuration matchedWeightsConf = new Configuration();
+        matchedWeightsConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path");
+        matchedWeightsConf.set(
+                ConfigOptions.REMOTE_DATA_DIRS_STRATEGY,
+                ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN);
+        matchedWeightsConf.set(
+                ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("s3://bucket1", "s3://bucket2"));
+        matchedWeightsConf.set(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS, Arrays.asList(0, 2));
+        validateCoordinatorConfigs(matchedWeightsConf);
+
+        // Test negative weight
+        Configuration negativeWeightConf = new Configuration();
+        negativeWeightConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path");
+        negativeWeightConf.set(
+                ConfigOptions.REMOTE_DATA_DIRS_STRATEGY,
+                ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN);
+        negativeWeightConf.set(
+                ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("s3://bucket1", "s3://bucket2"));
+        negativeWeightConf.set(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS, Arrays.asList(-1, 2));
+        assertThatThrownBy(() -> validateCoordinatorConfigs(negativeWeightConf))
+                .isInstanceOf(IllegalConfigurationException.class)
+                .hasMessageContaining(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key())
+                .hasMessageContaining(
+                        "All weights in 'remote.data.dirs.weights' must be no less than 0");
+
+        // Test invalid DEFAULT_REPLICATION_FACTOR
+        Configuration invalidReplicationConf = new Configuration();
+        invalidReplicationConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path");
+        invalidReplicationConf.set(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 0);
+        assertThatThrownBy(() -> validateCoordinatorConfigs(invalidReplicationConf))
+                .isInstanceOf(IllegalConfigurationException.class)
+                .hasMessageContaining(ConfigOptions.DEFAULT_REPLICATION_FACTOR.key())
+                .hasMessageContaining("must be greater than or equal 1");
+
+        // Test invalid KV_MAX_RETAINED_SNAPSHOTS
+        Configuration invalidSnapshotConf = new Configuration();
+        invalidSnapshotConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path");
+        invalidSnapshotConf.set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, 0);
+        assertThatThrownBy(() -> validateCoordinatorConfigs(invalidSnapshotConf))
+                .isInstanceOf(IllegalConfigurationException.class)
+                .hasMessageContaining(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS.key())
+                .hasMessageContaining("must be greater than or equal 1");
+
+        // Test invalid SERVER_IO_POOL_SIZE
+        Configuration invalidIoPoolConf = new Configuration();
+        invalidIoPoolConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path");
+        invalidIoPoolConf.set(ConfigOptions.SERVER_IO_POOL_SIZE, 0);
+        assertThatThrownBy(() -> validateCoordinatorConfigs(invalidIoPoolConf))
+                .isInstanceOf(IllegalConfigurationException.class)
+                .hasMessageContaining(ConfigOptions.SERVER_IO_POOL_SIZE.key())
+                .hasMessageContaining("must be greater than or equal 1");
+    }
+
+    /** A negative tablet server id must be rejected by the tablet-specific validation. */
+    @Test
+    void testValidateTabletConfigs() {
+        Configuration negativeIdConf = new Configuration();
+        // A valid remote dir lets the shared server checks pass, so the id check is reached.
+        negativeIdConf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path");
+        negativeIdConf.set(ConfigOptions.TABLET_SERVER_ID, -1);
+
+        assertThatThrownBy(() -> validateTabletConfigs(negativeIdConf))
+                .isInstanceOf(IllegalConfigurationException.class)
+                .hasMessageContaining(ConfigOptions.TABLET_SERVER_ID.key())
+                .hasMessageContaining("it must be greater than or equal 0");
+    }
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
index 774c23cce..1f9f3306e 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
@@ -22,7 +22,6 @@ import org.apache.fluss.cluster.Endpoint;
import org.apache.fluss.cluster.ServerType;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
-import org.apache.fluss.exception.IllegalConfigurationException;
import org.apache.fluss.metadata.DatabaseDescriptor;
import org.apache.fluss.metrics.registry.MetricRegistry;
import org.apache.fluss.rpc.RpcClient;
@@ -67,6 +66,8 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import static
org.apache.fluss.config.FlussConfigUtils.validateCoordinatorConfigs;
+
/**
* Coordinator server implementation. The coordinator server is responsible to:
*
@@ -155,7 +156,7 @@ public class CoordinatorServer extends ServerBase {
public CoordinatorServer(Configuration conf, Clock clock) {
super(conf);
- validateConfigs(conf);
+ validateCoordinatorConfigs(conf);
this.terminationFuture = new CompletableFuture<>();
this.clock = clock;
}
@@ -552,31 +553,4 @@ public class CoordinatorServer extends ServerBase {
public RebalanceManager getRebalanceManager() {
return coordinatorEventProcessor.getRebalanceManager();
}
-
- private static void validateConfigs(Configuration conf) {
- if (conf.get(ConfigOptions.DEFAULT_REPLICATION_FACTOR) < 1) {
- throw new IllegalConfigurationException(
- String.format(
- "Invalid configuration for %s, it must be greater
than or equal 1.",
- ConfigOptions.DEFAULT_REPLICATION_FACTOR.key()));
- }
- if (conf.get(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS) < 1) {
- throw new IllegalConfigurationException(
- String.format(
- "Invalid configuration for %s, it must be greater
than or equal 1.",
- ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS.key()));
- }
-
- if (conf.get(ConfigOptions.SERVER_IO_POOL_SIZE) < 1) {
- throw new IllegalConfigurationException(
- String.format(
- "Invalid configuration for %s, it must be greater
than or equal 1.",
- ConfigOptions.SERVER_IO_POOL_SIZE.key()));
- }
-
- if (conf.get(ConfigOptions.REMOTE_DATA_DIR) == null) {
- throw new IllegalConfigurationException(
- String.format("Configuration %s must be set.",
ConfigOptions.REMOTE_DATA_DIR));
- }
- }
}
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
index 8eed63c84..d108a6337 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
@@ -22,7 +22,6 @@ import org.apache.fluss.cluster.Endpoint;
import org.apache.fluss.cluster.ServerType;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
-import org.apache.fluss.exception.IllegalConfigurationException;
import org.apache.fluss.exception.InvalidServerRackInfoException;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metrics.registry.MetricRegistry;
@@ -71,7 +70,6 @@ import javax.annotation.concurrent.GuardedBy;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -79,6 +77,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.fluss.config.ConfigOptions.BACKGROUND_THREADS;
+import static org.apache.fluss.config.FlussConfigUtils.validateTabletConfigs;
import static
org.apache.fluss.server.utils.ServerRpcMessageUtils.toTableBucket;
/**
@@ -177,7 +176,7 @@ public class TabletServer extends ServerBase {
public TabletServer(Configuration conf, Clock clock) {
super(conf);
- validateConfigs(conf);
+ validateTabletConfigs(conf);
this.terminationFuture = new CompletableFuture<>();
this.serverId = conf.getInt(ConfigOptions.TABLET_SERVER_ID);
this.rack = conf.getString(ConfigOptions.TABLET_SERVER_RACK);
@@ -558,40 +557,6 @@ public class TabletServer extends ServerBase {
return authorizer;
}
- private static void validateConfigs(Configuration conf) {
- Optional<Integer> serverId =
conf.getOptional(ConfigOptions.TABLET_SERVER_ID);
- if (!serverId.isPresent()) {
- throw new IllegalConfigurationException(
- String.format("Configuration %s must be set.",
ConfigOptions.TABLET_SERVER_ID));
- }
-
- if (serverId.get() < 0) {
- throw new IllegalConfigurationException(
- String.format(
- "Invalid configuration for %s, it must be greater
than or equal 0.",
- ConfigOptions.TABLET_SERVER_ID.key()));
- }
-
- if (conf.get(ConfigOptions.BACKGROUND_THREADS) < 1) {
- throw new IllegalConfigurationException(
- String.format(
- "Invalid configuration for %s, it must be greater
than or equal 1.",
- ConfigOptions.BACKGROUND_THREADS.key()));
- }
-
- if (conf.get(ConfigOptions.REMOTE_DATA_DIR) == null) {
- throw new IllegalConfigurationException(
- String.format("Configuration %s must be set.",
ConfigOptions.REMOTE_DATA_DIR));
- }
-
- if (conf.get(ConfigOptions.LOG_SEGMENT_FILE_SIZE).getBytes() >
Integer.MAX_VALUE) {
- throw new IllegalConfigurationException(
- String.format(
- "Invalid configuration for %s, it must be less
than or equal %d bytes.",
- ConfigOptions.LOG_SEGMENT_FILE_SIZE.key(),
Integer.MAX_VALUE));
- }
- }
-
@VisibleForTesting
public RpcServer getRpcServer() {
return rpcServer;
diff --git a/website/docs/maintenance/configuration.md
b/website/docs/maintenance/configuration.md
index 6304e439a..f73b4e543 100644
--- a/website/docs/maintenance/configuration.md
+++ b/website/docs/maintenance/configuration.md
@@ -35,7 +35,10 @@ during the Fluss cluster working.
| `security.${protocol}.*` | String |
(none)
| Protocol-specific configuration properties. For example,
security.sasl.jaas.config for SASL authentication settings.
[...]
| default.bucket.number | Integer | 1
| The default number of buckets for a table in Fluss cluster. It's a
cluster-level parameter and all the tables without specifying bucket number in
the cluster will use the value as the bucket number.
[...]
| default.replication.factor | Integer | 1
| The default replication factor for the log of a table in Fluss
cluster. It's a cluster-level parameter, and all the tables without specifying
replication factor in the cluster will use the value as replication factor.
[...]
-| remote.data.dir | String |
(None)
| The directory used for storing the kv snapshot data files and remote
log for log tiered storage in a Fluss supported filesystem.
[...]
+| remote.data.dir | String |
(None)
| The directory used for storing the kv snapshot data files and remote
log for log tiered storage in a Fluss supported filesystem. When upgrading to
`remote.data.dirs`, please ensure this value is placed as the first entry in
the new configuratio [...]
+| remote.data.dirs | List<String> |
(None)
| A comma-separated list of directories in Fluss supported filesystems
for storing the kv snapshot data files and remote log files of
tables/partitions. If configured, when a new table or a new partition is
created, one of the directories from th [...]
+| remote.data.dirs.strategy | Enum |
ROUND_ROBIN
| The strategy for selecting the remote data directory from
`remote.data.dirs`. The candidate strategies are: [ROUND_ROBIN,
WEIGHTED_ROUND_ROBIN], the default strategy is ROUND_ROBIN.<br/>ROUND_ROBIN:
this strategy employs a round-robin approach [...]
+| remote.data.dirs.weights | List<Integer>|
(None)
| The weights of the remote data directories. This is a list of
weights corresponding to the `remote.data.dirs` in the same order. When
`remote.data.dirs.strategy` is set to `WEIGHTED_ROUND_ROBIN`, this must be
configured, and its size must be eq [...]
| remote.fs.write-buffer-size | MemorySize |
4kb
| The default size of the write buffer for writing the local files to
remote file systems.
[...]
| plugin.classloader.parent-first-patterns.additional | List<String> |
(None)
| A (semicolon-separated) list of patterns that specifies which
classes should always be resolved through the plugin parent ClassLoader first.
A pattern is a simple prefix that is checked against the fully qualified class
name. These patterns are [...]
| plugin.classloader.parent-first-patterns.default | String |
java.,<br/>org.apache.fluss.,<br/>javax.annotation.,<br/>org.slf4j,<br/>org.apache.log4j,<br/>org.apache.logging,<br/>org.apache.commons.logging,<br/>ch.qos.logback
| A (semicolon-separated) list of patterns that specifies which classes
should always be resolved through the plugin parent ClassLoader first. A
pattern is a simple prefix that is checked against the fully qualified class
name. This setting shoul [...]