This is an automated email from the ASF dual-hosted git repository.
kamalcph pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 25a8ae0713c KAFKA-17266: Add dynamic broker config to enable follower
fetch using tiered offset (#16834)
25a8ae0713c is described below
commit 25a8ae0713c19722422ad07bb9af3c0538f18bb9
Author: Abhijeet Kumar <[email protected]>
AuthorDate: Fri Dec 12 23:26:12 2025 +0530
KAFKA-17266: Add dynamic broker config to enable follower fetch using
tiered offset (#16834)
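This PR adds a dynamic broker config, `follower.fetch.last.tiered.offset.enable`
(default `false`), to enable/disable follower fetch using tiered offset. The
config is registered as reconfigurable through the new `DynamicReplicationConfig`,
whose validate/reconfigure hooks are currently no-ops. This is part of the
implementation for
[KIP-1023](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1023%3A+Follower+fetch+from+tiered+offset)
It also adds a test for the new dynamic broker config, covering both the
cluster-default and the per-broker update paths.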
Reviewers: Kamal Chandraprakash <[email protected]>
---
.../scala/kafka/server/DynamicBrokerConfig.scala | 24 ++++++++++++++++++++-
core/src/main/scala/kafka/server/KafkaConfig.scala | 1 +
.../kafka/server/DynamicBrokerConfigTest.scala | 25 ++++++++++++++++++++++
.../kafka/server/config/ReplicationConfigs.java | 7 +++++-
4 files changed, 55 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 51d7dbe5416..d580eda7697 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -44,7 +44,7 @@ import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.KafkaRaftClient
import org.apache.kafka.server.{DynamicThreadPool, ProcessRole}
import org.apache.kafka.server.common.ApiMessageAndVersion
-import org.apache.kafka.server.config.{DynamicProducerStateManagerConfig,
ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms}
+import org.apache.kafka.server.config.{DynamicProducerStateManagerConfig,
ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin,
MetricConfigs}
import org.apache.kafka.server.telemetry.{ClientTelemetry,
ClientTelemetryExporterProvider}
@@ -101,6 +101,7 @@ object DynamicBrokerConfig {
SocketServer.ReconfigurableConfigs ++
DynamicProducerStateManagerConfig ++
DynamicRemoteLogConfig.ReconfigurableConfigs ++
+ DynamicReplicationConfig.ReconfigurableConfigs ++
Set(AbstractConfig.CONFIG_PROVIDERS_CONFIG) ++
GroupCoordinatorConfig.RECONFIGURABLE_CONFIGS.asScala ++
ShareCoordinatorConfig.RECONFIGURABLE_CONFIGS.asScala
@@ -311,6 +312,7 @@ class DynamicBrokerConfig(private val kafkaConfig:
KafkaConfig) extends Logging
addBrokerReconfigurable(kafkaServer.socketServer)
addBrokerReconfigurable(new
DynamicProducerStateManagerConfig(kafkaServer.logManager.producerStateManagerConfig))
addBrokerReconfigurable(new DynamicRemoteLogConfig(kafkaServer))
+ addBrokerReconfigurable(new DynamicReplicationConfig(kafkaServer))
}
/**
@@ -1134,3 +1136,23 @@ object DynamicRemoteLogConfig {
RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP
)
}
+
+class DynamicReplicationConfig(server: KafkaBroker) extends
BrokerReconfigurable with Logging {
+ override def reconfigurableConfigs: Set[String] = {
+ DynamicReplicationConfig.ReconfigurableConfigs
+ }
+
+ override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
+ // Currently it is a noop for reconfiguring the dynamic config
follower.fetch.last.tiered.offset.enable
+ }
+
+ override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig):
Unit = {
+ // Currently it is a noop for reconfiguring the dynamic config
follower.fetch.last.tiered.offset.enable
+ }
+}
+
+object DynamicReplicationConfig {
+ val ReconfigurableConfigs = Set(
+ ReplicationConfigs.FOLLOWER_FETCH_LAST_TIERED_OFFSET_ENABLE_CONFIG
+ )
+}
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index a7317a20e89..a96183e1981 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -356,6 +356,7 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
val leaderImbalanceCheckIntervalSeconds: Long =
getLong(ReplicationConfigs.LEADER_IMBALANCE_CHECK_INTERVAL_SECONDS_CONFIG)
val uncleanLeaderElectionCheckIntervalMs: Long =
getLong(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_INTERVAL_MS_CONFIG)
def uncleanLeaderElectionEnable: java.lang.Boolean =
getBoolean(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG)
+ def followerFetchLastTieredOffsetEnable: java.lang.Boolean =
getBoolean(ReplicationConfigs.FOLLOWER_FETCH_LAST_TIERED_OFFSET_ENABLE_CONFIG);
/** ********* Controlled shutdown configuration ***********/
val controlledShutdownEnable =
getBoolean(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG)
diff --git
a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 8504f3705ac..8c162476461 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -988,6 +988,31 @@ class DynamicBrokerConfigTest {
verifyNoMoreInteractions(remoteLogManager)
}
+ @Test
+ def testEnableFollowerFetchLastTieredOffset(): Unit = {
+ val props = TestUtils.createBrokerConfig(0, port = 9092)
+ val config = KafkaConfig.fromProps(props)
+ val serverMock: KafkaBroker = mock(classOf[KafkaBroker])
+
+ Mockito.when(serverMock.config).thenReturn(config)
+
+ config.dynamicConfig.initialize(None)
+ config.dynamicConfig.addBrokerReconfigurable(new
DynamicReplicationConfig(serverMock))
+
+
assertEquals(ReplicationConfigs.FOLLOWER_FETCH_LAST_TIERED_OFFSET_ENABLE_DEFAULT,
+ config.followerFetchLastTieredOffsetEnable)
+
+ // Update default config
+
props.put(ReplicationConfigs.FOLLOWER_FETCH_LAST_TIERED_OFFSET_ENABLE_CONFIG,
"true")
+ config.dynamicConfig.updateDefaultConfig(props)
+ assertTrue(config.followerFetchLastTieredOffsetEnable)
+
+ // Update per broker config
+
props.put(ReplicationConfigs.FOLLOWER_FETCH_LAST_TIERED_OFFSET_ENABLE_CONFIG,
"false")
+ config.dynamicConfig.updateBrokerConfig(0, props)
+ assertFalse(config.followerFetchLastTieredOffsetEnable)
+ }
+
def verifyIncorrectLogLocalRetentionProps(logLocalRetentionMs: Long,
retentionMs: Long,
logLocalRetentionBytes: Long,
diff --git
a/server/src/main/java/org/apache/kafka/server/config/ReplicationConfigs.java
b/server/src/main/java/org/apache/kafka/server/config/ReplicationConfigs.java
index e437f6d4a76..d39a141a458 100644
---
a/server/src/main/java/org/apache/kafka/server/config/ReplicationConfigs.java
+++
b/server/src/main/java/org/apache/kafka/server/config/ReplicationConfigs.java
@@ -132,6 +132,10 @@ public class ReplicationConfigs {
public static final String REPLICA_SELECTOR_CLASS_CONFIG =
"replica.selector.class";
public static final String REPLICA_SELECTOR_CLASS_DOC = "The fully
qualified class name that implements ReplicaSelector. This is used by the
broker to find the preferred read replica. By default, we use an implementation
that returns the leader.";
+ public static final String FOLLOWER_FETCH_LAST_TIERED_OFFSET_ENABLE_CONFIG
= "follower.fetch.last.tiered.offset.enable";
+ public static final String FOLLOWER_FETCH_LAST_TIERED_OFFSET_ENABLE_DOC =
"When enabled, an empty follower skips offsets up to the last tiered offset and
begins replication from the next offset.";
+ public static final boolean
FOLLOWER_FETCH_LAST_TIERED_OFFSET_ENABLE_DEFAULT = false;
+
public static final String AUTO_LEADER_REBALANCE_ENABLE_CONFIG =
"auto.leader.rebalance.enable";
public static final boolean AUTO_LEADER_REBALANCE_ENABLE_DEFAULT = true;
public static final String AUTO_LEADER_REBALANCE_ENABLE_DOC =
String.format("Enables auto leader balancing. A background thread checks the
distribution of partition leaders at regular intervals, configurable by %s. If
the leader is imbalanced, leader rebalance to the preferred leader for
partitions is triggered.",
@@ -158,6 +162,7 @@ public class ReplicationConfigs {
.define(UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, BOOLEAN,
LogConfig.DEFAULT_UNCLEAN_LEADER_ELECTION_ENABLE, HIGH,
UNCLEAN_LEADER_ELECTION_ENABLE_DOC)
.define(INTER_BROKER_SECURITY_PROTOCOL_CONFIG, STRING,
INTER_BROKER_SECURITY_PROTOCOL_DEFAULT,
ConfigDef.ValidString.in(Utils.enumOptions(SecurityProtocol.class)), MEDIUM,
INTER_BROKER_SECURITY_PROTOCOL_DOC)
.define(INTER_BROKER_LISTENER_NAME_CONFIG, STRING, null, MEDIUM,
INTER_BROKER_LISTENER_NAME_DOC)
- .define(REPLICA_SELECTOR_CLASS_CONFIG, STRING, null, MEDIUM,
REPLICA_SELECTOR_CLASS_DOC);
+ .define(REPLICA_SELECTOR_CLASS_CONFIG, STRING, null, MEDIUM,
REPLICA_SELECTOR_CLASS_DOC)
+ .define(FOLLOWER_FETCH_LAST_TIERED_OFFSET_ENABLE_CONFIG, BOOLEAN,
FOLLOWER_FETCH_LAST_TIERED_OFFSET_ENABLE_DEFAULT, MEDIUM,
FOLLOWER_FETCH_LAST_TIERED_OFFSET_ENABLE_DOC);
}