This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 3.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.7 by this push:
new 1c4b02a6fbf KAFKA-17584: Fix incorrect synonym handling for dynamic
log configurations
1c4b02a6fbf is described below
commit 1c4b02a6fbf0fecdcd0f69656ad8bff1454ad57b
Author: Christo Lolov <[email protected]>
AuthorDate: Mon Nov 25 18:07:29 2024 +0000
KAFKA-17584: Fix incorrect synonym handling for dynamic log configurations
This is a cherry-pick of #17258 to 3.7.2
This commit differs from the original by using the old (read 3.7)
references to the configurations and not changing as many unit tests
Reviewers: Divij Vaidya <[email protected]>, Colin Patrick McCabe
<[email protected]>
---
.../scala/kafka/server/DynamicBrokerConfig.scala | 41 ++++++++++++---------
.../server/DynamicBrokerReconfigurationTest.scala | 5 ++-
.../kafka/server/DynamicBrokerConfigTest.scala | 42 +++++++++++++++++++++-
3 files changed, 70 insertions(+), 18 deletions(-)
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 142349fcfd6..95a531ed89d 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -30,7 +30,7 @@ import kafka.utils.{CoreUtils, Logging, PasswordEncoder}
import kafka.utils.Implicits._
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.Reconfigurable
-import org.apache.kafka.common.config.{AbstractConfig, ConfigDef,
ConfigException, SslConfigs}
+import org.apache.kafka.common.config.{AbstractConfig, ConfigDef,
ConfigException, SslConfigs, TopicConfig}
import org.apache.kafka.common.metrics.{JmxReporter, Metrics, MetricsReporter}
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable}
@@ -667,13 +667,25 @@ trait BrokerReconfigurable {
}
object DynamicLogConfig {
- // Exclude message.format.version for now since we need to check that the
version
- // is supported on all brokers in the cluster.
+ /**
+ * The log configurations that are non-reconfigurable. This set contains the
names you
+ * would use when setting a dynamic configuration on a topic, which are
different than the
+ * corresponding broker configuration names.
+ *
+ * For now, message.format.version is not reconfigurable, since we need to
check that
+ * the version is supported on all brokers in the cluster.
+ */
@nowarn("cat=deprecation")
- val ExcludedConfigs = Set(KafkaConfig.LogMessageFormatVersionProp)
+ val NonReconfigrableLogConfigs: Set[String] =
Set(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG)
- val ReconfigurableConfigs =
ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.values.asScala.toSet --
ExcludedConfigs
- val KafkaConfigToLogConfigName =
ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.asScala.map { case (k, v) =>
(v, k) }
+ /**
+ * The broker configurations pertaining to logs that are reconfigurable.
This set contains
+ * the names you would use when setting a static or dynamic broker
configuration (not topic
+ * configuration).
+ */
+ val ReconfigurableConfigs: Set[String] =
+ ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.asScala.
+ filterNot(s => NonReconfigrableLogConfigs.contains(s._1)).values.toSet
}
class DynamicLogConfig(logManager: LogManager, server: KafkaBroker) extends
BrokerReconfigurable with Logging {
@@ -738,17 +750,14 @@ class DynamicLogConfig(logManager: LogManager, server:
KafkaBroker) extends Brok
override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig):
Unit = {
val originalLogConfig = logManager.currentDefaultConfig
val originalUncleanLeaderElectionEnable =
originalLogConfig.uncleanLeaderElectionEnable
- val newBrokerDefaults = new util.HashMap[String,
Object](originalLogConfig.originals)
- newConfig.valuesFromThisConfig.forEach { (k, v) =>
- if (DynamicLogConfig.ReconfigurableConfigs.contains(k)) {
- DynamicLogConfig.KafkaConfigToLogConfigName.get(k).foreach {
configName =>
- if (v == null)
- newBrokerDefaults.remove(configName)
- else
- newBrokerDefaults.put(configName, v.asInstanceOf[AnyRef])
- }
+ val newBrokerDefaults = new util.HashMap[String,
Object](newConfig.extractLogConfigMap)
+ val originalLogConfigMap = originalLogConfig.originals()
+ DynamicLogConfig.NonReconfigrableLogConfigs.foreach(k => {
+ Option(originalLogConfigMap.get(k)) match {
+ case None => newBrokerDefaults.remove(k)
+ case Some(v) => newBrokerDefaults.put(k, v)
}
- }
+ })
logManager.reconfigureDefaultLogConfig(new LogConfig(newBrokerDefaults))
diff --git
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 55714993631..96e1d223a07 100644
---
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -59,6 +59,7 @@ import org.apache.kafka.common.requests.MetadataRequest
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.security.scram.ScramCredential
import org.apache.kafka.common.serialization.{StringDeserializer,
StringSerializer}
+import org.apache.kafka.server.config.ServerTopicConfigSynonyms
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.record.BrokerCompressionType
import org.apache.kafka.server.util.ShutdownableThread
@@ -663,8 +664,10 @@ class DynamicBrokerReconfigurationTest extends
QuorumTestHarness with SaslSetup
val log = servers.head.logManager.getLog(new TopicPartition(topic,
0)).getOrElse(throw new IllegalStateException("Log not found"))
TestUtils.waitUntilTrue(() => log.config.segmentSize == 4000, "Existing
topic config using defaults not updated")
+ val KafkaConfigToLogConfigName: Map[String, String] =
+ ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.asScala.map { case (k,
v) => (v, k) }
props.asScala.foreach { case (k, v) =>
- val logConfigName = DynamicLogConfig.KafkaConfigToLogConfigName(k)
+ val logConfigName = KafkaConfigToLogConfigName(k)
val expectedValue = if (k == KafkaConfig.LogCleanupPolicyProp) s"[$v]"
else v
assertEquals(expectedValue,
log.config.originals.get(logConfigName).toString,
s"Not reconfigured $logConfigName for existing log")
diff --git
a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index e6537edb025..3ef3c7eb9b2 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -19,7 +19,7 @@ package kafka.server
import java.{lang, util}
import java.util.{Properties, Map => JMap}
-import java.util.concurrent.CompletionStage
+import java.util.concurrent.{CompletionStage, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
import kafka.controller.KafkaController
import kafka.log.LogManager
@@ -833,6 +833,46 @@ class DynamicBrokerConfigTest {
// validate per broker config
assertThrows(classOf[ConfigException], () =>
config.dynamicConfig.validate(newProps, perBrokerConfig = true))
}
+
+ class DynamicLogConfigContext(origProps: Properties) {
+ val config = KafkaConfig(origProps)
+ val serverMock = Mockito.mock(classOf[BrokerServer])
+ val logManagerMock = Mockito.mock(classOf[LogManager])
+ Mockito.when(serverMock.config).thenReturn(config)
+ Mockito.when(serverMock.logManager).thenReturn(logManagerMock)
+ Mockito.when(logManagerMock.allLogs).thenReturn(Iterable.empty)
+ val currentDefaultLogConfig = new AtomicReference(new LogConfig(new
Properties))
+ Mockito.when(logManagerMock.currentDefaultConfig).thenAnswer(_ =>
currentDefaultLogConfig.get())
+
Mockito.when(logManagerMock.reconfigureDefaultLogConfig(ArgumentMatchers.any(classOf[LogConfig])))
+ .thenAnswer(invocation =>
currentDefaultLogConfig.set(invocation.getArgument(0)))
+ config.dynamicConfig.initialize(None, None)
+ config.dynamicConfig.addBrokerReconfigurable(new
DynamicLogConfig(logManagerMock, serverMock))
+ }
+
+ @Test
+ def testDynamicLogConfigHandlesSynonymsCorrectly(): Unit = {
+ val origProps = TestUtils.createBrokerConfig(0, null, port = 8181)
+ origProps.put(KafkaConfig.LogRetentionTimeMinutesProp, "1")
+ val ctx = new DynamicLogConfigContext(origProps)
+ assertEquals(TimeUnit.MINUTES.toMillis(1),
ctx.config.logRetentionTimeMillis)
+ val props = new Properties()
+ props.put(KafkaConfig.MessageMaxBytesProp, "12345678")
+ ctx.config.dynamicConfig.updateDefaultConfig(props)
+ assertEquals(TimeUnit.MINUTES.toMillis(1),
ctx.currentDefaultLogConfig.get().retentionMs)
+ }
+
+ @Test
+ def testLogRetentionTimeMinutesIsNotDynamicallyReconfigurable(): Unit = {
+ val origProps = TestUtils.createBrokerConfig(0, null, port = 8181)
+ origProps.put(KafkaConfig.LogRetentionTimeHoursProp, "1")
+ val ctx = new DynamicLogConfigContext(origProps)
+ assertEquals(TimeUnit.HOURS.toMillis(1), ctx.config.logRetentionTimeMillis)
+ val props = new Properties()
+ props.put(KafkaConfig.LogRetentionTimeMinutesProp, "3")
+ ctx.config.dynamicConfig.updateDefaultConfig(props)
+ assertEquals(TimeUnit.HOURS.toMillis(1), ctx.config.logRetentionTimeMillis)
+
assertFalse(ctx.currentDefaultLogConfig.get().originals().containsKey(KafkaConfig.LogRetentionTimeHoursProp))
+ }
}
class TestDynamicThreadPool() extends BrokerReconfigurable {