This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c797f85de48 KAFKA-19642 Replace dynamicPerBrokerConfigs with dynamicDefaultConfigs (#20405)
c797f85de48 is described below

commit c797f85de481ad3e6840518665d00e1f9c72111f
Author: Chang-Chi Hsu <[email protected]>
AuthorDate: Wed Aug 27 08:34:58 2025 +0200

    KAFKA-19642 Replace dynamicPerBrokerConfigs with dynamicDefaultConfigs (#20405)
    
    - **Changes**: Replace the misused dynamicPerBrokerConfigs with
    dynamicDefaultConfigs
    - **Reasons**: KRaft servers don't apply cluster-level configs during
    startup
    
    from: https://github.com/apache/kafka/pull/18949/files#r2296809389
    
    Reviewers: Jun Rao <[email protected]>, Jhen-Yung Hsu
    <[email protected]>, PoAn Yang <[email protected]>, Chia-Ping Tsai
    <[email protected]>
    
    ---------
    
    Co-authored-by: PoAn Yang <[email protected]>
---
 .../scala/kafka/server/DynamicBrokerConfig.scala   |  2 +-
 .../server/DynamicBrokerReconfigurationTest.scala  | 79 ++++++++++++++++++++++
 2 files changed, 80 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index c64218246b3..124a4c7b78f 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -239,7 +239,7 @@ object DynamicBrokerConfig {
             }
           }
           val configHandler = new BrokerConfigHandler(config, quotaManagers)
-          configHandler.processConfigChanges("", dynamicPerBrokerConfigs)
+          configHandler.processConfigChanges("", dynamicDefaultConfigs)
           configHandler.processConfigChanges(config.brokerId.toString, dynamicPerBrokerConfigs)
         }
       }
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 75447e04df3..170ee3679f4 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -1100,9 +1100,13 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
   @MethodSource(Array("getTestGroupProtocolParametersAll"))
   def testServersCanStartWithInvalidStaticConfigsAndValidDynamicConfigs(groupProtocol: String): Unit = {
+    TestNumReplicaFetcherMetricsReporter.testReporters.clear()
+
     // modify snapshot interval config to explicitly take snapshot on a broker with valid dynamic configs
     val props = defaultStaticConfig(numServers)
     props.put(MetadataLogConfig.METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG, "10000")
+    props.put(MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG, classOf[TestNumReplicaFetcherMetricsReporter].getName)
+    props.put(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG, "1")
 
     val kafkaConfig = KafkaConfig.fromProps(props)
     val newBroker = createBroker(kafkaConfig).asInstanceOf[BrokerServer]
@@ -1110,6 +1114,15 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
 
     alterSslKeystoreUsingConfigCommand(sslProperties1, listenerPrefix(SecureExternal))
 
+    // Add num.replica.fetchers to the cluster-level config.
+    val clusterLevelProps = new Properties
+    clusterLevelProps.put(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG, "2")
+    reconfigureServers(clusterLevelProps, perBrokerConfig = false, (ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG, "2"))
+
+    // Wait for the metrics reporter to be configured
+    val initialReporter = TestNumReplicaFetcherMetricsReporter.waitForReporters(1).head
+    initialReporter.verifyState(reconfigureCount = 1, numFetcher = 2)
+
     TestUtils.ensureConsistentKRaftMetadata(servers, controllerServer)
 
     TestUtils.waitUntilTrue(
@@ -1122,11 +1135,19 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup
     newBroker.shutdown()
     newBroker.awaitShutdown()
 
+    // Clean up the test reporter
+    TestNumReplicaFetcherMetricsReporter.testReporters.clear()
+
     val invalidStaticConfigs = defaultStaticConfig(newBroker.config.brokerId)
     invalidStaticConfigs.putAll(securityProps(invalidSslConfigs, KEYSTORE_PROPS, listenerPrefix(SecureExternal)))
     newBroker.config.updateCurrentConfig(KafkaConfig.fromProps(invalidStaticConfigs))
 
     newBroker.startup()
+
+    // Verify that the custom MetricsReporter is not reconfigured after restart.
+    // If readDynamicBrokerConfigsFromSnapshot works correctly, the reporter should maintain its state.
+    val reporterAfterRestart = TestNumReplicaFetcherMetricsReporter.waitForReporters(1).head
+    reporterAfterRestart.verifyState(reconfigureCount = 0, numFetcher = 2)
   }
 
   private def awaitInitialPositions(consumer: Consumer[_, _]): Unit = {
@@ -1635,6 +1656,64 @@ class TestMetricsReporter extends MetricsReporter with Reconfigurable with Close
   }
 }
 
+object TestNumReplicaFetcherMetricsReporter {
+  val testReporters = new ConcurrentLinkedQueue[TestNumReplicaFetcherMetricsReporter]()
+
+  def waitForReporters(count: Int): List[TestNumReplicaFetcherMetricsReporter] = {
+    TestUtils.waitUntilTrue(() => testReporters.size == count, msg = "Metrics reporters size not matched. Expected: " + count + ", actual: " + testReporters.size())
+
+    val reporters = testReporters.asScala.toList
+    TestUtils.waitUntilTrue(() => reporters.forall(_.configureCount == 1), msg = "Metrics reporters not configured")
+    reporters
+  }
+}
+
+
+class TestNumReplicaFetcherMetricsReporter extends MetricsReporter {
+  import TestNumReplicaFetcherMetricsReporter._
+  @volatile var configureCount = 0
+  @volatile var reconfigureCount = 0
+  @volatile var numFetchers: Int = 1
+  testReporters.add(this)
+
+  override def init(metrics: util.List[KafkaMetric]): Unit = {
+  }
+
+  override def configure(configs: util.Map[String, _]): Unit = {
+    configureCount += 1
+    numFetchers = configs.get(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG).toString.toInt
+  }
+
+  override def metricChange(metric: KafkaMetric): Unit = {
+  }
+
+  override def metricRemoval(metric: KafkaMetric): Unit = {
+  }
+
+  override def reconfigurableConfigs(): util.Set[String] = {
+    util.Set.of(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG)
+  }
+
+  override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
+    val numFetchers = configs.get(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG).toString.toInt
+    if (numFetchers <= 0)
+      throw new ConfigException(s"Invalid num.replica.fetchers $numFetchers")
+  }
+
+  override def reconfigure(configs: util.Map[String, _]): Unit = {
+    reconfigureCount += 1
+    numFetchers = configs.get(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG).toString.toInt
+  }
+
+  override def close(): Unit = {
+  }
+
+  def verifyState(reconfigureCount: Int, numFetcher: Int = 1): Unit = {
+    assertEquals(reconfigureCount, this.reconfigureCount)
+    assertEquals(numFetcher, this.numFetchers)
+  }
+}
+
 
 class MockFileConfigProvider extends FileConfigProvider {
   @throws(classOf[IOException])

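A note for readers skimming the one-line fix above: at startup a KRaft broker applies the dynamic configs read back from the metadata snapshot in two passes, first the cluster-level defaults (empty resource name), then the per-broker overrides (resource name = broker id). Before this patch the first call was handed dynamicPerBrokerConfigs, so cluster-level entries such as num.replica.fetchers were skipped, which is what the new test asserts. The snippet below is a minimal, self-contained Scala sketch of that precedence only; the object name, maps, and values are illustrative and not part of Kafka's actual code path.

// Hypothetical sketch of the two-pass application order the patch restores:
// cluster-level defaults first, then per-broker overrides, so a per-broker
// value wins where both scopes define the same key.
object DynamicConfigScopesSketch extends App {
  // Dynamic configs as they might be read back from the metadata snapshot.
  // Resource name "" holds the cluster-wide <default> entries; "0" holds
  // broker 0's own overrides. Values here are purely illustrative.
  val snapshotConfigs: Map[String, Map[String, String]] = Map(
    ""  -> Map("num.replica.fetchers" -> "2"),  // cluster-level default
    "0" -> Map("num.replica.fetchers" -> "4")   // per-broker override
  )

  val brokerId = "0"
  val dynamicDefaultConfigs   = snapshotConfigs.getOrElse("", Map.empty[String, String])
  val dynamicPerBrokerConfigs = snapshotConfigs.getOrElse(brokerId, Map.empty[String, String])

  // Apply defaults first, then per-broker values. Dropping the first pass
  // (the pre-patch behavior) would lose any key defined only at cluster level.
  val effective = dynamicDefaultConfigs ++ dynamicPerBrokerConfigs
  println(effective("num.replica.fetchers")) // prints 4
}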