[ 
https://issues.apache.org/jira/browse/KAFKA-6244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16341720#comment-16341720
 ] 

ASF GitHub Bot commented on KAFKA-6244:
---------------------------------------

hachikuji closed pull request #4465: KAFKA-6244: Dynamic update of log cleaner 
configuration
URL: https://github.com/apache/kafka/pull/4465
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala 
b/core/src/main/scala/kafka/log/LogCleaner.scala
index 637e24cb01d..e013cfbdc92 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -20,13 +20,14 @@ package kafka.log
 import java.io.{File, IOException}
 import java.nio._
 import java.nio.file.Files
+import java.util
 import java.util.Date
 import java.util.concurrent.TimeUnit
 
 import com.yammer.metrics.core.Gauge
 import kafka.common._
 import kafka.metrics.KafkaMetricsGroup
-import kafka.server.LogDirFailureChannel
+import kafka.server.{BrokerReconfigurable, KafkaConfig, LogDirFailureChannel}
 import kafka.utils._
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Time
@@ -35,7 +36,7 @@ import org.apache.kafka.common.errors.KafkaStorageException
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention
 
-import scala.collection.mutable
+import scala.collection.{Set, mutable}
 import scala.collection.JavaConverters._
 
 /**
@@ -83,16 +84,20 @@ import scala.collection.JavaConverters._
  *    data from the transaction prior to reaching the offset of the marker. 
This follows the same logic used for
  *    tombstone deletion.
  *
- * @param config Configuration parameters for the cleaner
+ * @param initialConfig Initial configuration parameters for the cleaner. 
Actual config may be dynamically updated.
  * @param logDirs The directories where offset checkpoints reside
  * @param logs The pool of logs
  * @param time A way to control the passage of time
  */
-class LogCleaner(val config: CleanerConfig,
+class LogCleaner(initialConfig: CleanerConfig,
                  val logDirs: Seq[File],
                  val logs: Pool[TopicPartition, Log],
                  val logDirFailureChannel: LogDirFailureChannel,
-                 time: Time = Time.SYSTEM) extends Logging with 
KafkaMetricsGroup {
+                 time: Time = Time.SYSTEM) extends Logging with 
KafkaMetricsGroup with BrokerReconfigurable
+{
+
+  /* Log cleaner configuration which may be dynamically updated */
+  @volatile private var config = initialConfig
 
   /* for managing the state of partitions being cleaned. package-private to 
allow access in tests */
   private[log] val cleanerManager = new LogCleanerManager(logDirs, logs, 
logDirFailureChannel)
@@ -106,7 +111,7 @@ class LogCleaner(val config: CleanerConfig,
                                         time = time)
 
   /* the threads */
-  private val cleaners = (0 until config.numThreads).map(new CleanerThread(_))
+  private val cleaners = mutable.ArrayBuffer[CleanerThread]()
 
   /* a metric to track the maximum utilization of any thread's buffer in the 
last cleaning */
   newGauge("max-buffer-utilization-percent",
@@ -133,7 +138,11 @@ class LogCleaner(val config: CleanerConfig,
    */
   def startup() {
     info("Starting the log cleaner")
-    cleaners.foreach(_.start())
+    (0 until config.numThreads).foreach { i =>
+      val cleaner = new CleanerThread(i)
+      cleaners += cleaner
+      cleaner.start()
+    }
   }
 
   /**
@@ -142,6 +151,27 @@ class LogCleaner(val config: CleanerConfig,
   def shutdown() {
     info("Shutting down the log cleaner.")
     cleaners.foreach(_.shutdown())
+    cleaners.clear()
+  }
+
+  override def reconfigurableConfigs(): Set[String] = {
+    LogCleaner.ReconfigurableConfigs
+  }
+
+  override def validateReconfiguration(newConfig: KafkaConfig): Boolean = {
+    val newCleanerConfig = LogCleaner.cleanerConfig(newConfig)
+    val numThreads = newCleanerConfig.numThreads
+    numThreads >= 1 && numThreads >= config.numThreads / 2 && numThreads <= 
config.numThreads * 2
+  }
+
+  /**
+    * Reconfigure log clean config. This simply stops current log cleaners and 
creates new ones.
+    * That ensures that if any of the cleaners had failed, new cleaners are 
created to match the new config.
+    */
+  override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): 
Unit = {
+    config = LogCleaner.cleanerConfig(newConfig)
+    shutdown()
+    startup()
   }
 
   /**
@@ -210,6 +240,12 @@ class LogCleaner(val config: CleanerConfig,
     isCleaned
   }
 
+  // Only for testing
+  private[kafka] def currentConfig: CleanerConfig = config
+
+  // Only for testing
+  private[log] def cleanerCount: Int = cleaners.size
+
   /**
    * The cleaner threads do the actual log cleaning. Each thread processes 
does its cleaning repeatedly by
    * choosing the dirtiest log, cleaning it, and then swapping in the cleaned 
segments.
@@ -317,6 +353,30 @@ class LogCleaner(val config: CleanerConfig,
   }
 }
 
+object LogCleaner {
+  val ReconfigurableConfigs = Set(
+    KafkaConfig.LogCleanerThreadsProp,
+    KafkaConfig.LogCleanerDedupeBufferSizeProp,
+    KafkaConfig.LogCleanerDedupeBufferLoadFactorProp,
+    KafkaConfig.LogCleanerIoBufferSizeProp,
+    KafkaConfig.MessageMaxBytesProp,
+    KafkaConfig.LogCleanerIoMaxBytesPerSecondProp,
+    KafkaConfig.LogCleanerBackoffMsProp
+  )
+
+  def cleanerConfig(config: KafkaConfig): CleanerConfig = {
+    CleanerConfig(numThreads = config.logCleanerThreads,
+      dedupeBufferSize = config.logCleanerDedupeBufferSize,
+      dedupeBufferLoadFactor = config.logCleanerDedupeBufferLoadFactor,
+      ioBufferSize = config.logCleanerIoBufferSize,
+      maxMessageSize = config.messageMaxBytes,
+      maxIoBytesPerSecond = config.logCleanerIoMaxBytesPerSecond,
+      backOffMs = config.logCleanerBackoffMs,
+      enableCleaner = config.logCleanerEnable)
+
+  }
+}
+
 /**
  * This class holds the actual logic for cleaning a log
  * @param id An identifier used for logging
diff --git a/core/src/main/scala/kafka/log/LogManager.scala 
b/core/src/main/scala/kafka/log/LogManager.scala
index a7d106fb3ea..37a0be84fc1 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -893,18 +893,11 @@ object LogManager {
     val defaultProps = KafkaServer.copyKafkaConfigToLog(config)
     val defaultLogConfig = LogConfig(defaultProps)
 
+    // read the log configurations from zookeeper
     val (topicConfigs, failed) = 
zkClient.getLogConfigs(zkClient.getAllTopicsInCluster, defaultProps)
     if (!failed.isEmpty) throw failed.head._2
 
-    // read the log configurations from zookeeper
-    val cleanerConfig = CleanerConfig(numThreads = config.logCleanerThreads,
-      dedupeBufferSize = config.logCleanerDedupeBufferSize,
-      dedupeBufferLoadFactor = config.logCleanerDedupeBufferLoadFactor,
-      ioBufferSize = config.logCleanerIoBufferSize,
-      maxMessageSize = config.messageMaxBytes,
-      maxIoBytesPerSecond = config.logCleanerIoMaxBytesPerSecond,
-      backOffMs = config.logCleanerBackoffMs,
-      enableCleaner = config.logCleanerEnable)
+    val cleanerConfig = LogCleaner.cleanerConfig(config)
 
     new LogManager(logDirs = config.logDirs.map(new File(_).getAbsoluteFile),
       initialOfflineDirs = initialOfflineDirs.map(new File(_).getAbsoluteFile),
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala 
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index f307b8dddca..2c186d38fa0 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -22,6 +22,7 @@ import java.util
 import java.util.Properties
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
+import kafka.log.LogCleaner
 import kafka.server.DynamicBrokerConfig._
 import kafka.utils.{CoreUtils, Logging}
 import kafka.zk.{AdminZkClient, KafkaZkClient}
@@ -77,6 +78,7 @@ object DynamicBrokerConfig {
 
   val AllDynamicConfigs = mutable.Set[String]()
   AllDynamicConfigs ++= DynamicSecurityConfigs
+  AllDynamicConfigs ++= LogCleaner.ReconfigurableConfigs
 
   private val PerBrokerConfigs = DynamicSecurityConfigs
 
@@ -115,6 +117,7 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
   private val dynamicDefaultConfigs = mutable.Map[String, String]()
   private val brokerId = kafkaConfig.brokerId
   private val reconfigurables = mutable.Buffer[Reconfigurable]()
+  private val brokerReconfigurables = mutable.Buffer[BrokerReconfigurable]()
   private val lock = new ReentrantReadWriteLock
   private var currentConfig = kafkaConfig
 
@@ -124,11 +127,21 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
     updateBrokerConfig(brokerId, 
adminZkClient.fetchEntityConfig(ConfigType.Broker, brokerId.toString))
   }
 
+  def addReconfigurables(kafkaServer: KafkaServer): Unit = {
+    if (kafkaServer.logManager.cleaner != null)
+      addBrokerReconfigurable(kafkaServer.logManager.cleaner)
+  }
+
   def addReconfigurable(reconfigurable: Reconfigurable): Unit = 
CoreUtils.inWriteLock(lock) {
     
require(reconfigurable.reconfigurableConfigs.asScala.forall(AllDynamicConfigs.contains))
     reconfigurables += reconfigurable
   }
 
+  def addBrokerReconfigurable(reconfigurable: BrokerReconfigurable): Unit = 
CoreUtils.inWriteLock(lock) {
+    
require(reconfigurable.reconfigurableConfigs.forall(AllDynamicConfigs.contains))
+    brokerReconfigurables += reconfigurable
+  }
+
   def removeReconfigurable(reconfigurable: Reconfigurable): Unit = 
CoreUtils.inWriteLock(lock) {
     reconfigurables -= reconfigurable
   }
@@ -327,9 +340,15 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
             val oldValues = 
currentConfig.valuesWithPrefixOverride(listenerName.configPrefix)
             val newValues = 
newConfig.valuesFromThisConfigWithPrefixOverride(listenerName.configPrefix)
             val updatedKeys = updatedConfigs(newValues, oldValues).keySet
-            processReconfigurable(listenerReconfigurable, updatedKeys, 
newValues, customConfigs, validateOnly)
+            if 
(needsReconfiguration(listenerReconfigurable.reconfigurableConfigs, 
updatedKeys))
+              processReconfigurable(listenerReconfigurable, newValues, 
customConfigs, validateOnly)
           case reconfigurable =>
-            processReconfigurable(reconfigurable, updatedMap.keySet, 
newConfig.valuesFromThisConfig, customConfigs, validateOnly)
+            if (needsReconfiguration(reconfigurable.reconfigurableConfigs, 
updatedMap.keySet))
+              processReconfigurable(reconfigurable, 
newConfig.valuesFromThisConfig, customConfigs, validateOnly)
+        }
+        brokerReconfigurables.foreach { reconfigurable =>
+          if 
(needsReconfiguration(reconfigurable.reconfigurableConfigs.asJava, 
updatedMap.keySet))
+            processBrokerReconfigurable(reconfigurable, currentConfig, 
newConfig, validateOnly)
         }
         newConfig
       } catch {
@@ -343,18 +362,41 @@ class DynamicBrokerConfig(private val kafkaConfig: 
KafkaConfig) extends Logging
       currentConfig
   }
 
-  private def processReconfigurable(reconfigurable: Reconfigurable, 
updatedKeys: Set[String],
-                                    allNewConfigs: util.Map[String, _], 
newCustomConfigs: util.Map[String, Object],
+  private def needsReconfiguration(reconfigurableConfigs: util.Set[String], 
updatedKeys: Set[String]): Boolean = {
+    reconfigurableConfigs.asScala.intersect(updatedKeys).nonEmpty
+  }
+
+  private def processReconfigurable(reconfigurable: Reconfigurable,
+                                    allNewConfigs: util.Map[String, _],
+                                    newCustomConfigs: util.Map[String, Object],
                                     validateOnly: Boolean): Unit = {
-    if 
(reconfigurable.reconfigurableConfigs.asScala.intersect(updatedKeys).nonEmpty) {
-      val newConfigs = new util.HashMap[String, Object]
-      allNewConfigs.asScala.foreach { case (k, v) => newConfigs.put(k, 
v.asInstanceOf[AnyRef]) }
-      newConfigs.putAll(newCustomConfigs)
-      if (validateOnly) {
-        if (!reconfigurable.validateReconfiguration(newConfigs))
-          throw new ConfigException("Validation of dynamic config update 
failed")
-      } else
-        reconfigurable.reconfigure(newConfigs)
-    }
+    val newConfigs = new util.HashMap[String, Object]
+    allNewConfigs.asScala.foreach { case (k, v) => newConfigs.put(k, 
v.asInstanceOf[AnyRef]) }
+    newConfigs.putAll(newCustomConfigs)
+    if (validateOnly) {
+      if (!reconfigurable.validateReconfiguration(newConfigs))
+        throw new ConfigException("Validation of dynamic config update failed")
+    } else
+      reconfigurable.reconfigure(newConfigs)
   }
+
+  private def processBrokerReconfigurable(reconfigurable: BrokerReconfigurable,
+                                          oldConfig: KafkaConfig,
+                                          newConfig: KafkaConfig,
+                                          validateOnly: Boolean): Unit = {
+    if (validateOnly) {
+      if (!reconfigurable.validateReconfiguration(newConfig))
+        throw new ConfigException("Validation of dynamic config update failed")
+    } else
+      reconfigurable.reconfigure(oldConfig, newConfig)
+  }
+}
+
+trait BrokerReconfigurable {
+
+  def reconfigurableConfigs: Set[String]
+
+  def validateReconfiguration(newConfig: KafkaConfig): Boolean
+
+  def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit
 }
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 80b0eb73d0a..c4123f19749 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -290,6 +290,9 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
Time.SYSTEM, threadNameP
 
         Mx4jLoader.maybeLoad()
 
+        /* Add all reconfigurables for config change notification before 
starting config handlers */
+        config.dynamicConfig.addReconfigurables(this)
+
         /* start dynamic config manager */
         dynamicConfigHandlers = Map[String, ConfigHandler](ConfigType.Topic -> 
new TopicConfigHandler(logManager, config, quotaManagers),
                                                            ConfigType.Client 
-> new ClientIdConfigHandler(quotaManagers),
diff --git 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index a760d7d1d06..c6f023ff8d0 100644
--- 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -242,6 +242,46 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
     stopAndVerifyProduceConsume(producerThread, consumerThread, 
mayFailRequests = false)
   }
 
+  @Test
+  def testLogCleanerConfig(): Unit = {
+    val (producerThread, consumerThread) = startProduceConsume(0)
+
+    verifyThreads("kafka-log-cleaner-thread-", countPerBroker = 1)
+
+    val props = new Properties
+    props.put(KafkaConfig.LogCleanerThreadsProp, "2")
+    props.put(KafkaConfig.LogCleanerDedupeBufferSizeProp, "20000000")
+    props.put(KafkaConfig.LogCleanerDedupeBufferLoadFactorProp, "0.8")
+    props.put(KafkaConfig.LogCleanerIoBufferSizeProp, "300000")
+    props.put(KafkaConfig.MessageMaxBytesProp, "40000")
+    props.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, "50000000")
+    props.put(KafkaConfig.LogCleanerBackoffMsProp, "6000")
+    reconfigureServers(props, perBrokerConfig = false, 
(KafkaConfig.LogCleanerThreadsProp, "2"))
+
+    // Verify cleaner config was updated
+    val newCleanerConfig = servers.head.logManager.cleaner.currentConfig
+    assertEquals(2, newCleanerConfig.numThreads)
+    assertEquals(20000000, newCleanerConfig.dedupeBufferSize)
+    assertEquals(0.8, newCleanerConfig.dedupeBufferLoadFactor, 0.001)
+    assertEquals(300000, newCleanerConfig.ioBufferSize)
+    assertEquals(40000, newCleanerConfig.maxMessageSize)
+    assertEquals(50000000, newCleanerConfig.maxIoBytesPerSecond, 50000000)
+    assertEquals(6000, newCleanerConfig.backOffMs)
+
+    // Verify thread count
+    verifyThreads("kafka-log-cleaner-thread-", countPerBroker = 2)
+
+    // Stop a couple of threads and verify they are recreated if any config is 
updated
+    def cleanerThreads = 
Thread.getAllStackTraces.keySet.asScala.filter(_.getName.startsWith("kafka-log-cleaner-thread-"))
+    cleanerThreads.take(2).foreach(_.interrupt())
+    TestUtils.waitUntilTrue(() => cleanerThreads.size == (2 * numServers) - 2, 
"Threads did not exit")
+    props.put(KafkaConfig.LogCleanerBackoffMsProp, "8000")
+    reconfigureServers(props, perBrokerConfig = false, 
(KafkaConfig.LogCleanerBackoffMsProp, "8000"))
+    verifyThreads("kafka-log-cleaner-thread-", countPerBroker = 2)
+
+    stopAndVerifyProduceConsume(producerThread, consumerThread, 
mayFailRequests = false)
+  }
+
   private def createProducer(trustStore: File, retries: Int,
                              clientId: String = "test-producer"): 
KafkaProducer[String, String] = {
     val bootstrapServers = TestUtils.bootstrapServers(servers, new 
ListenerName(SecureExternal))
@@ -411,6 +451,19 @@ class DynamicBrokerReconfigurationTest extends 
ZooKeeperTestHarness with SaslSet
     props
   }
 
+  private def currentThreads: List[String] = {
+    Thread.getAllStackTraces.keySet.asScala.toList.map(_.getName)
+  }
+
+  private def verifyThreads(threadPrefix: String, countPerBroker: Int): Unit = 
{
+    val expectedCount = countPerBroker * servers.size
+    val (threads, resized) = 
TestUtils.computeUntilTrue(currentThreads.filter(_.startsWith(threadPrefix))) {
+      _.size == expectedCount
+    }
+    assertTrue(s"Invalid threads: expected $expectedCount, got 
${threads.size}: $threads", resized)
+  }
+
+
   private def startProduceConsume(retries: Int): (ProducerThread, 
ConsumerThread) = {
     val producerThread = new ProducerThread(retries)
     clientThreads += producerThread
diff --git 
a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala 
b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
index bff27006e28..0ad5b46de2c 100644
--- a/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/AbstractLogCleanerIntegrationTest.scala
@@ -79,6 +79,7 @@ abstract class AbstractLogCleanerIntegrationTest {
                   compactionLag: Long = defaultCompactionLag,
                   deleteDelay: Int = defaultDeleteDelay,
                   segmentSize: Int = defaultSegmentSize,
+                  cleanerIoBufferSize: Option[Int] = None,
                   propertyOverrides: Properties = new Properties()): 
LogCleaner = {
 
     val logMap = new Pool[TopicPartition, Log]()
@@ -108,7 +109,7 @@ abstract class AbstractLogCleanerIntegrationTest {
 
     val cleanerConfig = CleanerConfig(
       numThreads = numThreads,
-      ioBufferSize = maxMessageSize / 2,
+      ioBufferSize = cleanerIoBufferSize.getOrElse(maxMessageSize / 2),
       maxMessageSize = maxMessageSize,
       backOffMs = backOffMs)
     new LogCleaner(cleanerConfig,
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 
b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
index b20622f99ab..22d7e77fa8d 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
@@ -18,10 +18,12 @@
 package kafka.log
 
 import java.io.File
+import java.util
 import java.util.Properties
 
 import kafka.api.KAFKA_0_11_0_IV0
 import kafka.api.{KAFKA_0_10_0_IV1, KAFKA_0_9_0}
+import kafka.server.KafkaConfig
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
@@ -227,6 +229,56 @@ class LogCleanerIntegrationTest(compressionCodec: String) 
extends AbstractLogCle
     checkLogAfterAppendingDups(log, startSize, appends)
   }
 
+  @Test
+  def cleanerConfigUpdateTest() {
+    val largeMessageKey = 20
+    val (largeMessageValue, largeMessageSet) = 
createLargeSingleMessageSet(largeMessageKey, RecordBatch.CURRENT_MAGIC_VALUE)
+    val maxMessageSize = largeMessageSet.sizeInBytes
+
+    cleaner = makeCleaner(partitions = topicPartitions, backOffMs = 1, 
maxMessageSize = maxMessageSize,
+      cleanerIoBufferSize = Some(1))
+    val log = cleaner.logs.get(topicPartitions(0))
+
+    val appends = writeDups(numKeys = 100, numDups = 3, log = log, codec = 
codec)
+    val startSize = log.size
+    cleaner.startup()
+    assertEquals(1, cleaner.cleanerCount)
+
+    // Verify no cleaning with LogCleanerIoBufferSizeProp=1
+    val firstDirty = log.activeSegment.baseOffset
+    val topicPartition = new TopicPartition("log", 0)
+    cleaner.awaitCleaned(topicPartition, firstDirty, maxWaitMs = 10)
+    assertTrue("Should not have cleaned", 
cleaner.cleanerManager.allCleanerCheckpoints.isEmpty)
+
+    def kafkaConfigWithCleanerConfig(cleanerConfig: CleanerConfig): 
KafkaConfig = {
+      val props = TestUtils.createBrokerConfig(0, "localhost:2181")
+      props.put(KafkaConfig.LogCleanerThreadsProp, 
cleanerConfig.numThreads.toString)
+      props.put(KafkaConfig.LogCleanerDedupeBufferSizeProp, 
cleanerConfig.dedupeBufferSize.toString)
+      props.put(KafkaConfig.LogCleanerDedupeBufferLoadFactorProp, 
cleanerConfig.dedupeBufferLoadFactor.toString)
+      props.put(KafkaConfig.LogCleanerIoBufferSizeProp, 
cleanerConfig.ioBufferSize.toString)
+      props.put(KafkaConfig.MessageMaxBytesProp, 
cleanerConfig.maxMessageSize.toString)
+      props.put(KafkaConfig.LogCleanerBackoffMsProp, 
cleanerConfig.backOffMs.toString)
+      props.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 
cleanerConfig.maxIoBytesPerSecond.toString)
+      KafkaConfig.fromProps(props)
+    }
+
+    // Verify cleaning done with larger LogCleanerIoBufferSizeProp
+    val oldConfig = kafkaConfigWithCleanerConfig(cleaner.currentConfig)
+    val newConfig = kafkaConfigWithCleanerConfig(CleanerConfig(numThreads = 2,
+      dedupeBufferSize = cleaner.currentConfig.dedupeBufferSize,
+      dedupeBufferLoadFactor = cleaner.currentConfig.dedupeBufferLoadFactor,
+      ioBufferSize = 100000,
+      maxMessageSize = cleaner.currentConfig.maxMessageSize,
+      maxIoBytesPerSecond = cleaner.currentConfig.maxIoBytesPerSecond,
+      backOffMs = cleaner.currentConfig.backOffMs))
+    cleaner.reconfigure(oldConfig, newConfig)
+
+    assertEquals(2, cleaner.cleanerCount)
+    checkLastCleaned("log", 0, firstDirty)
+    val compactedSize = log.logSegments.map(_.size).sum
+    assertTrue(s"log should have been compacted: startSize=$startSize 
compactedSize=$compactedSize", startSize > compactedSize)
+  }
+
   private def checkLastCleaned(topic: String, partitionId: Int, firstDirty: 
Long) {
     // wait until cleaning up to base_offset, note that cleaning happens only 
when "log dirty ratio" is higher than
     // LogConfig.MinCleanableDirtyRatioProp
diff --git 
a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 20320116b23..6dedbe00d7a 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -104,23 +104,24 @@ class DynamicBrokerConfigTest {
     verifyConfigUpdateWithInvalidConfig(validProps, 
securityPropsWithoutListenerPrefix)
     val nonDynamicProps = Map(KafkaConfig.ZkConnectProp -> "somehost:2181")
     verifyConfigUpdateWithInvalidConfig(validProps, nonDynamicProps)
+
+    val invalidProps = Map(KafkaConfig.LogCleanerThreadsProp -> "invalid")
+    verifyConfigUpdateWithInvalidConfig(validProps, invalidProps)
   }
 
   @Test
   def testSecurityConfigs(): Unit = {
-    def verifyUpdate(name: String, value: Object, invalidValue: Boolean): Unit 
= {
+    def verifyUpdate(name: String, value: Object): Unit = {
       verifyConfigUpdate(name, value, perBrokerConfig = true, expectFailure = 
true)
-      verifyConfigUpdate(s"listener.name.external.$name", value, 
perBrokerConfig = true, expectFailure = invalidValue)
+      verifyConfigUpdate(s"listener.name.external.$name", value, 
perBrokerConfig = true, expectFailure = false)
       verifyConfigUpdate(name, value, perBrokerConfig = false, expectFailure = 
true)
       verifyConfigUpdate(s"listener.name.external.$name", value, 
perBrokerConfig = false, expectFailure = true)
     }
 
-    verifyUpdate(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "ks.jks", 
invalidValue = false)
-    verifyUpdate(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS", invalidValue = 
false)
-    verifyUpdate(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "password", 
invalidValue = false)
-    verifyUpdate(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "password", invalidValue 
= false)
-    verifyUpdate(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, 
1.asInstanceOf[Integer], invalidValue = true)
-    verifyUpdate(SslConfigs.SSL_KEY_PASSWORD_CONFIG, 1.asInstanceOf[Integer], 
invalidValue = true)
+    verifyUpdate(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "ks.jks")
+    verifyUpdate(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS")
+    verifyUpdate(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "password")
+    verifyUpdate(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "password")
   }
 
   private def verifyConfigUpdate(name: String, value: Object, perBrokerConfig: 
Boolean, expectFailure: Boolean) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Enable dynamic reconfiguration of log cleaners
> ----------------------------------------------
>
>                 Key: KAFKA-6244
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6244
>             Project: Kafka
>          Issue Type: Sub-task
>          Components: core
>            Reporter: Rajini Sivaram
>            Assignee: Rajini Sivaram
>            Priority: Major
>             Fix For: 1.1.0
>
>
> See 
> [KIP-226|https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration]
>  for details.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to