This is an automated email from the ASF dual-hosted git repository.
yunhong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/fluss.git
The following commit(s) were added to refs/heads/main by this push:
new 6bde9c34f [log] Support dynamic configuration for
log.replica.min-in-sync-replicas-number (#2837)
6bde9c34f is described below
commit 6bde9c34f1c62509084f88db236bc65fcffd463d
Author: xiaozhou <[email protected]>
AuthorDate: Wed Mar 11 22:03:47 2026 +0800
[log] Support dynamic configuration for
log.replica.min-in-sync-replicas-number (#2837)
* [log] Support dynamic configuration for
log.replica.min-in-sync-replicas-number
* [log] Support dynamic configuration for
log.replica.min-in-sync-replicas-number
---
.../apache/fluss/server/DynamicServerConfig.java | 2 +
.../org/apache/fluss/server/replica/Replica.java | 15 ++---
.../fluss/server/replica/ReplicaManager.java | 48 +++++++++++++-
.../apache/fluss/server/tablet/TabletServer.java | 2 +
.../fluss/server/DynamicConfigChangeTest.java | 73 ++++++++++++++++++++++
.../fluss/server/replica/ReplicaTestBase.java | 2 +-
6 files changed, 132 insertions(+), 10 deletions(-)
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java
b/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java
index f3ae90496..ac9c2e401 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/DynamicServerConfig.java
@@ -44,6 +44,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT;
import static
org.apache.fluss.config.ConfigOptions.KV_SHARED_RATE_LIMITER_BYTES_PER_SEC;
import static org.apache.fluss.config.ConfigOptions.KV_SNAPSHOT_INTERVAL;
+import static
org.apache.fluss.config.ConfigOptions.LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER;
import static org.apache.fluss.utils.concurrent.LockUtils.inReadLock;
import static org.apache.fluss.utils.concurrent.LockUtils.inWriteLock;
@@ -61,6 +62,7 @@ class DynamicServerConfig {
new HashSet<>(
Arrays.asList(
DATALAKE_FORMAT.key(),
+ LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER.key(),
KV_SHARED_RATE_LIMITER_BYTES_PER_SEC.key(),
KV_SNAPSHOT_INTERVAL.key()));
private static final Set<String> ALLOWED_CONFIG_PREFIXES =
Collections.singleton("datalake.");
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
index f73bcdda2..dd83c58b1 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java
@@ -123,6 +123,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.IntSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
@@ -154,7 +155,7 @@ public final class Replica {
/** A closeable registry to register all registered {@link Closeable}s. */
private final CloseableRegistry closeableRegistry;
- private final int minInSyncReplicas;
+ private final IntSupplier minInSyncReplicasSupplier;
private final ServerMetadataCache metadataCache;
private final FatalErrorHandler fatalErrorHandler;
private final BucketMetricGroup bucketMetricGroup;
@@ -212,7 +213,7 @@ public final class Replica {
LogManager logManager,
@Nullable KvManager kvManager,
long replicaMaxLagTime,
- int minInSyncReplicas,
+ IntSupplier minInSyncReplicasSupplier,
int localTabletServerId,
OffsetCheckpointFile.LazyOffsetCheckpoints
lazyHighWatermarkCheckpoint,
DelayedOperationManager<DelayedWrite<?>> delayedWriteManager,
@@ -231,7 +232,7 @@ public final class Replica {
this.kvManager = kvManager;
this.metadataCache = metadataCache;
this.replicaMaxLagTime = replicaMaxLagTime;
- this.minInSyncReplicas = minInSyncReplicas;
+ this.minInSyncReplicasSupplier = minInSyncReplicasSupplier;
this.localTabletServerId = localTabletServerId;
this.delayedWriteManager = delayedWriteManager;
this.delayedFetchLogManager = delayedFetchLogManager;
@@ -373,7 +374,7 @@ public final class Replica {
}
public boolean isAtMinIsr() {
- return isLeader() && isrState.isr().size() == minInSyncReplicas;
+ return isLeader() && isrState.isr().size() ==
minInSyncReplicasSupplier.getAsInt();
}
public BucketMetricGroup bucketMetrics() {
@@ -1365,7 +1366,7 @@ public final class Replica {
}
if (logTablet.getHighWatermark() >= requiredOffset) {
- if (minInSyncReplicas <= curMaximalIsr.size()) {
+ if (minInSyncReplicasSupplier.getAsInt() <=
curMaximalIsr.size()) {
return Tuple2.of(true, Errors.NONE);
} else {
return Tuple2.of(true,
Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND_EXCEPTION);
@@ -1902,7 +1903,7 @@ public final class Replica {
private void validateInSyncReplicaSize(int requiredAcks) {
int inSyncSize = isrState.isr().size();
- if (inSyncSize < minInSyncReplicas && requiredAcks == -1) {
+ if (inSyncSize < minInSyncReplicasSupplier.getAsInt() && requiredAcks
== -1) {
throw new NotEnoughReplicasException(
String.format(
"The size of the current ISR %s is insufficient to
satisfy "
@@ -1917,7 +1918,7 @@ public final class Replica {
}
public boolean isUnderMinIsr() {
- return isLeader() && isrState.isr().size() < minInSyncReplicas;
+ return isLeader() && isrState.isr().size() <
minInSyncReplicasSupplier.getAsInt();
}
private LogTablet localLogOrThrow(boolean requireLeader) {
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
index 3fc024209..c57f1c6fb 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/replica/ReplicaManager.java
@@ -21,6 +21,8 @@ import org.apache.fluss.annotation.VisibleForTesting;
import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.config.TableConfig;
+import org.apache.fluss.config.cluster.ServerReconfigurable;
+import org.apache.fluss.exception.ConfigException;
import org.apache.fluss.exception.FencedLeaderEpochException;
import org.apache.fluss.exception.InvalidColumnProjectionException;
import org.apache.fluss.exception.InvalidCoordinatorException;
@@ -147,7 +149,7 @@ import static
org.apache.fluss.utils.Preconditions.checkState;
import static org.apache.fluss.utils.concurrent.LockUtils.inLock;
/** A manager for replica. */
-public class ReplicaManager {
+public class ReplicaManager implements ServerReconfigurable {
private static final Logger LOG =
LoggerFactory.getLogger(ReplicaManager.class);
public static final String HIGH_WATERMARK_CHECKPOINT_FILE_NAME =
"high-watermark-checkpoint";
@@ -194,6 +196,9 @@ public class ReplicaManager {
private final KvSnapshotResource kvSnapshotResource;
private final DefaultSnapshotContext kvSnapshotContext;
+ /** The minimum number of in-sync replicas required for writes with
acks=-1. */
+ private volatile int minInSyncReplicas;
+
// remote log manager for remote log storage.
private final RemoteLogManager remoteLogManager;
@@ -304,6 +309,7 @@ public class ReplicaManager {
this.userMetrics = userMetrics;
this.clock = clock;
this.ioExecutor = ioExecutor;
+ this.minInSyncReplicas =
conf.get(ConfigOptions.LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER);
registerMetrics();
}
@@ -326,6 +332,44 @@ public class ReplicaManager {
return kvSnapshotContext;
}
+ public int getMinInSyncReplicas() {
+ return minInSyncReplicas;
+ }
+
+ // ============ ServerReconfigurable Implementation ============
+
+ @Override
+ public void validate(Configuration newConfig) throws ConfigException {
+ // Type validation is already handled by DynamicServerConfig.
+ // Here we only do basic sanity checks.
+ int newMinInSyncReplicas =
+
newConfig.get(ConfigOptions.LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER);
+ if (newMinInSyncReplicas <= 0) {
+ throw new ConfigException(
+ String.format(
+ "Invalid log.replica.min-in-sync-replicas-number
can not be "
+ + "negative or zero: %d",
+ newMinInSyncReplicas));
+ }
+ }
+
+ @Override
+ public void reconfigure(Configuration newConfig) {
+ int newMinInSyncReplicas =
+
newConfig.get(ConfigOptions.LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER);
+ if (newMinInSyncReplicas == minInSyncReplicas) {
+ LOG.debug(
+ "log.replica.min-in-sync-replicas-number unchanged: {}",
newMinInSyncReplicas);
+ return;
+ }
+ int oldMinInSyncReplicas = minInSyncReplicas;
+ minInSyncReplicas = newMinInSyncReplicas;
+ LOG.info(
+ "log.replica.min-in-sync-replicas-number reconfigured: {} ->
{}",
+ oldMinInSyncReplicas,
+ newMinInSyncReplicas);
+ }
+
private void registerMetrics() {
serverMetricGroup.gauge(
MetricNames.REPLICA_LEADER_COUNT,
@@ -1865,7 +1909,7 @@ public class ReplicaManager {
logManager,
isKvTable ? kvManager : null,
conf.get(ConfigOptions.LOG_REPLICA_MAX_LAG_TIME).toMillis(),
-
conf.get(ConfigOptions.LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER),
+ this::getMinInSyncReplicas,
serverId,
new OffsetCheckpointFile.LazyOffsetCheckpoints(
highWatermarkCheckpoint),
diff --git
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
index 93e8951b7..65ab96324 100644
---
a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
+++
b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletServer.java
@@ -275,6 +275,8 @@ public class TabletServer extends ServerBase {
// Register DefaultSnapshotContext for dynamic kv.snapshot.interval
dynamicConfigManager.register(replicaManager.getKvSnapshotContext());
+ // Register replicaManager to dynamicConfigManager for dynamic
config
+ dynamicConfigManager.register(replicaManager);
// Start dynamicConfigManager after all reconfigurable components
are registered
dynamicConfigManager.startup();
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java
b/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java
index aa55372f7..92894ab7f 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/DynamicConfigChangeTest.java
@@ -42,6 +42,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.fluss.config.ConfigOptions.DATALAKE_FORMAT;
@@ -412,4 +413,76 @@ public class DynamicConfigChangeTest {
// Verify the reconfigurable was notified with the new value
assertThat(reconfiguredInterval.get()).isEqualTo(Duration.ofMinutes(5));
}
+
+ @Test
+ void testPreventInvalidMinInSyncReplicas() throws Exception {
+ Configuration configuration = new Configuration();
+
configuration.setInt(ConfigOptions.LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER, 1);
+
+ DynamicConfigManager dynamicConfigManager =
+ new DynamicConfigManager(zookeeperClient, configuration, true);
+ dynamicConfigManager.startup();
+
+ // Try to set min-in-sync-replicas to an invalid value - should be
rejected by type
+ // validation
+ assertThatThrownBy(
+ () ->
+ dynamicConfigManager.alterConfigs(
+ Collections.singletonList(
+ new AlterConfig(
+ ConfigOptions
+
.LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER
+ .key(),
+ "invalid_value",
+
AlterConfigOpType.SET))))
+ .isInstanceOf(ConfigException.class)
+ .hasMessageContaining(
+ "Cannot parse 'invalid_value' as Integer for config"
+ + "
'log.replica.min-in-sync-replicas-number'");
+ }
+
+ @Test
+ void testDynamicMinInSyncReplicasChange() throws Exception {
+ Configuration configuration = new Configuration();
+
configuration.setInt(ConfigOptions.LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER, 1);
+
+ DynamicConfigManager dynamicConfigManager =
+ new DynamicConfigManager(zookeeperClient, configuration, true);
+
+ AtomicInteger reconfiguredValue = new AtomicInteger();
+ dynamicConfigManager.register(
+ new ServerReconfigurable() {
+ @Override
+ public void validate(Configuration newConfig) throws
ConfigException {}
+
+ @Override
+ public void reconfigure(Configuration newConfig) {
+ reconfiguredValue.set(
+ newConfig.get(
+
ConfigOptions.LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER));
+ }
+ });
+ dynamicConfigManager.startup();
+
+ // Change min-in-sync-replicas to 2 - should succeed
+ assertThatCode(
+ () ->
+ dynamicConfigManager.alterConfigs(
+ Collections.singletonList(
+ new AlterConfig(
+ ConfigOptions
+
.LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER
+ .key(),
+ "2",
+
AlterConfigOpType.SET))))
+ .doesNotThrowAnyException();
+
+ // Verify config was persisted to ZK
+ Map<String, String> zkConfig = zookeeperClient.fetchEntityConfig();
+
assertThat(zkConfig.get(ConfigOptions.LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER.key()))
+ .isEqualTo("2");
+
+ // Verify the reconfigurable was notified with the new value
+ assertThat(reconfiguredValue.get()).isEqualTo(2);
+ }
}
diff --git
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
index 3fb01a7e2..2df8b8163 100644
---
a/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
+++
b/fluss-server/src/test/java/org/apache/fluss/server/replica/ReplicaTestBase.java
@@ -476,7 +476,7 @@ public class ReplicaTestBase {
logManager,
isPkTable ? kvManager : null,
conf.get(ConfigOptions.LOG_REPLICA_MAX_LAG_TIME).toMillis(),
-
conf.get(ConfigOptions.LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER),
+ () ->
conf.get(ConfigOptions.LOG_REPLICA_MIN_IN_SYNC_REPLICAS_NUMBER),
TABLET_SERVER_ID,
new OffsetCheckpointFile.LazyOffsetCheckpoints(
new OffsetCheckpointFile(