showuon commented on code in PR #12296:
URL: https://github.com/apache/kafka/pull/12296#discussion_r913815025


##########
core/src/main/scala/kafka/log/LogCleaner.scala:
##########
@@ -186,11 +186,18 @@ class LogCleaner(initialConfig: CleanerConfig,
   }
 
   /**
-    * Reconfigure log clean config. This simply stops current log cleaners and 
creates new ones.
+    * Reconfigure log clean config. This updates desiredRatePerSec in 
Throttler with logCleanerIoMaxBytesPerSecond and stops current log cleaners and 
creates new ones.

Review Comment:
   There are now two `and`s in the sentence. Maybe we can phrase it like this 
to make it clearer:
   ```
   It (1) updates desiredRatePerSec in Throttler with 
logCleanerIoMaxBytesPerSecond if necessary, and (2) stops the current log 
cleaners and creates new ones.
   ```
   WDYT?



##########
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##########
@@ -1854,6 +1853,26 @@ class LogCleanerTest {
     } finally logCleaner.shutdown()
   }
 
+  @Test
+  def testReconfigureLogCleanerIoMaxBytesPerSecond(): Unit = {
+    val oldKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
+    oldKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 10000000)
+
+    val logCleaner = new LogCleaner(LogCleaner.cleanerConfig(new 
KafkaConfig(oldKafkaProps)),
+      logDirs = Array(TestUtils.tempDir()),
+      logs = new Pool[TopicPartition, UnifiedLog](),
+      logDirFailureChannel = new LogDirFailureChannel(1),
+      time = time)
+
+    assertEquals(logCleaner.throttler.desiredRatePerSec, 10000000, 
"Throttler.desiredRatePerSec should be initialized with 
KafkaConfig.LogCleanerIoMaxBytesPerSecondProp")
+
+    val newKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
+    newKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 20000000)
+
+    logCleaner.reconfigure(new KafkaConfig(oldKafkaProps), new 
KafkaConfig(newKafkaProps))
+
+    assertEquals(logCleaner.throttler.desiredRatePerSec, 20000000, 
"Throttler.desiredRatePerSec should be updated with new 
KafkaConfig.LogCleanerIoMaxBytesPerSecondProp")

Review Comment:
   The error message could be updated to:
   ```
   Throttler.desiredRatePerSec should be updated with new 
`cleaner.io.max.bytes.per.second` config.
   ```



##########
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##########
@@ -1854,6 +1853,26 @@ class LogCleanerTest {
     } finally logCleaner.shutdown()
   }
 
+  @Test
+  def testReconfigureLogCleanerIoMaxBytesPerSecond(): Unit = {
+    val oldKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
+    oldKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 10000000)
+
+    val logCleaner = new LogCleaner(LogCleaner.cleanerConfig(new 
KafkaConfig(oldKafkaProps)),
+      logDirs = Array(TestUtils.tempDir()),
+      logs = new Pool[TopicPartition, UnifiedLog](),
+      logDirFailureChannel = new LogDirFailureChannel(1),
+      time = time)
+
+    assertEquals(logCleaner.throttler.desiredRatePerSec, 10000000, 
"Throttler.desiredRatePerSec should be initialized with 
KafkaConfig.LogCleanerIoMaxBytesPerSecondProp")

Review Comment:
   nit: the 1st parameter of `assertEquals` should be the expected value, and 
the 2nd one the actual value. The same applies to the assertion below.
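
   To illustrate why the order matters, here is a minimal sketch. The 
`assertEquals` helper below is a hypothetical stand-in that only mirrors the 
(expected, actual, message) parameter order of JUnit 5; it is not the real 
JUnit method, and the failure text it builds is only an approximation of what 
JUnit prints.

   ```scala
   object AssertOrderSketch {
     // Hypothetical stand-in for JUnit 5's assertEquals(expected, actual, message).
     def assertEquals(expected: Any, actual: Any, message: String): Unit =
       if (expected != actual)
         throw new AssertionError(s"$message ==> expected: <$expected> but was: <$actual>")

     // Runs the assertion and returns the failure text, or "" if it passed.
     def failureMessage(body: => Unit): String =
       try { body; "" } catch { case e: AssertionError => e.getMessage }

     def main(args: Array[String]): Unit = {
       val actualRate = 1 // pretend this value was read from the throttler
       // Expected value first, actual value second: the report reads correctly.
       println(failureMessage(assertEquals(2, actualRate, "rate mismatch")))
       // Swapped arguments: the report claims the throttler value was "expected".
       println(failureMessage(assertEquals(actualRate, 2, "rate mismatch")))
     }
   }
   ```

   With swapped arguments the test still passes or fails in the same cases; 
only the failure report becomes misleading.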



##########
core/src/main/scala/kafka/utils/Throttler.scala:
##########
@@ -36,7 +36,7 @@ import scala.math._
  * @param time: The time implementation to use
  */
 @threadsafe
-class Throttler(desiredRatePerSec: Double,
+class Throttler(var desiredRatePerSec: Double,

Review Comment:
   Thanks for pointing it out. Yes, it sleeps inside the lock. At first, I 
thought it was a bug, but after further thought, I think it is intended, 
because multiple threads might call the `maybeThrottle` method and we'd like 
to calculate the throttle time sequentially (I guess). This code has been 
there since 2011, so I think we should just keep it as is. 
   In that case, I agree `volatile` is a better solution. Otherwise, the 
reconfigured value might not take effect immediately.
   
   @tombentley , WDYT?
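
   A minimal sketch of the `volatile` approach, under stated assumptions: 
`SketchThrottler`, `currentRate`, and the simplified `maybeThrottle` signature 
are hypothetical and are not the actual `kafka.utils.Throttler`. Only the 
`updateDesiredRatePerSec` name is taken from the PR. The point is that the 
update does not take the lock, so it is not blocked while another thread 
sleeps inside `maybeThrottle`, and the volatile write is visible to the next 
read.

   ```scala
   // Simplified, hypothetical throttler; NOT the real kafka.utils.Throttler.
   class SketchThrottler(initialRatePerSec: Double) {
     private val lock = new Object

     // Volatile so a write from the reconfigure thread is visible to throttled threads.
     @volatile private var desiredRatePerSec: Double = initialRatePerSec

     // Deliberately does not synchronize on `lock`, so it never waits for a sleeper.
     def updateDesiredRatePerSec(newRate: Double): Unit =
       desiredRatePerSec = newRate

     def currentRate: Double = desiredRatePerSec

     // Sleeps inside the lock so callers are throttled one at a time.
     // Returns the number of milliseconds slept.
     def maybeThrottle(observedBytes: Double, elapsedMs: Long): Long = lock.synchronized {
       val rate = desiredRatePerSec // single volatile read per call
       val desiredMs = math.round(observedBytes * 1000.0 / rate)
       val sleepMs = math.max(0L, desiredMs - elapsedMs)
       if (sleepMs > 0) Thread.sleep(sleepMs)
       sleepMs
     }
   }

   object SketchThrottlerDemo {
     def main(args: Array[String]): Unit = {
       val throttler = new SketchThrottler(1000.0)
       println(throttler.maybeThrottle(10.0, 0L))
       throttler.updateDesiredRatePerSec(2000.0) // takes effect on the next call
       println(throttler.maybeThrottle(10.0, 0L))
     }
   }
   ```

   In this sketch a call that is already sleeping keeps its old sleep time; the 
new rate applies from the next `maybeThrottle` call onward.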



##########
core/src/test/scala/unit/kafka/log/LogCleanerTest.scala:
##########
@@ -1854,6 +1853,26 @@ class LogCleanerTest {
     } finally logCleaner.shutdown()
   }
 
+  @Test
+  def testReconfigureLogCleanerIoMaxBytesPerSecond(): Unit = {
+    val oldKafkaProps = TestUtils.createBrokerConfig(1, "localhost:2181")
+    oldKafkaProps.put(KafkaConfig.LogCleanerIoMaxBytesPerSecondProp, 10000000)
+
+    val logCleaner = new LogCleaner(LogCleaner.cleanerConfig(new 
KafkaConfig(oldKafkaProps)),
+      logDirs = Array(TestUtils.tempDir()),
+      logs = new Pool[TopicPartition, UnifiedLog](),
+      logDirFailureChannel = new LogDirFailureChannel(1),
+      time = time)
+
+    assertEquals(logCleaner.throttler.desiredRatePerSec, 10000000, 
"Throttler.desiredRatePerSec should be initialized with 
KafkaConfig.LogCleanerIoMaxBytesPerSecondProp")

Review Comment:
   Also, the error message could be updated to:
   ```
   Throttler.desiredRatePerSec should be initialized from initial 
`cleaner.io.max.bytes.per.second` config.
   ```



##########
core/src/main/scala/kafka/log/LogCleaner.scala:
##########
@@ -191,6 +191,13 @@ class LogCleaner(initialConfig: CleanerConfig,
     */
   override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): 
Unit = {
     config = LogCleaner.cleanerConfig(newConfig)
+
+    val maxIoBytesPerSecond = config.maxIoBytesPerSecond;
+    if (maxIoBytesPerSecond != oldConfig.logCleanerIoMaxBytesPerSecond) {
+      info(s"Updating logCleanerIoMaxBytesPerSecond: $maxIoBytesPerSecond")
+      throttler.updateDesiredRatePerSec(maxIoBytesPerSecond)
+    }

Review Comment:
   Test is good. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

