This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c797f85de48 KAFKA-19642 Replace dynamicPerBrokerConfigs with
dynamicDefaultConfigs (#20405)
c797f85de48 is described below
commit c797f85de481ad3e6840518665d00e1f9c72111f
Author: Chang-Chi Hsu <[email protected]>
AuthorDate: Wed Aug 27 08:34:58 2025 +0200
KAFKA-19642 Replace dynamicPerBrokerConfigs with dynamicDefaultConfigs
(#20405)
- **Changes**: Replace misused dynamicPerBrokerConfigs with
dynamicDefaultConfigs
- **Reasons**: KRaft servers don't handle the cluster-level configs during
startup
from: https://github.com/apache/kafka/pull/18949/files#r2296809389
Reviewers: Jun Rao <[email protected]>, Jhen-Yung Hsu
<[email protected]>, PoAn Yang <[email protected]>, Chia-Ping Tsai
<[email protected]>
---------
Co-authored-by: PoAn Yang <[email protected]>
---
.../scala/kafka/server/DynamicBrokerConfig.scala | 2 +-
.../server/DynamicBrokerReconfigurationTest.scala | 79 ++++++++++++++++++++++
2 files changed, 80 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index c64218246b3..124a4c7b78f 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -239,7 +239,7 @@ object DynamicBrokerConfig {
}
}
val configHandler = new BrokerConfigHandler(config, quotaManagers)
- configHandler.processConfigChanges("", dynamicPerBrokerConfigs)
+ configHandler.processConfigChanges("", dynamicDefaultConfigs)
configHandler.processConfigChanges(config.brokerId.toString,
dynamicPerBrokerConfigs)
}
}
diff --git
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 75447e04df3..170ee3679f4 100644
---
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -1100,9 +1100,13 @@ class DynamicBrokerReconfigurationTest extends
QuorumTestHarness with SaslSetup
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedGroupProtocolNames)
@MethodSource(Array("getTestGroupProtocolParametersAll"))
def
testServersCanStartWithInvalidStaticConfigsAndValidDynamicConfigs(groupProtocol:
String): Unit = {
+ TestNumReplicaFetcherMetricsReporter.testReporters.clear()
+
// modify snapshot interval config to explicitly take snapshot on a broker
with valid dynamic configs
val props = defaultStaticConfig(numServers)
props.put(MetadataLogConfig.METADATA_SNAPSHOT_MAX_INTERVAL_MS_CONFIG,
"10000")
+ props.put(MetricConfigs.METRIC_REPORTER_CLASSES_CONFIG,
classOf[TestNumReplicaFetcherMetricsReporter].getName)
+ props.put(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG, "1")
val kafkaConfig = KafkaConfig.fromProps(props)
val newBroker = createBroker(kafkaConfig).asInstanceOf[BrokerServer]
@@ -1110,6 +1114,15 @@ class DynamicBrokerReconfigurationTest extends
QuorumTestHarness with SaslSetup
alterSslKeystoreUsingConfigCommand(sslProperties1,
listenerPrefix(SecureExternal))
+ // Add num.replica.fetchers to the cluster-level config.
+ val clusterLevelProps = new Properties
+ clusterLevelProps.put(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG, "2")
+ reconfigureServers(clusterLevelProps, perBrokerConfig = false,
(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG, "2"))
+
+ // Wait for the metrics reporter to be configured
+ val initialReporter =
TestNumReplicaFetcherMetricsReporter.waitForReporters(1).head
+ initialReporter.verifyState(reconfigureCount = 1, numFetcher = 2)
+
TestUtils.ensureConsistentKRaftMetadata(servers, controllerServer)
TestUtils.waitUntilTrue(
@@ -1122,11 +1135,19 @@ class DynamicBrokerReconfigurationTest extends
QuorumTestHarness with SaslSetup
newBroker.shutdown()
newBroker.awaitShutdown()
+ // Clean up the test reporter
+ TestNumReplicaFetcherMetricsReporter.testReporters.clear()
+
val invalidStaticConfigs = defaultStaticConfig(newBroker.config.brokerId)
invalidStaticConfigs.putAll(securityProps(invalidSslConfigs,
KEYSTORE_PROPS, listenerPrefix(SecureExternal)))
newBroker.config.updateCurrentConfig(KafkaConfig.fromProps(invalidStaticConfigs))
newBroker.startup()
+
+ // Verify that the custom MetricsReporter is not reconfigured after
restart.
+ // If readDynamicBrokerConfigsFromSnapshot works correctly, the reporter
should maintain its state.
+ val reporterAfterRestart =
TestNumReplicaFetcherMetricsReporter.waitForReporters(1).head
+ reporterAfterRestart.verifyState(reconfigureCount = 0, numFetcher = 2)
}
private def awaitInitialPositions(consumer: Consumer[_, _]): Unit = {
@@ -1635,6 +1656,64 @@ class TestMetricsReporter extends MetricsReporter with
Reconfigurable with Close
}
}
+object TestNumReplicaFetcherMetricsReporter {
+ val testReporters = new
ConcurrentLinkedQueue[TestNumReplicaFetcherMetricsReporter]()
+
+ def waitForReporters(count: Int): List[TestNumReplicaFetcherMetricsReporter]
= {
+ TestUtils.waitUntilTrue(() => testReporters.size == count, msg = "Metrics
reporters size not matched. Expected: " + count + ", actual: " +
testReporters.size())
+
+ val reporters = testReporters.asScala.toList
+ TestUtils.waitUntilTrue(() => reporters.forall(_.configureCount == 1), msg
= "Metrics reporters not configured")
+ reporters
+ }
+}
+
+
+class TestNumReplicaFetcherMetricsReporter extends MetricsReporter {
+ import TestNumReplicaFetcherMetricsReporter._
+ @volatile var configureCount = 0
+ @volatile var reconfigureCount = 0
+ @volatile var numFetchers: Int = 1
+ testReporters.add(this)
+
+ override def init(metrics: util.List[KafkaMetric]): Unit = {
+ }
+
+ override def configure(configs: util.Map[String, _]): Unit = {
+ configureCount += 1
+ numFetchers =
configs.get(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG).toString.toInt
+ }
+
+ override def metricChange(metric: KafkaMetric): Unit = {
+ }
+
+ override def metricRemoval(metric: KafkaMetric): Unit = {
+ }
+
+ override def reconfigurableConfigs(): util.Set[String] = {
+ util.Set.of(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG)
+ }
+
+ override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
+ val numFetchers =
configs.get(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG).toString.toInt
+ if (numFetchers <= 0)
+ throw new ConfigException(s"Invalid num.replica.fetchers $numFetchers")
+ }
+
+ override def reconfigure(configs: util.Map[String, _]): Unit = {
+ reconfigureCount += 1
+ numFetchers =
configs.get(ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG).toString.toInt
+ }
+
+ override def close(): Unit = {
+ }
+
+ def verifyState(reconfigureCount: Int, numFetcher: Int = 1): Unit = {
+ assertEquals(reconfigureCount, this.reconfigureCount)
+ assertEquals(numFetcher, this.numFetchers)
+ }
+}
+
class MockFileConfigProvider extends FileConfigProvider {
@throws(classOf[IOException])