satishd commented on code in PR #17859:
URL: https://github.com/apache/kafka/pull/17859#discussion_r1849549295
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -2151,31 +2167,30 @@ RLMTaskWithFuture followerTask(TopicIdPartition partition) {
     static class RLMScheduledThreadPool {
         private static final Logger LOGGER = LoggerFactory.getLogger(RLMScheduledThreadPool.class);
-        private final int poolSize;
         private final String threadPoolName;
-        private final String threadNamePrefix;
+        private final String threadNamePattern;
         private final ScheduledThreadPoolExecutor scheduledThreadPool;
-        public RLMScheduledThreadPool(int poolSize, String threadPoolName, String threadNamePrefix) {
-            this.poolSize = poolSize;
+        public RLMScheduledThreadPool(int poolSize, String threadPoolName, String threadNamePattern) {
             this.threadPoolName = threadPoolName;
-            this.threadNamePrefix = threadNamePrefix;
-            scheduledThreadPool = createPool();
+            this.threadNamePattern = threadNamePattern;
+            scheduledThreadPool = createPool(poolSize);
+        }
+
+        public void setCorePoolSize(int newSize) {
+            scheduledThreadPool.setCorePoolSize(newSize);
+        }
+
+        public int getCorePoolSize() {
+            return scheduledThreadPool.getCorePoolSize();
         }
-        private ScheduledThreadPoolExecutor createPool() {
+        private ScheduledThreadPoolExecutor createPool(int poolSize) {
             ScheduledThreadPoolExecutor threadPool = new ScheduledThreadPoolExecutor(poolSize);
             threadPool.setRemoveOnCancelPolicy(true);
             threadPool.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
             threadPool.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
-            threadPool.setThreadFactory(new ThreadFactory() {
-                private final AtomicInteger sequence = new AtomicInteger();
-
-                public Thread newThread(Runnable r) {
-                    return KafkaThread.daemon(threadNamePrefix + sequence.incrementAndGet(), r);
-                }
-            });
-
+
             threadPool.setThreadFactory(ThreadUtils.createThreadFactory(threadNamePattern, true));
Review Comment:
This is a good cleanup.
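
For readers following the hunk above, here is a minimal JDK-only sketch of the same shape: a scheduled pool whose core size can be changed after construction, with daemon threads named from a format pattern. The class `ResizablePoolSketch` and the helper `namedDaemonFactory` are hypothetical stand-ins written for this illustration; they are not the PR's code and not Kafka's `ThreadUtils`.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; names here are not taken from the PR.
final class ResizablePoolSketch {

    // Assumed stand-in for a pattern-based factory: formats the pattern with a
    // running sequence number and marks every thread as a daemon.
    static ThreadFactory namedDaemonFactory(String pattern) {
        AtomicInteger sequence = new AtomicInteger();
        return runnable -> {
            Thread thread = new Thread(runnable, String.format(pattern, sequence.getAndIncrement()));
            thread.setDaemon(true);
            return thread;
        };
    }

    // Mirrors the policies set in the diff; poolSize is passed in instead of
    // being stored in a field, so the executor is the single source of truth.
    static ScheduledThreadPoolExecutor createPool(int poolSize, String threadNamePattern) {
        ScheduledThreadPoolExecutor pool = new ScheduledThreadPoolExecutor(poolSize);
        pool.setRemoveOnCancelPolicy(true);
        pool.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        pool.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        pool.setThreadFactory(namedDaemonFactory(threadNamePattern));
        return pool;
    }
}
```

With this shape, a caller can resize a live pool with `pool.setCorePoolSize(newSize)` and read the current value back with `pool.getCorePoolSize()`, which is what dropping the `poolSize` field makes possible.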
##########
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##########
@@ -1167,6 +1167,22 @@ class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable w
           throw new ConfigException(s"$errorMsg, value should be at least 1")
         }
       }
+
+      if (RemoteLogManagerConfig.REMOTE_LOG_MANAGER_COPIER_THREAD_POOL_SIZE_PROP.equals(k) ||
+        RemoteLogManagerConfig.REMOTE_LOG_MANAGER_EXPIRATION_THREAD_POOL_SIZE_PROP.equals(k) ||
+        RemoteLogManagerConfig.REMOTE_LOG_READER_THREADS_PROP.equals(k)) {
+        val newValue = v.asInstanceOf[Int]
+        val oldValue = server.config.getInt(k)
+        if (newValue != oldValue) {
+          val errorMsg = s"Dynamic thread count update validation failed for $k=$v"
+          if (newValue <= 0)
Review Comment:
A static configuration update can set this value to its default of -1, but this
check rejects -1 in a dynamic update.
Is it intentional that the value can no longer be reset to its default (-1)
through dynamic configs?
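
To make the question concrete, here is a minimal sketch of a validation that would accept a reset to the default, assuming (as the comment above states) that -1 is the default sentinel. `ThreadCountValidationSketch`, `DEFAULT_SENTINEL`, `validate`, and `effectiveSize` are hypothetical names for illustration; this is one possible shape, not the PR's implementation, and it uses `IllegalArgumentException` in place of Kafka's `ConfigException` to stay self-contained.

```java
// Hypothetical illustration only; not the validation code from the PR.
final class ThreadCountValidationSketch {

    // Assumption taken from the review comment: -1 means "use the default".
    static final int DEFAULT_SENTINEL = -1;

    // Accepts an unchanged value, a reset to the sentinel, or any positive count.
    static void validate(String key, int oldValue, int newValue) {
        if (newValue == oldValue || newValue == DEFAULT_SENTINEL) {
            return;
        }
        if (newValue <= 0) {
            throw new IllegalArgumentException(
                "Dynamic thread count update validation failed for " + key + "=" + newValue
                    + ", value should be at least 1");
        }
    }

    // The sentinel must be resolved to a real size before resizing a pool,
    // because ThreadPoolExecutor.setCorePoolSize rejects negative values.
    static int effectiveSize(int configured, int fallback) {
        return configured == DEFAULT_SENTINEL ? fallback : configured;
    }
}
```

If the reset were allowed, the caller would also need a fallback size to apply, since passing -1 directly to `setCorePoolSize` throws an `IllegalArgumentException`.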