This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new bafba4293 [server] Add configuration options for multiple remote data 
directories (#2757)
bafba4293 is described below

commit bafba42932aa4ef0d3a7e026edb68f0383cfd7a4
Author: Liebing <[email protected]>
AuthorDate: Mon Mar 9 22:39:23 2026 +0800

    [server] Add configuration options for multiple remote data directories 
(#2757)
---
 .../org/apache/fluss/config/ConfigOptions.java     |  61 +++++++++-
 .../org/apache/fluss/config/FlussConfigUtils.java  | 117 +++++++++++++++++++
 .../apache/fluss/config/FlussConfigUtilsTest.java  | 126 +++++++++++++++++++++
 .../server/coordinator/CoordinatorServer.java      |  32 +-----
 .../apache/fluss/server/tablet/TabletServer.java   |  39 +------
 website/docs/maintenance/configuration.md          |   5 +-
 6 files changed, 311 insertions(+), 69 deletions(-)

diff --git 
a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java 
b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
index 90f262b07..bcd00b130 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java
@@ -96,8 +96,59 @@ public class ConfigOptions {
                     .stringType()
                     .noDefaultValue()
                     .withDescription(
-                            "The directory used for storing the kv snapshot 
data files and remote log for log tiered storage "
-                                    + " in a Fluss supported filesystem.");
+                            "The directory used for storing the kv snapshot 
data files and remote log for log tiered storage"
+                                    + " in a Fluss supported filesystem. "
+                                    + "When upgrading to `remote.data.dirs`, 
please ensure this value is placed as the first entry in the new configuration."
+                                    + "For new clusters, it is recommended to 
use `remote.data.dirs` instead. "
+                                    + "If `remote.data.dirs` is configured, 
this value will be ignored.");
+
+    public static final ConfigOption<List<String>> REMOTE_DATA_DIRS =
+            key("remote.data.dirs")
+                    .stringType()
+                    .asList()
+                    .defaultValues()
+                    .withDescription(
+                            "A comma-separated list of directories in Fluss 
supported filesystems "
+                                    + "for storing the kv snapshot data files 
and remote log files of tables/partitions. "
+                                    + "If configured, when a new table or a 
new partition is created, "
+                                    + "one of the directories from this list 
will be selected according to the strategy "
+                                    + "specified by 
`remote.data.dirs.strategy` (`ROUND_ROBIN` by default). "
+                                    + "If not configured, the system uses `"
+                                    + REMOTE_DATA_DIR.key()
+                                    + "` as the sole remote data directory for 
all data.");
+
+    public static final ConfigOption<RemoteDataDirStrategy> 
REMOTE_DATA_DIRS_STRATEGY =
+            key("remote.data.dirs.strategy")
+                    .enumType(RemoteDataDirStrategy.class)
+                    .defaultValue(RemoteDataDirStrategy.ROUND_ROBIN)
+                    .withDescription(
+                            String.format(
+                                    "The strategy for selecting the remote 
data directory from `%s`. "
+                                            + "The candidate strategies are: 
%s, the default strategy is %s.\n"
+                                            + "%s: this strategy employs a 
round-robin approach to select one from the available remote directories.\n"
+                                            + "%s: this strategy selects one 
of the available remote directories based on the weights configured in 
`remote.data.dirs.weights`.",
+                                    REMOTE_DATA_DIRS.key(),
+                                    
Arrays.toString(RemoteDataDirStrategy.values()),
+                                    RemoteDataDirStrategy.ROUND_ROBIN,
+                                    RemoteDataDirStrategy.ROUND_ROBIN,
+                                    
RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN));
+
+    public static final ConfigOption<List<Integer>> REMOTE_DATA_DIRS_WEIGHTS =
+            key("remote.data.dirs.weights")
+                    .intType()
+                    .asList()
+                    .defaultValues()
+                    .withDescription(
+                            "The weights of the remote data directories. "
+                                    + "This is a list of weights corresponding 
to the `"
+                                    + REMOTE_DATA_DIRS.key()
+                                    + "` in the same order. When `"
+                                    + REMOTE_DATA_DIRS_STRATEGY.key()
+                                    + "` is set to `"
+                                    + 
RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN
+                                    + "`, this must be configured, and its 
size must be equal to `"
+                                    + REMOTE_DATA_DIRS.key()
+                                    + "`; otherwise, it will be ignored.");
 
     public static final ConfigOption<MemorySize> REMOTE_FS_WRITE_BUFFER_SIZE =
             key("remote.fs.write-buffer-size")
@@ -2076,4 +2127,10 @@ public class ConfigOptions {
     public static ConfigOption<?> getConfigOption(String key) {
         return ConfigOptionsHolder.CONFIG_OPTIONS_BY_KEY.get(key);
     }
+
+    /** Remote data dir select strategy for Fluss. */
+    public enum RemoteDataDirStrategy {
+        ROUND_ROBIN,
+        WEIGHTED_ROUND_ROBIN
+    }
 }
diff --git 
a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java 
b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
index 08b97256c..b3a1d84f9 100644
--- a/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
+++ b/fluss-common/src/main/java/org/apache/fluss/config/FlussConfigUtils.java
@@ -19,12 +19,15 @@ package org.apache.fluss.config;
 
 import org.apache.fluss.annotation.Internal;
 import org.apache.fluss.annotation.VisibleForTesting;
+import org.apache.fluss.exception.IllegalConfigurationException;
+import org.apache.fluss.fs.FsPath;
 
 import java.lang.reflect.Field;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 /** Utilities of Fluss {@link ConfigOptions}. */
 @Internal
@@ -77,4 +80,118 @@ public class FlussConfigUtils {
         }
         return options;
     }
+
+    public static void validateCoordinatorConfigs(Configuration conf) {
+        validateServerConfigs(conf);
+    }
+
+    public static void validateTabletConfigs(Configuration conf) {
+        validateServerConfigs(conf);
+
+        Optional<Integer> serverId = 
conf.getOptional(ConfigOptions.TABLET_SERVER_ID);
+        if (!serverId.isPresent()) {
+            throw new IllegalConfigurationException(
+                    String.format(
+                            "Configuration %s must be set.", 
ConfigOptions.TABLET_SERVER_ID.key()));
+        }
+        validMinValue(ConfigOptions.TABLET_SERVER_ID, serverId.get(), 0);
+    }
+
+    /** Validate common server configs. */
+    protected static void validateServerConfigs(Configuration conf) {
+        // Validate remote.data.dir and remote.data.dirs
+        String remoteDataDir = conf.get(ConfigOptions.REMOTE_DATA_DIR);
+        List<String> remoteDataDirs = conf.get(ConfigOptions.REMOTE_DATA_DIRS);
+        if (conf.get(ConfigOptions.REMOTE_DATA_DIR) == null
+                && conf.get(ConfigOptions.REMOTE_DATA_DIRS).isEmpty()) {
+            throw new IllegalConfigurationException(
+                    String.format(
+                            "Either %s or %s must be configured.",
+                            ConfigOptions.REMOTE_DATA_DIR.key(),
+                            ConfigOptions.REMOTE_DATA_DIRS.key()));
+        }
+
+        if (remoteDataDir != null) {
+            // Must validate that remote.data.dir is a valid FsPath
+            try {
+                new FsPath(conf.get(ConfigOptions.REMOTE_DATA_DIR));
+            } catch (Exception e) {
+                throw new IllegalConfigurationException(
+                        String.format(
+                                "Invalid configuration for %s.",
+                                ConfigOptions.REMOTE_DATA_DIR.key()),
+                        e);
+            }
+        }
+
+        // Validate remote.data.dirs
+        for (int i = 0; i < remoteDataDirs.size(); i++) {
+            String dir = remoteDataDirs.get(i);
+            try {
+                new FsPath(dir);
+            } catch (Exception e) {
+                throw new IllegalConfigurationException(
+                        String.format(
+                                "Invalid remote path for %s at index %d.",
+                                ConfigOptions.REMOTE_DATA_DIRS.key(), i),
+                        e);
+            }
+        }
+
+        // Validate remote.data.dirs.strategy
+        ConfigOptions.RemoteDataDirStrategy remoteDataDirStrategy =
+                conf.get(ConfigOptions.REMOTE_DATA_DIRS_STRATEGY);
+        if (remoteDataDirStrategy == 
ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN) {
+            List<Integer> weights = 
conf.get(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS);
+            if (!remoteDataDirs.isEmpty()) {
+                if (remoteDataDirs.size() != weights.size()) {
+                    throw new IllegalConfigurationException(
+                            String.format(
+                                    "The size of '%s' (%d) must match the size 
of '%s' (%d) when using WEIGHTED_ROUND_ROBIN strategy.",
+                                    ConfigOptions.REMOTE_DATA_DIRS.key(),
+                                    remoteDataDirs.size(),
+                                    
ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key(),
+                                    weights.size()));
+                }
+
+                // Validate all weights are no less than 0
+                for (int i = 0; i < weights.size(); i++) {
+                    if (weights.get(i) < 0) {
+                        throw new IllegalConfigurationException(
+                                String.format(
+                                        "All weights in '%s' must be no less 
than 0, but found %d at index %d.",
+                                        
ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key(),
+                                        weights.get(i),
+                                        i));
+                    }
+                }
+            }
+        }
+
+        validMinValue(conf, ConfigOptions.DEFAULT_REPLICATION_FACTOR, 1);
+        validMinValue(conf, ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, 1);
+        validMinValue(conf, ConfigOptions.SERVER_IO_POOL_SIZE, 1);
+        validMinValue(conf, ConfigOptions.BACKGROUND_THREADS, 1);
+
+        if (conf.get(ConfigOptions.LOG_SEGMENT_FILE_SIZE).getBytes() > 
Integer.MAX_VALUE) {
+            throw new IllegalConfigurationException(
+                    String.format(
+                            "Invalid configuration for %s, it must be less 
than or equal %d bytes.",
+                            ConfigOptions.LOG_SEGMENT_FILE_SIZE.key(), 
Integer.MAX_VALUE));
+        }
+    }
+
+    private static void validMinValue(
+            Configuration conf, ConfigOption<Integer> option, int minValue) {
+        validMinValue(option, conf.get(option), minValue);
+    }
+
+    private static void validMinValue(ConfigOption<Integer> option, int value, 
int minValue) {
+        if (value < minValue) {
+            throw new IllegalConfigurationException(
+                    String.format(
+                            "Invalid configuration for %s, it must be greater 
than or equal %d.",
+                            option.key(), minValue));
+        }
+    }
 }
diff --git 
a/fluss-common/src/test/java/org/apache/fluss/config/FlussConfigUtilsTest.java 
b/fluss-common/src/test/java/org/apache/fluss/config/FlussConfigUtilsTest.java
index e24bc121a..e2690c54b 100644
--- 
a/fluss-common/src/test/java/org/apache/fluss/config/FlussConfigUtilsTest.java
+++ 
b/fluss-common/src/test/java/org/apache/fluss/config/FlussConfigUtilsTest.java
@@ -17,14 +17,21 @@
 
 package org.apache.fluss.config;
 
+import org.apache.fluss.exception.IllegalConfigurationException;
+
 import org.junit.jupiter.api.Test;
 
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.Map;
 
 import static org.apache.fluss.config.FlussConfigUtils.CLIENT_OPTIONS;
 import static org.apache.fluss.config.FlussConfigUtils.TABLE_OPTIONS;
 import static org.apache.fluss.config.FlussConfigUtils.extractConfigOptions;
+import static 
org.apache.fluss.config.FlussConfigUtils.validateCoordinatorConfigs;
+import static org.apache.fluss.config.FlussConfigUtils.validateTabletConfigs;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link FlussConfigUtils}. */
 class FlussConfigUtilsTest {
@@ -49,4 +56,123 @@ class FlussConfigUtilsTest {
                 });
         assertThat(clientOptions.size()).isEqualTo(CLIENT_OPTIONS.size());
     }
+
+    @Test
+    void testValidateCoordinatorConfigs() {
+        // Test empty configuration
+        Configuration emptyConf = new Configuration();
+        assertThatThrownBy(() -> validateCoordinatorConfigs(emptyConf))
+                .isInstanceOf(IllegalConfigurationException.class)
+                .hasMessageContaining(ConfigOptions.REMOTE_DATA_DIR.key())
+                .hasMessageContaining(ConfigOptions.REMOTE_DATA_DIRS.key())
+                .hasMessageContaining("must be configured");
+
+        // Test configuration with only REMOTE_DATA_DIR set
+        Configuration remoteDataDirConf = new Configuration();
+        remoteDataDirConf.set(ConfigOptions.REMOTE_DATA_DIR, 
"s3://bucket/path");
+        validateCoordinatorConfigs(remoteDataDirConf);
+
+        // Test invalid REMOTE_DATA_DIR
+        Configuration invalidRemoteDirConf = new Configuration();
+        invalidRemoteDirConf.set(ConfigOptions.REMOTE_DATA_DIR, 
"123://invalid.com");
+        assertThatThrownBy(() -> 
validateCoordinatorConfigs(invalidRemoteDirConf))
+                .isInstanceOf(IllegalConfigurationException.class)
+                .hasMessageContaining(ConfigOptions.REMOTE_DATA_DIR.key())
+                .hasMessageContaining("Invalid configuration for 
remote.data.dir");
+
+        // Test configuration with only REMOTE_DATA_DIRS set
+        Configuration remoteDataDirsConf = new Configuration();
+        remoteDataDirsConf.set(
+                ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("s3://bucket1", 
"s3://bucket2"));
+        validateCoordinatorConfigs(remoteDataDirConf);
+
+        // Test REMOTE_DATA_DIRS contains invalid path
+        Configuration invalidRemoteDirsConf = new Configuration();
+        invalidRemoteDirsConf.set(ConfigOptions.REMOTE_DATA_DIR, 
"s3://bucket/path");
+        invalidRemoteDirsConf.set(
+                ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("s3://bucket1", 
"123://invalid.com"));
+        assertThatThrownBy(() -> 
validateCoordinatorConfigs(invalidRemoteDirsConf))
+                .isInstanceOf(IllegalConfigurationException.class)
+                .hasMessageContaining(ConfigOptions.REMOTE_DATA_DIRS.key())
+                .hasMessageContaining("Invalid remote path for");
+
+        // Test WEIGHTED_ROUND_ROBIN with mismatched sizes
+        Configuration mismatchedWeightsConf = new Configuration();
+        mismatchedWeightsConf.set(ConfigOptions.REMOTE_DATA_DIR, 
"s3://bucket/path");
+        mismatchedWeightsConf.set(
+                ConfigOptions.REMOTE_DATA_DIRS_STRATEGY,
+                ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN);
+        mismatchedWeightsConf.set(
+                ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("s3://bucket1", 
"s3://bucket2"));
+        mismatchedWeightsConf.set(
+                ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS, 
Collections.singletonList(1));
+        assertThatThrownBy(() -> 
validateCoordinatorConfigs(mismatchedWeightsConf))
+                .isInstanceOf(IllegalConfigurationException.class)
+                
.hasMessageContaining(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key())
+                .hasMessageContaining(ConfigOptions.REMOTE_DATA_DIRS.key());
+
+        // Test WEIGHTED_ROUND_ROBIN with matched sizes
+        Configuration matchedWeightsConf = new Configuration();
+        matchedWeightsConf.set(ConfigOptions.REMOTE_DATA_DIR, 
"s3://bucket/path");
+        matchedWeightsConf.set(
+                ConfigOptions.REMOTE_DATA_DIRS_STRATEGY,
+                ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN);
+        matchedWeightsConf.set(
+                ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("s3://bucket1", 
"s3://bucket2"));
+        matchedWeightsConf.set(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS, 
Arrays.asList(0, 2));
+        validateCoordinatorConfigs(matchedWeightsConf);
+
+        // Test negative weight
+        Configuration negativeWeightConf = new Configuration();
+        negativeWeightConf.set(ConfigOptions.REMOTE_DATA_DIR, 
"s3://bucket/path");
+        negativeWeightConf.set(
+                ConfigOptions.REMOTE_DATA_DIRS_STRATEGY,
+                ConfigOptions.RemoteDataDirStrategy.WEIGHTED_ROUND_ROBIN);
+        negativeWeightConf.set(
+                ConfigOptions.REMOTE_DATA_DIRS, Arrays.asList("s3://bucket1", 
"s3://bucket2"));
+        negativeWeightConf.set(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS, 
Arrays.asList(-1, 2));
+        assertThatThrownBy(() -> 
validateCoordinatorConfigs(negativeWeightConf))
+                .isInstanceOf(IllegalConfigurationException.class)
+                
.hasMessageContaining(ConfigOptions.REMOTE_DATA_DIRS_WEIGHTS.key())
+                .hasMessageContaining(
+                        "All weights in 'remote.data.dirs.weights' must be no 
less than 0");
+
+        // Test invalid DEFAULT_REPLICATION_FACTOR
+        Configuration invalidReplicationConf = new Configuration();
+        invalidReplicationConf.set(ConfigOptions.REMOTE_DATA_DIR, 
"s3://bucket/path");
+        invalidReplicationConf.set(ConfigOptions.DEFAULT_REPLICATION_FACTOR, 
0);
+        assertThatThrownBy(() -> 
validateCoordinatorConfigs(invalidReplicationConf))
+                .isInstanceOf(IllegalConfigurationException.class)
+                
.hasMessageContaining(ConfigOptions.DEFAULT_REPLICATION_FACTOR.key())
+                .hasMessageContaining("must be greater than or equal 1");
+
+        // Test invalid KV_MAX_RETAINED_SNAPSHOTS
+        Configuration invalidSnapshotConf = new Configuration();
+        invalidSnapshotConf.set(ConfigOptions.REMOTE_DATA_DIR, 
"s3://bucket/path");
+        invalidSnapshotConf.set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, 0);
+        assertThatThrownBy(() -> 
validateCoordinatorConfigs(invalidSnapshotConf))
+                .isInstanceOf(IllegalConfigurationException.class)
+                
.hasMessageContaining(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS.key())
+                .hasMessageContaining("must be greater than or equal 1");
+
+        // Test invalid SERVER_IO_POOL_SIZE
+        Configuration invalidIoPoolConf = new Configuration();
+        invalidIoPoolConf.set(ConfigOptions.REMOTE_DATA_DIR, 
"s3://bucket/path");
+        invalidIoPoolConf.set(ConfigOptions.SERVER_IO_POOL_SIZE, 0);
+        assertThatThrownBy(() -> validateCoordinatorConfigs(invalidIoPoolConf))
+                .isInstanceOf(IllegalConfigurationException.class)
+                .hasMessageContaining(ConfigOptions.SERVER_IO_POOL_SIZE.key())
+                .hasMessageContaining("must be greater than or equal 1");
+    }
+
+    @Test
+    void testValidateTabletConfigs() {
+        Configuration conf = new Configuration();
+        conf.set(ConfigOptions.REMOTE_DATA_DIR, "s3://bucket/path");
+        conf.set(ConfigOptions.TABLET_SERVER_ID, -1);
+        assertThatThrownBy(() -> validateTabletConfigs(conf))
+                .isInstanceOf(IllegalConfigurationException.class)
+                .hasMessageContaining(ConfigOptions.TABLET_SERVER_ID.key())
+                .hasMessageContaining("it must be greater than or equal 0");
+    }
 }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
index 774c23cce..1f9f3306e 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorServer.java
@@ -22,7 +22,6 @@ import org.apache.fluss.cluster.Endpoint;
 import org.apache.fluss.cluster.ServerType;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
-import org.apache.fluss.exception.IllegalConfigurationException;
 import org.apache.fluss.metadata.DatabaseDescriptor;
 import org.apache.fluss.metrics.registry.MetricRegistry;
 import org.apache.fluss.rpc.RpcClient;
@@ -67,6 +66,8 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import static 
org.apache.fluss.config.FlussConfigUtils.validateCoordinatorConfigs;
+
 /**
  * Coordinator server implementation. The coordinator server is responsible to:
  *
@@ -155,7 +156,7 @@ public class CoordinatorServer extends ServerBase {
 
     public CoordinatorServer(Configuration conf, Clock clock) {
         super(conf);
-        validateConfigs(conf);
+        validateCoordinatorConfigs(conf);
         this.terminationFuture = new CompletableFuture<>();
         this.clock = clock;
     }
@@ -552,31 +553,4 @@ public class CoordinatorServer extends ServerBase {
     public RebalanceManager getRebalanceManager() {
         return coordinatorEventProcessor.getRebalanceManager();
     }
-
-    private static void validateConfigs(Configuration conf) {
-        if (conf.get(ConfigOptions.DEFAULT_REPLICATION_FACTOR) < 1) {
-            throw new IllegalConfigurationException(
-                    String.format(
-                            "Invalid configuration for %s, it must be greater 
than or equal 1.",
-                            ConfigOptions.DEFAULT_REPLICATION_FACTOR.key()));
-        }
-        if (conf.get(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS) < 1) {
-            throw new IllegalConfigurationException(
-                    String.format(
-                            "Invalid configuration for %s, it must be greater 
than or equal 1.",
-                            ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS.key()));
-        }
-
-        if (conf.get(ConfigOptions.SERVER_IO_POOL_SIZE) < 1) {
-            throw new IllegalConfigurationException(
-                    String.format(
-                            "Invalid configuration for %s, it must be greater 
than or equal 1.",
-                            ConfigOptions.SERVER_IO_POOL_SIZE.key()));
-        }
-
-        if (conf.get(ConfigOptions.REMOTE_DATA_DIR) == null) {
-            throw new IllegalConfigurationException(
-                    String.format("Configuration %s must be set.", 
ConfigOptions.REMOTE_DATA_DIR));
-        }
-    }
 }
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java 
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
index 8eed63c84..d108a6337 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
@@ -22,7 +22,6 @@ import org.apache.fluss.cluster.Endpoint;
 import org.apache.fluss.cluster.ServerType;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
-import org.apache.fluss.exception.IllegalConfigurationException;
 import org.apache.fluss.exception.InvalidServerRackInfoException;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.metrics.registry.MetricRegistry;
@@ -71,7 +70,6 @@ import javax.annotation.concurrent.GuardedBy;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -79,6 +77,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.fluss.config.ConfigOptions.BACKGROUND_THREADS;
+import static org.apache.fluss.config.FlussConfigUtils.validateTabletConfigs;
 import static 
org.apache.fluss.server.utils.ServerRpcMessageUtils.toTableBucket;
 
 /**
@@ -177,7 +176,7 @@ public class TabletServer extends ServerBase {
 
     public TabletServer(Configuration conf, Clock clock) {
         super(conf);
-        validateConfigs(conf);
+        validateTabletConfigs(conf);
         this.terminationFuture = new CompletableFuture<>();
         this.serverId = conf.getInt(ConfigOptions.TABLET_SERVER_ID);
         this.rack = conf.getString(ConfigOptions.TABLET_SERVER_RACK);
@@ -558,40 +557,6 @@ public class TabletServer extends ServerBase {
         return authorizer;
     }
 
-    private static void validateConfigs(Configuration conf) {
-        Optional<Integer> serverId = 
conf.getOptional(ConfigOptions.TABLET_SERVER_ID);
-        if (!serverId.isPresent()) {
-            throw new IllegalConfigurationException(
-                    String.format("Configuration %s must be set.", 
ConfigOptions.TABLET_SERVER_ID));
-        }
-
-        if (serverId.get() < 0) {
-            throw new IllegalConfigurationException(
-                    String.format(
-                            "Invalid configuration for %s, it must be greater 
than or equal 0.",
-                            ConfigOptions.TABLET_SERVER_ID.key()));
-        }
-
-        if (conf.get(ConfigOptions.BACKGROUND_THREADS) < 1) {
-            throw new IllegalConfigurationException(
-                    String.format(
-                            "Invalid configuration for %s, it must be greater 
than or equal 1.",
-                            ConfigOptions.BACKGROUND_THREADS.key()));
-        }
-
-        if (conf.get(ConfigOptions.REMOTE_DATA_DIR) == null) {
-            throw new IllegalConfigurationException(
-                    String.format("Configuration %s must be set.", 
ConfigOptions.REMOTE_DATA_DIR));
-        }
-
-        if (conf.get(ConfigOptions.LOG_SEGMENT_FILE_SIZE).getBytes() > 
Integer.MAX_VALUE) {
-            throw new IllegalConfigurationException(
-                    String.format(
-                            "Invalid configuration for %s, it must be less 
than or equal %d bytes.",
-                            ConfigOptions.LOG_SEGMENT_FILE_SIZE.key(), 
Integer.MAX_VALUE));
-        }
-    }
-
     @VisibleForTesting
     public RpcServer getRpcServer() {
         return rpcServer;
diff --git a/website/docs/maintenance/configuration.md 
b/website/docs/maintenance/configuration.md
index 6304e439a..f73b4e543 100644
--- a/website/docs/maintenance/configuration.md
+++ b/website/docs/maintenance/configuration.md
@@ -35,7 +35,10 @@ during the Fluss cluster working.
 | `security.${protocol}.*`                            | String             | 
(none)                                                                          
                                                                                
         | Protocol-specific configuration properties. For example, 
security.sasl.jaas.config for SASL authentication settings.                     
                                                                                
                            [...]
 | default.bucket.number                               | Integer            | 1 
                                                                                
                                                                                
       | The default number of buckets for a table in Fluss cluster. It's a 
cluster-level parameter and all the tables without specifying bucket number in 
the cluster will use the value as the bucket number.                            
                   [...]
 | default.replication.factor                          | Integer            | 1 
                                                                                
                                                                                
       | The default replication factor for the log of a table in Fluss 
cluster. It's a cluster-level parameter, and all the tables without specifying 
replication factor in the cluster will use the value as replication factor.     
                       [...]
-| remote.data.dir                                     | String             | 
(None)                                                                          
                                                                                
         | The directory used for storing the kv snapshot data files and remote 
log for log tiered storage in a Fluss supported filesystem.                     
                                                                                
                [...]
+| remote.data.dir                                     | String             | 
(None)                                                                          
                                                                                
         | The directory used for storing the kv snapshot data files and remote 
log for log tiered storage in a Fluss supported filesystem. When upgrading to 
`remote.data.dirs`, please ensure this value is placed as the first entry in 
the new configuratio [...]
+| remote.data.dirs                                    | List&lt;String&gt; | 
(None)                                                                          
                                                                                
         | A comma-separated list of directories in Fluss supported filesystems 
for storing the kv snapshot data files and remote log files of 
tables/partitions. If configured, when a new table or a new partition is 
created, one of the directories from th [...]
+| remote.data.dirs.strategy                           | Enum               | 
ROUND_ROBIN                                                                     
                                                                                
         | The strategy for selecting the remote data directory from 
`remote.data.dirs`. The candidate strategies are: [ROUND_ROBIN, 
WEIGHTED_ROUND_ROBIN], the default strategy is ROUND_ROBIN.<br/>ROUND_ROBIN: 
this strategy employs a round-robin approach  [...]
+| remote.data.dirs.weights                            | List&lt;Integer&gt;| 
(None)                                                                          
                                                                                
         | The weights of the remote data directories. This is a list of 
weights corresponding to the `remote.data.dirs` in the same order. When 
`remote.data.dirs.strategy` is set to `WEIGHTED_ROUND_ROBIN`, this must be 
configured, and its size must be eq [...]
 | remote.fs.write-buffer-size                         | MemorySize         | 
4kb                                                                             
                                                                                
         | The default size of the write buffer for writing the local files to 
remote file systems.                                                            
                                                                                
                 [...]
 | plugin.classloader.parent-first-patterns.additional | List&lt;String&gt; | 
(None)                                                                          
                                                                                
         | A (semicolon-separated) list of patterns that specifies which 
classes should always be resolved through the plugin parent ClassLoader first. 
A pattern is a simple prefix that is checked against the fully qualified class 
name. These patterns are [...]
 | plugin.classloader.parent-first-patterns.default    | String             | 
java.,<br/>org.apache.fluss.,<br/>javax.annotation.,<br/>org.slf4j,<br/>org.apache.log4j,<br/>org.apache.logging,<br/>org.apache.commons.logging,<br/>ch.qos.logback
     | A (semicolon-separated) list of patterns that specifies which classes 
should always be resolved through the plugin parent ClassLoader first. A 
pattern is a simple prefix that is checked against the fully qualified class 
name. This setting shoul [...]

Reply via email to