This is an automated email from the ASF dual-hosted git repository.

yunhong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git


The following commit(s) were added to refs/heads/main by this push:
     new 6bde9c34f [log] Support dynamic configuration for 
log.replica.min-in-sync-replicas-number (#2837)
6bde9c34f is described below

commit 6bde9c34f1c62509084f88db236bc65fcffd463d
Author: xiaozhou <[email protected]>
AuthorDate: Wed Mar 11 22:03:47 2026 +0800

    [log] Support dynamic configuration for 
log.replica.min-in-sync-replicas-number (#2837)
    
    * [log] Support dynamic configuration for 
log.replica.min-in-sync-replicas-number
    
    * [log] Support dynamic configuration for 
log.replica.min-in-sync-replicas-number
---
 .../apache/fluss/server/DynamicServerConfig.java   |  2 +
 .../org/apache/fluss/server/replica/Replica.java   | 15 ++---
 .../fluss/server/replica/ReplicaManager.java       | 48 +++++++++++++-
 .../apache/fluss/server/tablet/TabletServer.java   |  2 +
 .../fluss/server/DynamicConfigChangeTest.java      | 73 ++++++++++++++++++++++
 .../fluss/server/replica/ReplicaTestBase.java      |  2 +-
 6 files changed, 132 insertions(+), 10 deletions(-)

diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java 
b/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java
index f3ae90496..ac9c2e401 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java
@@ -44,6 +44,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT;
 import static 
org.apache.fluss.config.ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC;
 import static org.apache.fluss.config.ConfigOptions.KV_SNAPSHOT_INTERVAL;
+import static 
org.apache.fluss.config.ConfigOptions.LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER;
 import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
 import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
 
@@ -61,6 +62,7 @@ class DynamicServerConfig {
             new HashSet<>(
                     Arrays.asList(
                             DATALAKE_FORMAT.key(),
+                            LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER.key(),
                             KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(),
                             KV_SNAPSHOT_INTERVAL.key()));
     private static final Set<String> ALLOWED_CONFIG_PREFIXES = 
Collections.singleton("datalake.");
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
index f73bcdda2..dd83c58b1 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
@@ -123,6 +123,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.IntSupplier;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
@@ -154,7 +155,7 @@ public final class Replica {
     /** A closeable registry to register all registered {@link Closeable}s. */
     private final CloseableRegistry closeableRegistry;
 
-    private final int minInSyncReplicas;
+    private final IntSupplier minInSyncReplicasSupplier;
     private final ServerMetadataCache metadataCache;
     private final FatalErrorHandler fatalErrorHandler;
     private final BucketMetricGroup bucketMetricGroup;
@@ -212,7 +213,7 @@ public final class Replica {
             LogManager logManager,
             @Nullable KvManager kvManager,
             long replicaMaxLagTime,
-            int minInSyncReplicas,
+            IntSupplier minInSyncReplicasSupplier,
             int localTabletServerId,
             OffsetCheckpointFile.LazyOffsetCheckpoints 
lazyHighWatermarkCheckpoint,
             DelayedOperationManager<DelayedWrite<?>> delayedWriteManager,
@@ -231,7 +232,7 @@ public final class Replica {
         this.kvManager = kvManager;
         this.metadataCache = metadataCache;
         this.replicaMaxLagTime = replicaMaxLagTime;
-        this.minInSyncReplicas = minInSyncReplicas;
+        this.minInSyncReplicasSupplier = minInSyncReplicasSupplier;
         this.localTabletServerId = localTabletServerId;
         this.delayedWriteManager = delayedWriteManager;
         this.delayedFetchLogManager = delayedFetchLogManager;
@@ -373,7 +374,7 @@ public final class Replica {
     }
 
     public boolean isAtMinIsr() {
-        return isLeader() && isrState.isr().size() == minInSyncReplicas;
+        return isLeader() && isrState.isr().size() == 
minInSyncReplicasSupplier.getAsInt();
     }
 
     public BucketMetricGroup bucketMetrics() {
@@ -1365,7 +1366,7 @@ public final class Replica {
             }
 
             if (logTablet.getHighWatermark() >= requiredOffset) {
-                if (minInSyncReplicas <= curMaximalIsr.size()) {
+                if (minInSyncReplicasSupplier.getAsInt() <= 
curMaximalIsr.size()) {
                     return Tuple2.of(true, Errors.NONE);
                 } else {
                     return Tuple2.of(true, 
Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND_EXCEPTION);
@@ -1902,7 +1903,7 @@ public final class Replica {
 
     private void validateInSyncReplicaSize(int requiredAcks) {
         int inSyncSize = isrState.isr().size();
-        if (inSyncSize < minInSyncReplicas && requiredAcks == -1) {
+        if (inSyncSize < minInSyncReplicasSupplier.getAsInt() && requiredAcks 
== -1) {
             throw new NotEnoughReplicasException(
                     String.format(
                             "The size of the current ISR %s is insufficient to 
satisfy "
@@ -1917,7 +1918,7 @@ public final class Replica {
     }
 
     public boolean isUnderMinIsr() {
-        return isLeader() && isrState.isr().size() < minInSyncReplicas;
+        return isLeader() && isrState.isr().size() < 
minInSyncReplicasSupplier.getAsInt();
     }
 
     private LogTablet localLogOrThrow(boolean requireLeader) {
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
index 3fc024209..c57f1c6fb 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
@@ -21,6 +21,8 @@ import org.apache.fluss.annotation.VisibleForTesting;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.config.TableConfig;
+import org.apache.fluss.config.cluster.ServerReconfigurable;
+import org.apache.fluss.exception.ConfigException;
 import org.apache.fluss.exception.FencedLeaderEpochException;
 import org.apache.fluss.exception.InvalidColumnProjectionException;
 import org.apache.fluss.exception.InvalidCoordinatorException;
@@ -147,7 +149,7 @@ import static 
org.apache.fluss.utils.Preconditions.checkState;
 import static org.apache.fluss.utils.concurrent.LockUtils.inLock;
 
 /** A manager for replica. */
-public class ReplicaManager {
+public class ReplicaManager implements ServerReconfigurable {
     private static final Logger LOG = 
LoggerFactory.getLogger(ReplicaManager.class);
 
     public static final String HIGH_WATERMARK_CHECKPOINT_FILE_NAME = 
"high-watermark-checkpoint";
@@ -194,6 +196,9 @@ public class ReplicaManager {
     private final KvSnapshotResource kvSnapshotResource;
     private final DefaultSnapshotContext kvSnapshotContext;
 
+    /** The minimum number of in-sync replicas required for writes with 
acks=-1. */
+    private volatile int minInSyncReplicas;
+
     // remote log manager for remote log storage.
     private final RemoteLogManager remoteLogManager;
 
@@ -304,6 +309,7 @@ public class ReplicaManager {
         this.userMetrics = userMetrics;
         this.clock = clock;
         this.ioExecutor = ioExecutor;
+        this.minInSyncReplicas = 
conf.get(ConfigOptions.LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER);
         registerMetrics();
     }
 
@@ -326,6 +332,44 @@ public class ReplicaManager {
         return kvSnapshotContext;
     }
 
+    public int getMinInSyncReplicas() {
+        return minInSyncReplicas;
+    }
+
+    // ============ ServerReconfigurable Implementation ============
+
+    @Override
+    public void validate(Configuration newConfig) throws ConfigException {
+        // Type validation is already handled by DynamicServerConfig.
+        // Here we only do basic sanity checks.
+        int newMinInSyncReplicas =
+                
newConfig.get(ConfigOptions.LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER);
+        if (newMinInSyncReplicas <= 0) {
+            throw new ConfigException(
+                    String.format(
+                            "Invalid log.replica.min-in-sync-replicas-number 
can not be "
+                                    + "negative or zero: %d",
+                            newMinInSyncReplicas));
+        }
+    }
+
+    @Override
+    public void reconfigure(Configuration newConfig) {
+        int newMinInSyncReplicas =
+                
newConfig.get(ConfigOptions.LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER);
+        if (newMinInSyncReplicas == minInSyncReplicas) {
+            LOG.debug(
+                    "log.replica.min-in-sync-replicas-number unchanged: {}", 
newMinInSyncReplicas);
+            return;
+        }
+        int oldMinInSyncReplicas = minInSyncReplicas;
+        minInSyncReplicas = newMinInSyncReplicas;
+        LOG.info(
+                "log.replica.min-in-sync-replicas-number reconfigured: {} -> 
{}",
+                oldMinInSyncReplicas,
+                newMinInSyncReplicas);
+    }
+
     private void registerMetrics() {
         serverMetricGroup.gauge(
                 MetricNames.REPLICA_LEADER_COUNT,
@@ -1865,7 +1909,7 @@ public class ReplicaManager {
                                 logManager,
                                 isKvTable ? kvManager : null,
                                 
conf.get(ConfigOptions.LOG_REPLICA_MAX_LAG_TIME).toMillis(),
-                                
conf.get(ConfigOptions.LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER),
+                                this::getMinInSyncReplicas,
                                 serverId,
                                 new OffsetCheckpointFile.LazyOffsetCheckpoints(
                                         highWatermarkCheckpoint),
diff --git 
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java 
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
index 93e8951b7..65ab96324 100644
--- 
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
+++ 
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
@@ -275,6 +275,8 @@ public class TabletServer extends ServerBase {
 
             // Register DefaultSnapshotContext for dynamic kv.snapshot.interval
             
dynamicConfigManager.register(replicaManager.getKvSnapshotContext());
+            // Register replicaManager to dynamicConfigManager for dynamic 
config
+            dynamicConfigManager.register(replicaManager);
             // Start dynamicConfigManager after all reconfigurable components 
are registered
             dynamicConfigManager.startup();
 
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java
index aa55372f7..92894ab7f 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java
@@ -42,6 +42,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT;
@@ -412,4 +413,76 @@ public class DynamicConfigChangeTest {
         // Verify the reconfigurable was notified with the new value
         
assertThat(reconfiguredInterval.get()).isEqualTo(Duration.ofMinutes(5));
     }
+
+    @Test
+    void testPreventInvalidMinInSyncReplicas() throws Exception {
+        Configuration configuration = new Configuration();
+        
configuration.setInt(ConfigOptions.LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER, 1);
+
+        DynamicConfigManager dynamicConfigManager =
+                new DynamicConfigManager(zookeeperClient, configuration, true);
+        dynamicConfigManager.startup();
+
+        // Try to set min-in-sync-replicas to an invalid value - should be 
rejected by type
+        // validation
+        assertThatThrownBy(
+                        () ->
+                                dynamicConfigManager.alterConfigs(
+                                        Collections.singletonList(
+                                                new AlterConfig(
+                                                        ConfigOptions
+                                                                
.LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER
+                                                                .key(),
+                                                        "invalid_value",
+                                                        
AlterConfigOpType.SET))))
+                .isInstanceOf(ConfigException.class)
+                .hasMessageContaining(
+                        "Cannot parse 'invalid_value' as Integer for config"
+                                + " 
'log.replica.min-in-sync-replicas-number'");
+    }
+
+    @Test
+    void testDynamicMinInSyncReplicasChange() throws Exception {
+        Configuration configuration = new Configuration();
+        
configuration.setInt(ConfigOptions.LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER, 1);
+
+        DynamicConfigManager dynamicConfigManager =
+                new DynamicConfigManager(zookeeperClient, configuration, true);
+
+        AtomicInteger reconfiguredValue = new AtomicInteger();
+        dynamicConfigManager.register(
+                new ServerReconfigurable() {
+                    @Override
+                    public void validate(Configuration newConfig) throws 
ConfigException {}
+
+                    @Override
+                    public void reconfigure(Configuration newConfig) {
+                        reconfiguredValue.set(
+                                newConfig.get(
+                                        
ConfigOptions.LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER));
+                    }
+                });
+        dynamicConfigManager.startup();
+
+        // Change min-in-sync-replicas to 2 - should succeed
+        assertThatCode(
+                        () ->
+                                dynamicConfigManager.alterConfigs(
+                                        Collections.singletonList(
+                                                new AlterConfig(
+                                                        ConfigOptions
+                                                                
.LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER
+                                                                .key(),
+                                                        "2",
+                                                        
AlterConfigOpType.SET))))
+                .doesNotThrowAnyException();
+
+        // Verify config was persisted to ZK
+        Map<String, String> zkConfig = zookeeperClient.fetchEntityConfig();
+        
assertThat(zkConfig.get(ConfigOptions.LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER.key()))
+                .isEqualTo("2");
+
+        // Verify the reconfigurable was notified with the new value
+        assertThat(reconfiguredValue.get()).isEqualTo(2);
+    }
 }
diff --git 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
index 3fb01a7e2..2df8b8163 100644
--- 
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
+++ 
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
@@ -476,7 +476,7 @@ public class ReplicaTestBase {
                 logManager,
                 isPkTable ? kvManager : null,
                 conf.get(ConfigOptions.LOG_REPLICA_MAX_LAG_TIME).toMillis(),
-                
conf.get(ConfigOptions.LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER),
+                () -> 
conf.get(ConfigOptions.LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER),
                 TABLET_SERVER_ID,
                 new OffsetCheckpointFile.LazyOffsetCheckpoints(
                         new OffsetCheckpointFile(

Reply via email to