[ https://issues.apache.org/jira/browse/KAFKA-6244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341720#comment-16341720 ]
ASF GitHub Bot commented on KAFKA-6244: --------------------------------------- hachikuji closed pull request #4465: KAFKA-6244: Dynamic update of log cleaner configuration URL: https://github.com/apache/kafka/pull/4465 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 637e24cb01d..e013cfbdc92 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -20,13 +20,14 @@ package kafka.log import java.io.{File, IOException} import java.nio._ import java.nio.file.Files +import java.util import java.util.Date import java.util.concurrent.TimeUnit import com.yammer.metrics.core.Gauge import kafka.common._ import kafka.metrics.KafkaMetricsGroup -import kafka.server.LogDirFailureChannel +import kafka.server.{BrokerReconfigurable, KafkaConfig, LogDirFailureChannel} import kafka.utils._ import org.apache.kafka.common.record._ import org.apache.kafka.common.utils.Time @@ -35,7 +36,7 @@ import org.apache.kafka.common.errors.KafkaStorageException import org.apache.kafka.common.record.MemoryRecords.RecordFilter import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention -import scala.collection.mutable +import scala.collection.{Set, mutable} import scala.collection.JavaConverters._ /** @@ -83,16 +84,20 @@ import scala.collection.JavaConverters._ * data from the transaction prior to reaching the offset of the marker. This follows the same logic used for * tombstone deletion. * - * @param config Configuration parameters for the cleaner + * @param initialConfig Initial configuration parameters for the cleaner. Actual config may be dynamically updated. * @param logDirs The directories where offset checkpoints reside * @param logs The pool of logs * @param time A way to control the passage of time */ -class LogCleaner(val config: CleanerConfig, +class LogCleaner(initialConfig: CleanerConfig, val logDirs: Seq[File], val logs: Pool[TopicPartition, Log], val logDirFailureChannel: LogDirFailureChannel, - time: Time = Time.SYSTEM) extends Logging with KafkaMetricsGroup { + time: Time = Time.SYSTEM) extends Logging with KafkaMetricsGroup with BrokerReconfigurable +{ + + /* Log cleaner configuration which may be dynamically updated */ + @volatile private var config = initialConfig /* for managing the state of partitions being cleaned. package-private to allow access in tests */ private[log] val cleanerManager = new LogCleanerManager(logDirs, logs, logDirFailureChannel) @@ -106,7 +111,7 @@ class LogCleaner(val config: CleanerConfig, time = time) /* the threads */ - private val cleaners = (0 until config.numThreads).map(new CleanerThread(_)) + private val cleaners = mutable.ArrayBuffer[CleanerThread]() /* a metric to track the maximum utilization of any thread's buffer in the last cleaning */ newGauge("max-buffer-utilization-percent", @@ -133,7 +138,11 @@ class LogCleaner(val config: CleanerConfig, */ def startup() { info("Starting the log cleaner") - cleaners.foreach(_.start()) + (0 until config.numThreads).foreach { i => + val cleaner = new CleanerThread(i) + cleaners += cleaner + cleaner.start() + } } /** @@ -142,6 +151,27 @@ class LogCleaner(val config: CleanerConfig, def shutdown() { info("Shutting down the log cleaner.") cleaners.foreach(_.shutdown()) + cleaners.clear() + } + + override def reconfigurableConfigs(): Set[String] = { + LogCleaner.ReconfigurableConfigs + } + + override def validateReconfiguration(newConfig: KafkaConfig): Boolean = { + val newCleanerConfig = LogCleaner.cleanerConfig(newConfig) + val numThreads = newCleanerConfig.numThreads + numThreads >= 1 && numThreads >= config.numThreads / 2 && numThreads <= config.numThreads * 2 + } + + /** + * Reconfigure log clean config. This simply stops current log cleaners and creates new ones. + * That ensures that if any of the cleaners had failed, new cleaners are created to match the new config. + */ + override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = { + config = LogCleaner.cleanerConfig(newConfig) + shutdown() + startup() } /** @@ -210,6 +240,12 @@ class LogCleaner(val config: CleanerConfig, isCleaned } + // Only for testing + private[kafka] def currentConfig: CleanerConfig = config + + // Only for testing + private[log] def cleanerCount: Int = cleaners.size + /** * The cleaner threads do the actual log cleaning. Each thread processes does its cleaning repeatedly by * choosing the dirtiest log, cleaning it, and then swapping in the cleaned segments. @@ -317,6 +353,30 @@ class LogCleaner(val config: CleanerConfig, } } +object LogCleaner { + val ReconfigurableConfigs = Set( + KafkaConfig.LogCleanerThreadsProp, + KafkaConfig.LogCleanerDedupeBufferSizeProp, + KafkaConfig.LogCleanerDedupeBufferLoadFactorProp, + KafkaConfig.LogCleanerIoBufferSizeProp, + KafkaConfig.MessageMaxBytesProp, + KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, + KafkaConfig.LogCleanerBackoffMsProp + ) + + def cleanerConfig(config: KafkaConfig): CleanerConfig = { + CleanerConfig(numThreads = config.logCleanerThreads, + dedupeBufferSize = config.logCleanerDedupeBufferSize, + dedupeBufferLoadFactor = config.logCleanerDedupeBufferLoadFactor, + ioBufferSize = config.logCleanerIoBufferSize, + maxMessageSize = config.messageMaxBytes, + maxIoBytesPerSecond = config.logCleanerIoMaxBytesPerSecond, + backOffMs = config.logCleanerBackoffMs, + enableCleaner = config.logCleanerEnable) + + } +} + /** * This class holds the actual logic for cleaning a log * @param id An identifier used for logging diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index a7d106fb3ea..37a0be84fc1 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -893,18 +893,11 @@ object LogManager { val defaultProps = KafkaServer.copyKafkaConfigToLog(config) val defaultLogConfig = LogConfig(defaultProps) + // read the log configurations from zookeeper val (topicConfigs, failed) = zkClient.getLogConfigs(zkClient.getAllTopicsInCluster, defaultProps) if (!failed.isEmpty) throw failed.head._2 - // read the log configurations from zookeeper - val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads, - dedupeBufferSize = config.logCleanerDedupeBufferSize, - dedupeBufferLoadFactor = config.logCleanerDedupeBufferLoadFactor, - ioBufferSize = config.logCleanerIoBufferSize, - maxMessageSize = config.messageMaxBytes, - maxIoBytesPerSecond = config.logCleanerIoMaxBytesPerSecond, - backOffMs = config.logCleanerBackoffMs, - enableCleaner = config.logCleanerEnable) + val cleanerConfig = LogCleaner.cleanerConfig(config) new LogManager(logDirs = config.logDirs.map(new File(_).getAbsoluteFile), initialOfflineDirs = initialOfflineDirs.map(new File(_).getAbsoluteFile), diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala index f307b8dddca..2c186d38fa0 100755 --- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala +++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala @@ -22,6 +22,7 @@ import java.util import java.util.Properties import java.util.concurrent.locks.ReentrantReadWriteLock +import kafka.log.LogCleaner import kafka.server.DynamicBrokerConfig._ import kafka.utils.{CoreUtils, Logging} import kafka.zk.{AdminZkClient, KafkaZkClient} @@ -77,6 +78,7 @@ object DynamicBrokerConfig { val AllDynamicConfigs = mutable.Set[String]() AllDynamicConfigs ++= DynamicSecurityConfigs + AllDynamicConfigs ++= LogCleaner.ReconfigurableConfigs private val PerBrokerConfigs = DynamicSecurityConfigs @@ -115,6 +117,7 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging private val dynamicDefaultConfigs = mutable.Map[String, String]() private val brokerId = kafkaConfig.brokerId private val reconfigurables = mutable.Buffer[Reconfigurable]() + private val brokerReconfigurables = mutable.Buffer[BrokerReconfigurable]() private val lock = new ReentrantReadWriteLock private var currentConfig = kafkaConfig @@ -124,11 +127,21 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging updateBrokerConfig(brokerId, adminZkClient.fetchEntityConfig(ConfigType.Broker, brokerId.toString)) } + def addReconfigurables(kafkaServer: KafkaServer): Unit = { + if (kafkaServer.logManager.cleaner != null) + addBrokerReconfigurable(kafkaServer.logManager.cleaner) + } + def addReconfigurable(reconfigurable: Reconfigurable): Unit = CoreUtils.inWriteLock(lock) { require(reconfigurable.reconfigurableConfigs.asScala.forall(AllDynamicConfigs.contains)) reconfigurables += reconfigurable } + def addBrokerReconfigurable(reconfigurable: BrokerReconfigurable): Unit = CoreUtils.inWriteLock(lock) { + require(reconfigurable.reconfigurableConfigs.forall(AllDynamicConfigs.contains)) + brokerReconfigurables += reconfigurable + } + def removeReconfigurable(reconfigurable: Reconfigurable): Unit = CoreUtils.inWriteLock(lock) { reconfigurables -= reconfigurable } @@ -327,9 +340,15 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging val oldValues = currentConfig.valuesWithPrefixOverride(listenerName.configPrefix) val newValues = newConfig.valuesFromThisConfigWithPrefixOverride(listenerName.configPrefix) val updatedKeys = updatedConfigs(newValues, oldValues).keySet - processReconfigurable(listenerReconfigurable, updatedKeys, newValues, customConfigs, validateOnly) + if (needsReconfiguration(listenerReconfigurable.reconfigurableConfigs, updatedKeys)) + processReconfigurable(listenerReconfigurable, newValues, customConfigs, validateOnly) case reconfigurable => - processReconfigurable(reconfigurable, updatedMap.keySet, newConfig.valuesFromThisConfig, customConfigs, validateOnly) + if (needsReconfiguration(reconfigurable.reconfigurableConfigs, updatedMap.keySet)) + processReconfigurable(reconfigurable, newConfig.valuesFromThisConfig, customConfigs, validateOnly) + } + brokerReconfigurables.foreach { reconfigurable => + if (needsReconfiguration(reconfigurable.reconfigurableConfigs.asJava, updatedMap.keySet)) + processBrokerReconfigurable(reconfigurable, currentConfig, newConfig, validateOnly) } newConfig } catch { @@ -343,18 +362,41 @@ class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging currentConfig } - private def processReconfigurable(reconfigurable: Reconfigurable, updatedKeys: Set[String], - allNewConfigs: util.Map[String, _], newCustomConfigs: util.Map[String, Object], + private def needsReconfiguration(reconfigurableConfigs: util.Set[String], updatedKeys: Set[String]): Boolean = { + reconfigurableConfigs.asScala.intersect(updatedKeys).nonEmpty + } + + private def processReconfigurable(reconfigurable: Reconfigurable, + allNewConfigs: util.Map[String, _], + newCustomConfigs: util.Map[String, Object], validateOnly: Boolean): Unit = { - if (reconfigurable.reconfigurableConfigs.asScala.intersect(updatedKeys).nonEmpty) { - val newConfigs = new util.HashMap[String, Object] - allNewConfigs.asScala.foreach { case (k, v) => newConfigs.put(k, v.asInstanceOf[AnyRef]) } - newConfigs.putAll(newCustomConfigs) - if (validateOnly) { - if (!reconfigurable.validateReconfiguration(newConfigs)) - throw new ConfigException("Validation of dynamic config update failed") - } else - reconfigurable.reconfigure(newConfigs) - } + val newConfigs = new util.HashMap[String, Object] + allNewConfigs.asScala.foreach { case (k, v) => newConfigs.put(k, v.asInstanceOf[AnyRef]) } + newConfigs.putAll(newCustomConfigs) + if (validateOnly) { + if (!reconfigurable.validateReconfiguration(newConfigs)) + throw new ConfigException("Validation of dynamic config update failed") + } else + reconfigurable.reconfigure(newConfigs) } + + private def processBrokerReconfigurable(reconfigurable: BrokerReconfigurable, + oldConfig: KafkaConfig, + newConfig: KafkaConfig, + validateOnly: Boolean): Unit = { + if (validateOnly) { + if (!reconfigurable.validateReconfiguration(newConfig)) + throw new ConfigException("Validation of dynamic config update failed") + } else + reconfigurable.reconfigure(oldConfig, newConfig) + } +} + +trait BrokerReconfigurable { + + def reconfigurableConfigs: Set[String] + + def validateReconfiguration(newConfig: KafkaConfig): Boolean + + def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit } diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 80b0eb73d0a..c4123f19749 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -290,6 +290,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP Mx4jLoader.maybeLoad() + /* Add all reconfigurables for config change notification before starting config handlers */ + config.dynamicConfig.addReconfigurables(this) + /* start dynamic config manager */ dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> new TopicConfigHandler(logManager, config, quotaManagers), ConfigType.Client -> new ClientIdConfigHandler(quotaManagers), diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala index a760d7d1d06..c6f023ff8d0 100644 --- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala +++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala @@ -242,6 +242,46 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet stopAndVerifyProduceConsume(producerThread, consumerThread, mayFailRequests = false) } + @Test + def testLogCleanerConfig(): Unit = { + val (producerThread, consumerThread) = startProduceConsume(0) + + verifyThreads("kafka-log-cleaner-thread-", countPerBroker = 1) + + val props = new Properties + props.put(KafkaConfig.LogCleanerThreadsProp, "2") + props.put(KafkaConfig.LogCleanerDedupeBufferSizeProp, "20000000") + props.put(KafkaConfig.LogCleanerDedupeBufferLoadFactorProp, "0.8") + props.put(KafkaConfig.LogCleanerIoBufferSizeProp, "300000") + props.put(KafkaConfig.MessageMaxBytesProp, "40000") + props.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, "50000000") + props.put(KafkaConfig.LogCleanerBackoffMsProp, "6000") + reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.LogCleanerThreadsProp, "2")) + + // Verify cleaner config was updated + val newCleanerConfig = servers.head.logManager.cleaner.currentConfig + assertEquals(2, newCleanerConfig.numThreads) + assertEquals(20000000, newCleanerConfig.dedupeBufferSize) + assertEquals(0.8, newCleanerConfig.dedupeBufferLoadFactor, 0.001) + assertEquals(300000, newCleanerConfig.ioBufferSize) + assertEquals(40000, newCleanerConfig.maxMessageSize) + assertEquals(50000000, newCleanerConfig.maxIoBytesPerSecond, 50000000) + assertEquals(6000, newCleanerConfig.backOffMs) + + // Verify thread count + verifyThreads("kafka-log-cleaner-thread-", countPerBroker = 2) + + // Stop a couple of threads and verify they are recreated if any config is updated + def cleanerThreads = Thread.getAllStackTraces.keySet.asScala.filter(_.getName.startsWith("kafka-log-cleaner-thread-")) + cleanerThreads.take(2).foreach(_.interrupt()) + TestUtils.waitUntilTrue(() => cleanerThreads.size == (2 * numServers) - 2, "Threads did not exit") + props.put(KafkaConfig.LogCleanerBackoffMsProp, "8000") + reconfigureServers(props, perBrokerConfig = false, (KafkaConfig.LogCleanerBackoffMsProp, "8000")) + verifyThreads("kafka-log-cleaner-thread-", countPerBroker = 2) + + stopAndVerifyProduceConsume(producerThread, consumerThread, mayFailRequests = false) + } + private def createProducer(trustStore: File, retries: Int, clientId: String = "test-producer"): KafkaProducer[String, String] = { val bootstrapServers = TestUtils.bootstrapServers(servers, new ListenerName(SecureExternal)) @@ -411,6 +451,19 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet props } + private def currentThreads: List[String] = { + Thread.getAllStackTraces.keySet.asScala.toList.map(_.getName) + } + + private def verifyThreads(threadPrefix: String, countPerBroker: Int): Unit = { + val expectedCount = countPerBroker * servers.size + val (threads, resized) = TestUtils.computeUntilTrue(currentThreads.filter(_.startsWith(threadPrefix))) { + _.size == expectedCount + } + assertTrue(s"Invalid threads: expected $expectedCount, got ${threads.size}: $threads", resized) + } + + private def startProduceConsume(retries: Int): (ProducerThread, ConsumerThread) = { val producerThread = new ProducerThread(retries) clientThreads += producerThread diff --git a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala index bff27006e28..0ad5b46de2c 100644 --- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala @@ -79,6 +79,7 @@ abstract class AbstractLogCleanerIntegrationTest { compactionLag: Long = defaultCompactionLag, deleteDelay: Int = defaultDeleteDelay, segmentSize: Int = defaultSegmentSize, + cleanerIoBufferSize: Option[Int] = None, propertyOverrides: Properties = new Properties()): LogCleaner = { val logMap = new Pool[TopicPartition, Log]() @@ -108,7 +109,7 @@ abstract class AbstractLogCleanerIntegrationTest { val cleanerConfig = CleanerConfig( numThreads = numThreads, - ioBufferSize = maxMessageSize / 2, + ioBufferSize = cleanerIoBufferSize.getOrElse(maxMessageSize / 2), maxMessageSize = maxMessageSize, backOffMs = backOffMs) new LogCleaner(cleanerConfig, diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala index b20622f99ab..22d7e77fa8d 100755 --- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala @@ -18,10 +18,12 @@ package kafka.log import java.io.File +import java.util import java.util.Properties import kafka.api.KAFKA_0_11_0_IV0 import kafka.api.{KAFKA_0_10_0_IV1, KAFKA_0_9_0} +import kafka.server.KafkaConfig import kafka.server.checkpoints.OffsetCheckpointFile import kafka.utils._ import org.apache.kafka.common.TopicPartition @@ -227,6 +229,56 @@ class LogCleanerIntegrationTest(compressionCodec: String) extends AbstractLogCle checkLogAfterAppendingDups(log, startSize, appends) } + @Test + def cleanerConfigUpdateTest() { + val largeMessageKey = 20 + val (largeMessageValue, largeMessageSet) = createLargeSingleMessageSet(largeMessageKey, RecordBatch.CURRENT_MAGIC_VALUE) + val maxMessageSize = largeMessageSet.sizeInBytes + + cleaner = makeCleaner(partitions = topicPartitions, backOffMs = 1, maxMessageSize = maxMessageSize, + cleanerIoBufferSize = Some(1)) + val log = cleaner.logs.get(topicPartitions(0)) + + val appends = writeDups(numKeys = 100, numDups = 3, log = log, codec = codec) + val startSize = log.size + cleaner.startup() + assertEquals(1, cleaner.cleanerCount) + + // Verify no cleaning with LogCleanerIoBufferSizeProp=1 + val firstDirty = log.activeSegment.baseOffset + val topicPartition = new TopicPartition("log", 0) + cleaner.awaitCleaned(topicPartition, firstDirty, maxWaitMs = 10) + assertTrue("Should not have cleaned", cleaner.cleanerManager.allCleanerCheckpoints.isEmpty) + + def kafkaConfigWithCleanerConfig(cleanerConfig: CleanerConfig): KafkaConfig = { + val props = TestUtils.createBrokerConfig(0, "localhost:2181") + props.put(KafkaConfig.LogCleanerThreadsProp, cleanerConfig.numThreads.toString) + props.put(KafkaConfig.LogCleanerDedupeBufferSizeProp, cleanerConfig.dedupeBufferSize.toString) + props.put(KafkaConfig.LogCleanerDedupeBufferLoadFactorProp, cleanerConfig.dedupeBufferLoadFactor.toString) + props.put(KafkaConfig.LogCleanerIoBufferSizeProp, cleanerConfig.ioBufferSize.toString) + props.put(KafkaConfig.MessageMaxBytesProp, cleanerConfig.maxMessageSize.toString) + props.put(KafkaConfig.LogCleanerBackoffMsProp, cleanerConfig.backOffMs.toString) + props.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, cleanerConfig.maxIoBytesPerSecond.toString) + KafkaConfig.fromProps(props) + } + + // Verify cleaning done with larger LogCleanerIoBufferSizeProp + val oldConfig = kafkaConfigWithCleanerConfig(cleaner.currentConfig) + val newConfig = kafkaConfigWithCleanerConfig(CleanerConfig(numThreads = 2, + dedupeBufferSize = cleaner.currentConfig.dedupeBufferSize, + dedupeBufferLoadFactor = cleaner.currentConfig.dedupeBufferLoadFactor, + ioBufferSize = 100000, + maxMessageSize = cleaner.currentConfig.maxMessageSize, + maxIoBytesPerSecond = cleaner.currentConfig.maxIoBytesPerSecond, + backOffMs = cleaner.currentConfig.backOffMs)) + cleaner.reconfigure(oldConfig, newConfig) + + assertEquals(2, cleaner.cleanerCount) + checkLastCleaned("log", 0, firstDirty) + val compactedSize = log.logSegments.map(_.size).sum + assertTrue(s"log should have been compacted: startSize=$startSize compactedSize=$compactedSize", startSize > compactedSize) + } + private def checkLastCleaned(topic: String, partitionId: Int, firstDirty: Long) { // wait until cleaning up to base_offset, note that cleaning happens only when "log dirty ratio" is higher than // LogConfig.MinCleanableDirtyRatioProp diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala index 20320116b23..6dedbe00d7a 100755 --- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala +++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala @@ -104,23 +104,24 @@ class DynamicBrokerConfigTest { verifyConfigUpdateWithInvalidConfig(validProps, securityPropsWithoutListenerPrefix) val nonDynamicProps = Map(KafkaConfig.ZkConnectProp -> "somehost:2181") verifyConfigUpdateWithInvalidConfig(validProps, nonDynamicProps) + + val invalidProps = Map(KafkaConfig.LogCleanerThreadsProp -> "invalid") + verifyConfigUpdateWithInvalidConfig(validProps, invalidProps) } @Test def testSecurityConfigs(): Unit = { - def verifyUpdate(name: String, value: Object, invalidValue: Boolean): Unit = { + def verifyUpdate(name: String, value: Object): Unit = { verifyConfigUpdate(name, value, perBrokerConfig = true, expectFailure = true) - verifyConfigUpdate(s"listener.name.external.$name", value, perBrokerConfig = true, expectFailure = invalidValue) + verifyConfigUpdate(s"listener.name.external.$name", value, perBrokerConfig = true, expectFailure = false) verifyConfigUpdate(name, value, perBrokerConfig = false, expectFailure = true) verifyConfigUpdate(s"listener.name.external.$name", value, perBrokerConfig = false, expectFailure = true) } - verifyUpdate(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "ks.jks", invalidValue = false) - verifyUpdate(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS", invalidValue = false) - verifyUpdate(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "password", invalidValue = false) - verifyUpdate(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "password", invalidValue = false) - verifyUpdate(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, 1.asInstanceOf[Integer], invalidValue = true) - verifyUpdate(SslConfigs.SSL_KEY_PASSWORD_CONFIG, 1.asInstanceOf[Integer], invalidValue = true) + verifyUpdate(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "ks.jks") + verifyUpdate(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS") + verifyUpdate(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "password") + verifyUpdate(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "password") } private def verifyConfigUpdate(name: String, value: Object, perBrokerConfig: Boolean, expectFailure: Boolean) { ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Enable dynamic reconfiguration of log cleaners > ---------------------------------------------- > > Key: KAFKA-6244 > URL: https://issues.apache.org/jira/browse/KAFKA-6244 > Project: Kafka > Issue Type: Sub-task > Components: core > Reporter: Rajini Sivaram > Assignee: Rajini Sivaram > Priority: Major > Fix For: 1.1.0 > > > See > [KIP-226|https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration] > for details. -- This message was sent by Atlassian JIRA (v7.6.3#76005)