satishd commented on code in PR #13487: URL: https://github.com/apache/kafka/pull/13487#discussion_r1160112094
########## core/src/main/scala/kafka/log/remote/RemoteLogManager.scala: ########## @@ -17,39 +17,97 @@ package kafka.log.remote import kafka.cluster.Partition +import kafka.log.UnifiedLog import kafka.server.KafkaConfig import kafka.utils.Logging import org.apache.kafka.common._ import org.apache.kafka.common.record.FileRecords.TimestampAndOffset import org.apache.kafka.common.record.{RecordBatch, RemoteLogInputStream} -import org.apache.kafka.common.utils.{ChildFirstClassLoader, Utils} +import org.apache.kafka.common.utils.{ChildFirstClassLoader, KafkaThread, Time, Utils} +import org.apache.kafka.server.common.CheckpointFile.CheckpointWriteBuffer import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager import org.apache.kafka.server.log.remote.storage._ +import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpoint, LeaderEpochCheckpointFile} import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache +import org.apache.kafka.storage.internals.log.EpochEntry -import java.io.{Closeable, InputStream} +import java.io._ +import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets +import java.nio.file.Path import java.security.{AccessController, PrivilegedAction} -import java.util import java.util.Optional -import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap} +import java.util.concurrent._ +import java.util.concurrent.atomic.AtomicInteger +import java.{lang, util} +import scala.collection.Searching._ import scala.collection.Set import scala.jdk.CollectionConverters._ +class RLMScheduledThreadPool(poolSize: Int) extends Logging { + + private val scheduledThreadPool: ScheduledThreadPoolExecutor = { + val threadPool = new ScheduledThreadPoolExecutor(poolSize) + threadPool.setRemoveOnCancelPolicy(true) + threadPool.setExecuteExistingDelayedTasksAfterShutdownPolicy(false) + threadPool.setContinueExistingPeriodicTasksAfterShutdownPolicy(false) + threadPool.setThreadFactory(new ThreadFactory { + private val sequence = new AtomicInteger() + + override def newThread(r: Runnable): Thread = { + KafkaThread.daemon("kafka-rlm-thread-pool-" + sequence.incrementAndGet(), r) + } + }) + + threadPool + } + + def resizePool(size: Int): Unit = { + info(s"Resizing pool from ${scheduledThreadPool.getCorePoolSize} to $size") + scheduledThreadPool.setCorePoolSize(size) + } + + def poolSize(): Int = scheduledThreadPool.getCorePoolSize + + def getIdlePercent(): Double = { + 1 - scheduledThreadPool.getActiveCount().asInstanceOf[Double] / scheduledThreadPool.getCorePoolSize.asInstanceOf[Double] + } + + def scheduleWithFixedDelay(runnable: Runnable, initialDelay: Long, delay: Long, + timeUnit: TimeUnit): ScheduledFuture[_] = { + info(s"Scheduling runnable $runnable with initial delay: $initialDelay, fixed delay: $delay") + scheduledThreadPool.scheduleWithFixedDelay(runnable, initialDelay, delay, timeUnit) + } + + def shutdown(): Boolean = { + info("Shutting down scheduled thread pool") + scheduledThreadPool.shutdownNow() + //waits for 2 mins to terminate the current tasks + scheduledThreadPool.awaitTermination(2, TimeUnit.MINUTES) + } +} + /** * This class is responsible for - * - initializing `RemoteStorageManager` and `RemoteLogMetadataManager` instances. + * - initializing `RemoteStorageManager` and `RemoteLogMetadataManager` instances * - receives any leader and follower replica events and partition stop events and act on them - * - also provides APIs to fetch indexes, metadata about remote log segments. + * - also provides APIs to fetch indexes, metadata about remote log segments + * - copying log segments to remote storage * * @param rlmConfig Configuration required for remote logging subsystem(tiered storage) at the broker level. * @param brokerId id of the current broker. * @param logDir directory of Kafka log segments. + * @param time Time instance. Review Comment: This was an existing code in Scala in my other branch. But it avoids the review again during Java migration later. I did the Java migration as you suggested and pushed it to the PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org