This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push:
new 71b1e19 KAFKA-9254; Overridden topic configs are reset after dynamic
default change (#7870) (#8067)
71b1e19 is described below
commit 71b1e19fc60b5e1f9bba33025737ec2b7fb1c2aa
Author: Vikas Singh <[email protected]>
AuthorDate: Tue Mar 3 12:00:10 2020 -0800
KAFKA-9254; Overridden topic configs are reset after dynamic default change
(#7870) (#8067)
Currently, when a dynamic change is made to the broker-level default log
configuration, existing log configs will be recreated with an empty set of
overridden configs. In such a case, when updating dynamic broker configs a
second time, the topic-level configs are lost. This can cause unexpected data loss, for example,
if the cleanup policy changes from "compact" to "delete."
Reviewers: Rajini Sivaram <[email protected]>, Jason Gustafson
<[email protected]>
(cherry picked from commit 0e7f867041959c5d77727c7f5ce32d363fa09fc2)
Co-authored-by: huxi <[email protected]>
---
.../scala/kafka/server/DynamicBrokerConfig.scala | 2 +-
.../server/DynamicBrokerReconfigurationTest.scala | 36 +++++++++++++++++++++-
2 files changed, 36 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 5aaec89..e42c894 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -540,7 +540,7 @@ class DynamicLogConfig(logManager: LogManager) extends
Reconfigurable with Loggi
props ++= newBrokerDefaults.asScala
props ++=
log.config.originals.asScala.filterKeys(log.config.overriddenConfigs.contains)
- val logConfig = LogConfig(props.asJava)
+ val logConfig = LogConfig(props.asJava, log.config.overriddenConfigs)
log.updateConfig(newBrokerDefaults.asScala.keySet, logConfig)
}
}
diff --git
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 5ef7cd2..8c3a657 100644
---
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -293,6 +293,40 @@ class DynamicBrokerReconfigurationTest extends
ZooKeeperTestHarness with SaslSet
}
@Test
+ def testConsecutiveConfigChange(): Unit = {
+ val topic2 = "testtopic2"
+ val topicProps = new Properties
+ topicProps.put(KafkaConfig.MinInSyncReplicasProp, "2")
+ TestUtils.createTopic(zkClient, topic2, 1, replicationFactor = numServers,
servers, topicProps)
+ var log = servers.head.logManager.getLog(new TopicPartition(topic2,
0)).getOrElse(throw new IllegalStateException("Log not found"))
+
assertTrue(log.config.overriddenConfigs.contains(KafkaConfig.MinInSyncReplicasProp))
+ assertEquals("2",
log.config.originals().get(KafkaConfig.MinInSyncReplicasProp).toString)
+
+ val props = new Properties
+ props.put(KafkaConfig.MinInSyncReplicasProp, "3")
+ // Make a broker-default config
+ reconfigureServers(props, perBrokerConfig = false,
(KafkaConfig.MinInSyncReplicasProp, "3"))
+ // Verify that all broker defaults have been updated again
+ servers.foreach { server =>
+ props.asScala.foreach { case (k, v) =>
+ assertEquals(s"Not reconfigured $k", v,
server.config.originals.get(k).toString)
+ }
+ }
+
+ log = servers.head.logManager.getLog(new TopicPartition(topic2,
0)).getOrElse(throw new IllegalStateException("Log not found"))
+
assertTrue(log.config.overriddenConfigs.contains(KafkaConfig.MinInSyncReplicasProp))
+ assertEquals("2",
log.config.originals().get(KafkaConfig.MinInSyncReplicasProp).toString) //
Verify topic-level config survives
+
+ // Make a second broker-default change
+ props.clear()
+ props.put(KafkaConfig.LogRetentionTimeMillisProp, "604800000")
+ reconfigureServers(props, perBrokerConfig = false,
(KafkaConfig.LogRetentionTimeMillisProp, "604800000"))
+ log = servers.head.logManager.getLog(new TopicPartition(topic2,
0)).getOrElse(throw new IllegalStateException("Log not found"))
+
assertTrue(log.config.overriddenConfigs.contains(KafkaConfig.MinInSyncReplicasProp))
+ assertEquals("2",
log.config.originals().get(KafkaConfig.MinInSyncReplicasProp).toString) //
Verify topic-level config still survives
+ }
+
+ @Test
def testDefaultTopicConfig(): Unit = {
val (producerThread, consumerThread) = startProduceConsume(retries = 0)
@@ -498,7 +532,7 @@ class DynamicBrokerReconfigurationTest extends
ZooKeeperTestHarness with SaslSet
val partitions = (0 until numPartitions).map(i => new
TopicPartition(topic, i)).filter { tp =>
zkClient.getLeaderForPartition(tp) == Some(leaderId)
}
- assertTrue(s"Partitons not found with leader $leaderId",
partitions.nonEmpty)
+ assertTrue(s"Partitions not found with leader $leaderId",
partitions.nonEmpty)
partitions.foreach { tp =>
(1 to 2).foreach { i =>
val replicaFetcherManager =
servers(i).replicaManager.replicaFetcherManager