showuon commented on code in PR #12296: URL: https://github.com/apache/kafka/pull/12296#discussion_r913815025
##########
core/src/main/scala/kafka/log/LogCleaner.scala:
##########
@@ -186,11 +186,18 @@ class LogCleaner(initialConfig: CleanerConfig,
   }
   /**
-   * Reconfigure log clean config. This simply stops current log cleaners and creates new ones.
+   * Reconfigure log clean config. This updates desiredRatePerSec in Throttler with logCleanerIoMaxBytesPerSecond and stops current log cleaners and creates new ones.

Review Comment:
There are two `and`s in the sentence now. Maybe we can phrase it like this to make it clearer:
```
It will (1) update desiredRatePerSec in Throttler with logCleanerIoMaxBytesPerSecond if necessary (2) stop current log cleaners and create new ones.
```
WDYT?

##########
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##########
@@ -1854,6 +1853,26 @@ class LogCleanerTest {
     } finally logCleaner.shutdown()
   }
+  @Test
+  def testReconfigureLogCleanerIoMaxBytesPerSecond(): Unit = {
+    val oldKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
+    oldKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 10000000)
+
+    val logCleaner = new LogCleaner(LogCleaner.cleanerConfig(new KafkaConfig(oldKafkaProps)),
+      logDirs = Array(TestUtils.tempDir()),
+      logs = new Pool[TopicPartition, UnifiedLog](),
+      logDirFailureChannel = new LogDirFailureChannel(1),
+      time = time)
+
+    assertEquals(logCleaner.throttler.desiredRatePerSec, 10000000, "Throttler.desiredRatePerSec should be initialized with KafkaConfig.LogCleanerIoMaxBytesPerSecondProp")
+
+    val newKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
+    newKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 20000000)
+
+    logCleaner.reconfigure(new KafkaConfig(oldKafkaProps), new KafkaConfig(newKafkaProps))
+
+    assertEquals(logCleaner.throttler.desiredRatePerSec, 20000000, "Throttler.desiredRatePerSec should be updated with new KafkaConfig.LogCleanerIoMaxBytesPerSecondProp")

Review Comment:
The error message could be updated to:
```
Throttler.desiredRatePerSec should be
updated with new `cleaner.io.max.bytes.per.second` config.
```

##########
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##########
@@ -1854,6 +1853,26 @@ class LogCleanerTest {
     } finally logCleaner.shutdown()
   }
+  @Test
+  def testReconfigureLogCleanerIoMaxBytesPerSecond(): Unit = {
+    val oldKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
+    oldKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 10000000)
+
+    val logCleaner = new LogCleaner(LogCleaner.cleanerConfig(new KafkaConfig(oldKafkaProps)),
+      logDirs = Array(TestUtils.tempDir()),
+      logs = new Pool[TopicPartition, UnifiedLog](),
+      logDirFailureChannel = new LogDirFailureChannel(1),
+      time = time)
+
+    assertEquals(logCleaner.throttler.desiredRatePerSec, 10000000, "Throttler.desiredRatePerSec should be initialized with KafkaConfig.LogCleanerIoMaxBytesPerSecondProp")

Review Comment:
nit: the 1st parameter of `assertEquals` should be the expected value, and the 2nd one the actual value. The same comment applies to the assertion below.

##########
core/src/main/scala/kafka/utils/Throttler.scala:
##########
@@ -36,7 +36,7 @@ import scala.math._
  * @param time: The time implementation to use
  */
 @threadsafe
-class Throttler(desiredRatePerSec: Double,
+class Throttler(var desiredRatePerSec: Double,

Review Comment:
Thanks for pointing it out. Yes, it sleeps inside the lock. At first, I thought it was a bug, but after further thought, I think it is intended, because multiple threads might call the `maybeThrottle` method and we'd like to calculate the throttle time sequentially (I guess). This code was written in 2011, so I think we should just keep it as is. In that case, I agree `volatile` is a better solution. Otherwise, the reconfigure change might not take effect immediately. @tombentley, WDYT?
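
To make the `volatile` suggestion concrete, here is a minimal sketch under stated assumptions. `SketchThrottler` and `throttleTimeMs` are hypothetical names, and the rate arithmetic is a simplification for illustration, not the real `kafka.utils.Throttler` logic. It only shows the shape of the idea: the field is marked `@volatile` and updated without taking the lock, so a reconfigure call does not have to wait behind a thread that holds the lock while throttling.

```scala
// Hypothetical, simplified stand-in for kafka.utils.Throttler (not the real class).
class SketchThrottler(@volatile var desiredRatePerSec: Double) {
  private val lock = new Object

  // Writer side: a plain volatile write. It does not take `lock`, so a
  // reconfigure call is not blocked by a thread that holds the lock.
  def updateDesiredRatePerSec(newRate: Double): Unit = {
    desiredRatePerSec = newRate
  }

  // Reader side: returns how long the caller would need to wait so that
  // `observedBytes` over `elapsedMs` averages out to the desired rate.
  // Callers are serialized on `lock`, mirroring the sequential calculation
  // discussed above (the sleep itself is left out of this sketch).
  def throttleTimeMs(observedBytes: Long, elapsedMs: Long): Long = lock.synchronized {
    val rate = desiredRatePerSec // one volatile read per calculation
    val targetMs = math.round(observedBytes / rate * 1000.0)
    math.max(0L, targetMs - elapsedMs)
  }
}

object SketchThrottlerDemo {
  def main(args: Array[String]): Unit = {
    val throttler = new SketchThrottler(10000000.0)
    // 20 MB observed in 1 s at a 10 MB/s limit: wait another 1000 ms.
    println(throttler.throttleTimeMs(observedBytes = 20000000L, elapsedMs = 1000L))
    throttler.updateDesiredRatePerSec(20000000.0)
    // Same observation at a 20 MB/s limit: no wait needed.
    println(throttler.throttleTimeMs(observedBytes = 20000000L, elapsedMs = 1000L))
  }
}
```

Without `@volatile`, the new value is still written, but nothing guarantees when a reader thread sees it, which is the "might not take effect immediately" concern raised above.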

##########
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##########
@@ -1854,6 +1853,26 @@ class LogCleanerTest {
     } finally logCleaner.shutdown()
   }
+  @Test
+  def testReconfigureLogCleanerIoMaxBytesPerSecond(): Unit = {
+    val oldKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
+    oldKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 10000000)
+
+    val logCleaner = new LogCleaner(LogCleaner.cleanerConfig(new KafkaConfig(oldKafkaProps)),
+      logDirs = Array(TestUtils.tempDir()),
+      logs = new Pool[TopicPartition, UnifiedLog](),
+      logDirFailureChannel = new LogDirFailureChannel(1),
+      time = time)
+
+    assertEquals(logCleaner.throttler.desiredRatePerSec, 10000000, "Throttler.desiredRatePerSec should be initialized with KafkaConfig.LogCleanerIoMaxBytesPerSecondProp")

Review Comment:
Also, the error message could be updated to:
```
Throttler.desiredRatePerSec should be initialized from initial `cleaner.io.max.bytes.per.second` config.
```

##########
core/src/main/scala/kafka/log/LogCleaner.scala:
##########
@@ -191,6 +191,13 @@ class LogCleaner(initialConfig: CleanerConfig,
    */
   override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
     config = LogCleaner.cleanerConfig(newConfig)
+
+    val maxIoBytesPerSecond = config.maxIoBytesPerSecond;
+    if (maxIoBytesPerSecond != oldConfig.logCleanerIoMaxBytesPerSecond) {
+      info(s"Updating logCleanerIoMaxBytesPerSecond: $maxIoBytesPerSecond")
+      throttler.updateDesiredRatePerSec(maxIoBytesPerSecond)
+    }

Review Comment:
Test is good. Thanks.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org