satishd commented on code in PR #17859:
URL: https://github.com/apache/kafka/pull/17859#discussion_r1883445832
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -2152,31 +2168,30 @@ RLMTaskWithFuture followerTask(TopicIdPartition partition) {
static class RLMScheduledThreadPool {
private static final Logger LOGGER = LoggerFactory.getLogger(RLMScheduledThreadPool.class);
- private final int poolSize;
private final String threadPoolName;
- private final String threadNamePrefix;
+ private final String threadNamePattern;
private final ScheduledThreadPoolExecutor scheduledThreadPool;
- public RLMScheduledThreadPool(int poolSize, String threadPoolName, String threadNamePrefix) {
- this.poolSize = poolSize;
+ public RLMScheduledThreadPool(int poolSize, String threadPoolName, String threadNamePattern) {
this.threadPoolName = threadPoolName;
- this.threadNamePrefix = threadNamePrefix;
- scheduledThreadPool = createPool();
+ this.threadNamePattern = threadNamePattern;
+ scheduledThreadPool = createPool(poolSize);
+ }
+
+ public void setCorePoolSize(int newSize) {
+ scheduledThreadPool.setCorePoolSize(newSize);
+ }
+
+ public int getCorePoolSize() {
+ return scheduledThreadPool.getCorePoolSize();
}
- private ScheduledThreadPoolExecutor createPool() {
+ private ScheduledThreadPoolExecutor createPool(int poolSize) {
ScheduledThreadPoolExecutor threadPool = new ScheduledThreadPoolExecutor(poolSize);
threadPool.setRemoveOnCancelPolicy(true);
threadPool.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
threadPool.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
- threadPool.setThreadFactory(new ThreadFactory() {
- private final AtomicInteger sequence = new AtomicInteger();
-
- public Thread newThread(Runnable r) {
- return KafkaThread.daemon(threadNamePrefix + sequence.incrementAndGet(), r);
- }
- });
-
+ threadPool.setThreadFactory(ThreadUtils.createThreadFactory(threadNamePattern, true));
Review Comment:
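One minor difference: `KafkaThread` sets an `UncaughtExceptionHandler` that logs any uncaught exception, as shown below. Switching to `ThreadUtils.createThreadFactory` may drop that logging.
```
setUncaughtExceptionHandler((t, e) -> log.error("Uncaught exception in thread '{}':", name, e));
```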
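If the logging should be kept, one option is a factory that applies both the name pattern and a handler. The following is only a minimal sketch using plain JDK classes, not the actual `KafkaThread` or `ThreadUtils` code: the class name `LoggingThreadFactoryExample`, the method `create`, and the use of `System.err` in place of the real logger are all assumptions made for illustration.

```java
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of Kafka: builds threads that are named from a
// pattern and that carry an explicit UncaughtExceptionHandler.
class LoggingThreadFactoryExample {

    static ThreadFactory create(String namePattern,
                                boolean daemon,
                                Thread.UncaughtExceptionHandler handler) {
        AtomicInteger sequence = new AtomicInteger();
        return runnable -> {
            // Substitute a sequence number only when the pattern asks for one.
            String name = namePattern.contains("%d")
                    ? String.format(namePattern, sequence.incrementAndGet())
                    : namePattern;
            Thread thread = new Thread(runnable, name);
            thread.setDaemon(daemon);
            // This is the step the review comment is about: without it, the
            // thread falls back to its thread group's default handling.
            thread.setUncaughtExceptionHandler(handler);
            return thread;
        };
    }

    public static void main(String[] args) throws InterruptedException {
        // System.err stands in for log.error(...) to keep the sketch self-contained.
        ThreadFactory factory = create("example-thread-%d", true,
                (t, e) -> System.err.println(
                        "Uncaught exception in thread '" + t.getName() + "': " + e));

        Thread thread = factory.newThread(() -> {
            throw new IllegalStateException("boom");
        });
        thread.start();
        thread.join();
    }
}
```

Passing the handler in as a parameter keeps the naming logic independent of the logging framework, so the caller can decide whether to log, rethrow, or record a metric.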