This is an automated email from the ASF dual-hosted git repository.

kamalcph pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 25a8ae0713c KAFKA-17266: Add dynamic broker config to enable follower fetch using tiered offset (#16834)
25a8ae0713c is described below

commit 25a8ae0713c19722422ad07bb9af3c0538f18bb9
Author: Abhijeet Kumar <[email protected]>
AuthorDate: Fri Dec 12 23:26:12 2025 +0530

    KAFKA-17266: Add dynamic broker config to enable follower fetch using tiered offset (#16834)
    
    In this PR, we add a dynamic broker config to enable/disable follower
    fetch using tiered offset. This is part of the implementation for
    [KIP-1023](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1023%3A+Follower+fetch+from+tiered+offset).
    
    Added tests for the new dynamic broker config.
    
    Reviewers: Kamal Chandraprakash <[email protected]>
---
 .../scala/kafka/server/DynamicBrokerConfig.scala   | 24 ++++++++++++++++++++-
 core/src/main/scala/kafka/server/KafkaConfig.scala |  1 +
 .../kafka/server/DynamicBrokerConfigTest.scala     | 25 ++++++++++++++++++++++
 .../kafka/server/config/ReplicationConfigs.java    |  7 +++++-
 4 files changed, 55 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala 
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 51d7dbe5416..d580eda7697 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -44,7 +44,7 @@ import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.raft.KafkaRaftClient
 import org.apache.kafka.server.{DynamicThreadPool, ProcessRole}
 import org.apache.kafka.server.common.ApiMessageAndVersion
-import org.apache.kafka.server.config.{DynamicProducerStateManagerConfig, 
ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms}
+import org.apache.kafka.server.config.{DynamicProducerStateManagerConfig, 
ReplicationConfigs, ServerConfigs, ServerLogConfigs, ServerTopicConfigSynonyms}
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 import org.apache.kafka.server.metrics.{ClientTelemetryExporterPlugin, 
MetricConfigs}
 import org.apache.kafka.server.telemetry.{ClientTelemetry, 
ClientTelemetryExporterProvider}
@@ -101,6 +101,7 @@ object DynamicBrokerConfig {
     SocketServer.ReconfigurableConfigs ++
     DynamicProducerStateManagerConfig ++
     DynamicRemoteLogConfig.ReconfigurableConfigs ++
+    DynamicReplicationConfig.ReconfigurableConfigs ++
     Set(AbstractConfig.CONFIG_PROVIDERS_CONFIG) ++
     GroupCoordinatorConfig.RECONFIGURABLE_CONFIGS.asScala ++
     ShareCoordinatorConfig.RECONFIGURABLE_CONFIGS.asScala
@@ -311,6 +312,7 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
     addBrokerReconfigurable(kafkaServer.socketServer)
     addBrokerReconfigurable(new 
DynamicProducerStateManagerConfig(kafkaServer.logManager.producerStateManagerConfig))
     addBrokerReconfigurable(new DynamicRemoteLogConfig(kafkaServer))
+    addBrokerReconfigurable(new DynamicReplicationConfig(kafkaServer))
   }
 
   /**
@@ -1134,3 +1136,23 @@ object DynamicRemoteLogConfig {
     RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP
   )
 }
+
+class DynamicReplicationConfig(server: KafkaBroker) extends 
BrokerReconfigurable with Logging {
+  override def reconfigurableConfigs: Set[String] = {
+    DynamicReplicationConfig.ReconfigurableConfigs
+  }
+
+  override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
+    // Currently it is a noop for reconfiguring the dynamic config 
follower.fetch.last.tiered.offset.enable
+  }
+
+  override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): 
Unit = {
+    // Currently it is a noop for reconfiguring the dynamic config 
follower.fetch.last.tiered.offset.enable
+  }
+}
+
+object DynamicReplicationConfig {
+  val ReconfigurableConfigs = Set(
+    ReplicationConfigs.FOLLOWER_FETCH_LAST_TIERED_OFFSET_ENABLE_CONFIG
+  )
+}
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index a7317a20e89..a96183e1981 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -356,6 +356,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
   val leaderImbalanceCheckIntervalSeconds: Long = 
getLong(ReplicationConfigs.LEADER_IMBALANCE_CHECK_INTERVAL_SECONDS_CONFIG)
   val uncleanLeaderElectionCheckIntervalMs: Long = 
getLong(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_INTERVAL_MS_CONFIG)
   def uncleanLeaderElectionEnable: java.lang.Boolean = 
getBoolean(ReplicationConfigs.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG)
+  def followerFetchLastTieredOffsetEnable: java.lang.Boolean = 
getBoolean(ReplicationConfigs.FOLLOWER_FETCH_LAST_TIERED_OFFSET_ENABLE_CONFIG);
 
   /** ********* Controlled shutdown configuration ***********/
   val controlledShutdownEnable = 
getBoolean(ServerConfigs.CONTROLLED_SHUTDOWN_ENABLE_CONFIG)
diff --git 
a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 8504f3705ac..8c162476461 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -988,6 +988,31 @@ class DynamicBrokerConfigTest {
     verifyNoMoreInteractions(remoteLogManager)
   }
 
+  @Test
+  def testEnableFollowerFetchLastTieredOffset(): Unit = {
+    val props = TestUtils.createBrokerConfig(0, port = 9092)
+    val config = KafkaConfig.fromProps(props)
+    val serverMock: KafkaBroker = mock(classOf[KafkaBroker])
+
+    Mockito.when(serverMock.config).thenReturn(config)
+
+    config.dynamicConfig.initialize(None)
+    config.dynamicConfig.addBrokerReconfigurable(new 
DynamicReplicationConfig(serverMock))
+
+    
assertEquals(ReplicationConfigs.FOLLOWER_FETCH_LAST_TIERED_OFFSET_ENABLE_DEFAULT,
+      config.followerFetchLastTieredOffsetEnable)
+
+    // Update default config
+    
props.put(ReplicationConfigs.FOLLOWER_FETCH_LAST_TIERED_OFFSET_ENABLE_CONFIG, 
"true")
+    config.dynamicConfig.updateDefaultConfig(props)
+    assertTrue(config.followerFetchLastTieredOffsetEnable)
+
+    // Update per broker config
+    
props.put(ReplicationConfigs.FOLLOWER_FETCH_LAST_TIERED_OFFSET_ENABLE_CONFIG, 
"false")
+    config.dynamicConfig.updateBrokerConfig(0, props)
+    assertFalse(config.followerFetchLastTieredOffsetEnable)
+  }
+
   def verifyIncorrectLogLocalRetentionProps(logLocalRetentionMs: Long,
                                             retentionMs: Long,
                                             logLocalRetentionBytes: Long,
diff --git 
a/server/src/main/java/org/apache/kafka/server/config/ReplicationConfigs.java 
b/server/src/main/java/org/apache/kafka/server/config/ReplicationConfigs.java
index e437f6d4a76..d39a141a458 100644
--- 
a/server/src/main/java/org/apache/kafka/server/config/ReplicationConfigs.java
+++ 
b/server/src/main/java/org/apache/kafka/server/config/ReplicationConfigs.java
@@ -132,6 +132,10 @@ public class ReplicationConfigs {
     public static final String REPLICA_SELECTOR_CLASS_CONFIG = 
"replica.selector.class";
     public static final String REPLICA_SELECTOR_CLASS_DOC = "The fully 
qualified class name that implements ReplicaSelector. This is used by the 
broker to find the preferred read replica. By default, we use an implementation 
that returns the leader.";
 
+    public static final String FOLLOWER_FETCH_LAST_TIERED_OFFSET_ENABLE_CONFIG 
= "follower.fetch.last.tiered.offset.enable";
+    public static final String FOLLOWER_FETCH_LAST_TIERED_OFFSET_ENABLE_DOC = 
"When enabled, an empty follower skips offsets up to the last tiered offset and 
begins replication from the next offset.";
+    public static final boolean 
FOLLOWER_FETCH_LAST_TIERED_OFFSET_ENABLE_DEFAULT = false;
+
     public static final String AUTO_LEADER_REBALANCE_ENABLE_CONFIG = 
"auto.leader.rebalance.enable";
     public static final boolean AUTO_LEADER_REBALANCE_ENABLE_DEFAULT = true;
     public static final String AUTO_LEADER_REBALANCE_ENABLE_DOC = 
String.format("Enables auto leader balancing. A background thread checks the 
distribution of partition leaders at regular intervals, configurable by %s. If 
the leader is imbalanced, leader rebalance to the preferred leader for 
partitions is triggered.",
@@ -158,6 +162,7 @@ public class ReplicationConfigs {
             .define(UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, BOOLEAN, 
LogConfig.DEFAULT_UNCLEAN_LEADER_ELECTION_ENABLE, HIGH, 
UNCLEAN_LEADER_ELECTION_ENABLE_DOC)
             .define(INTER_BROKER_SECURITY_PROTOCOL_CONFIG, STRING, 
INTER_BROKER_SECURITY_PROTOCOL_DEFAULT, 
ConfigDef.ValidString.in(Utils.enumOptions(SecurityProtocol.class)), MEDIUM, 
INTER_BROKER_SECURITY_PROTOCOL_DOC)
             .define(INTER_BROKER_LISTENER_NAME_CONFIG, STRING, null, MEDIUM, 
INTER_BROKER_LISTENER_NAME_DOC)
-            .define(REPLICA_SELECTOR_CLASS_CONFIG, STRING, null, MEDIUM, 
REPLICA_SELECTOR_CLASS_DOC);
+            .define(REPLICA_SELECTOR_CLASS_CONFIG, STRING, null, MEDIUM, 
REPLICA_SELECTOR_CLASS_DOC)
+            .define(FOLLOWER_FETCH_LAST_TIERED_OFFSET_ENABLE_CONFIG, BOOLEAN, 
FOLLOWER_FETCH_LAST_TIERED_OFFSET_ENABLE_DEFAULT, MEDIUM, 
FOLLOWER_FETCH_LAST_TIERED_OFFSET_ENABLE_DOC);
 
 }

Reply via email to