kamalcph commented on code in PR #17793:
URL: https://github.com/apache/kafka/pull/17793#discussion_r1845012900


##########
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##########
@@ -771,7 +772,10 @@ object DynamicThreadPool {
     ServerConfigs.NUM_IO_THREADS_CONFIG,
     ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG,
     ServerLogConfigs.NUM_RECOVERY_THREADS_PER_DATA_DIR_CONFIG,
-    ServerConfigs.BACKGROUND_THREADS_CONFIG)
+    ServerConfigs.BACKGROUND_THREADS_CONFIG, 
+    RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP,
+    RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP,
+    RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP)

Review Comment:
   We also have to update the BrokerDynamicThreadPool.reconfigurableConfigs() 
method to return only the below configs:
   
   1. ServerConfigs.NUM_IO_THREADS_CONFIG
   2. ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG
   3. ServerLogConfigs.NUM_RECOVERY_THREADS_PER_DATA_DIR_CONFIG and
   4. ServerConfigs.BACKGROUND_THREADS_CONFIG
   
   Otherwise for the newly added RemoteLogManager configs, it gets double 
registered as brokerReconfigurable in both BrokerDynamicThreadPool and  
RemoteLogDynamicThreadPool.



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -2163,6 +2178,10 @@ public RLMScheduledThreadPool(int poolSize, String 
threadPoolName, String thread
             scheduledThreadPool = createPool();
         }
 
+        public void resize(int newSize) {

Review Comment:
   nit: Can we rename this method to `setCorePoolSize`? Also, add one getter 
method `getPoolSize`



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java:
##########
@@ -61,6 +61,10 @@ protected void afterExecute(Runnable runnable, Throwable th) 
{
         }
     }
 
+    public void resize(int newSize) {

Review Comment:
   this method is not required, we can directly call `setCorePoolSize`.



##########
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##########
@@ -840,6 +847,43 @@ class BrokerDynamicThreadPool(server: KafkaBroker) extends 
BrokerReconfigurable
   }
 }
 
+
+class RemoteLogDynamicThreadPool(server: KafkaBroker) extends 
BrokerReconfigurable with Logging {
+  override def reconfigurableConfigs: Set[String] = Set(
+    RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP,
+    RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP,
+    RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP)
+
+  override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
+    DynamicThreadPool.validateReconfiguration(server.config, newConfig)
+  }
+
+  override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): 
Unit = {
+    val remoteLogManager = server.remoteLogManagerOpt

Review Comment:
   Also, please add one if check as remoteLogManger is optional:
   
   ```suggestion
      if (server.remoteLogManagerOpt.nonEmpty) {
       val remoteLogManager = server.remoteLogManagerOpt.get
       ...
       ...
   ```



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -289,6 +289,21 @@ public void updateFetchQuota(long quota) {
         rlmFetchQuotaManager.updateQuota(new Quota(quota, true));
     }
 
+    public void resizeCopierThreadPool(int size) {
+        LOGGER.info("Updating remote copy thread pool size to {}", size);
+        rlmCopyThreadPool.resize(size);
+    }

Review Comment:
   ```suggestion
       public void resizeCopierThreadPool(int newSize) {
           int currentSize = rlmCopyThreadPool.getPoolSize();
           LOGGER.info("Updating remote copy thread pool size from {} to {}", 
currentSize, newSize);
           rlmCopyThreadPool.resize(newSize);
      }
   ```



##########
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##########
@@ -797,6 +801,9 @@ object DynamicThreadPool {
       case ReplicationConfigs.NUM_REPLICA_FETCHERS_CONFIG => 
config.numReplicaFetchers
       case ServerLogConfigs.NUM_RECOVERY_THREADS_PER_DATA_DIR_CONFIG => 
config.numRecoveryThreadsPerDataDir
       case ServerConfigs.BACKGROUND_THREADS_CONFIG => config.backgroundThreads
+      case 
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP => 
config.remoteLogCopierThreads

Review Comment:
   Get the copier thread size using the below way, we don't want to expose the 
remoteLogManager configs in KafkaConfig class:
   
   ```suggestion
         case 
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP => 
config.remoteLogManagerConfig.remoteLogManagerCopierThreadPoolSize() 
   ```
   
   same for other configs



##########
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##########
@@ -1114,6 +1114,9 @@ class KafkaConfigTest {
         case RemoteLogManagerConfig.REMOTE_LOG_READER_MAX_PENDING_TASKS_PROP 
=> assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
         case RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_number", -3)
         case RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_number", -3)
+        case 
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPY_MAX_BYTES_PER_SECOND_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
+        case 
RemoteLogManagerConfig.REMOTE_LOG_MANAGER_FETCH_MAX_BYTES_PER_SECOND_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)
+        case 
RemoteLogManagerConfig.REMOTE_LIST_OFFSETS_REQUEST_TIMEOUT_MS_PROP => 
assertPropertyInvalid(baseProperties, name, "not_a_number", 0, -1)

Review Comment:
   this is not required.



##########
core/src/main/scala/kafka/server/KafkaConfig.scala:
##########
@@ -380,6 +380,9 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
 
   def backgroundThreads = getInt(ServerConfigs.BACKGROUND_THREADS_CONFIG)
   def numIoThreads = getInt(ServerConfigs.NUM_IO_THREADS_CONFIG)
+  def remoteLogCopierThreads = 
getInt(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP)
+  def remoteLogExpirationThreads = 
getInt(RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP)
+  def remoteLogReaderThreads = 
getInt(RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP)

Review Comment:
   These getters are not required, please see above comments.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to