satishd commented on code in PR #13487:
URL: https://github.com/apache/kafka/pull/13487#discussion_r1160112094


##########
core/src/main/scala/kafka/log/remote/RemoteLogManager.scala:
##########
@@ -17,39 +17,97 @@
 package kafka.log.remote
 
 import kafka.cluster.Partition
+import kafka.log.UnifiedLog
 import kafka.server.KafkaConfig
 import kafka.utils.Logging
 import org.apache.kafka.common._
 import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
 import org.apache.kafka.common.record.{RecordBatch, RemoteLogInputStream}
-import org.apache.kafka.common.utils.{ChildFirstClassLoader, Utils}
+import org.apache.kafka.common.utils.{ChildFirstClassLoader, KafkaThread, Time, Utils}
+import org.apache.kafka.server.common.CheckpointFile.CheckpointWriteBuffer
 import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager
 import org.apache.kafka.server.log.remote.storage._
+import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpoint, LeaderEpochCheckpointFile}
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
+import org.apache.kafka.storage.internals.log.EpochEntry
 
-import java.io.{Closeable, InputStream}
+import java.io._
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
+import java.nio.file.Path
 import java.security.{AccessController, PrivilegedAction}
-import java.util
 import java.util.Optional
-import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
+import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicInteger
+import java.{lang, util}
+import scala.collection.Searching._
 import scala.collection.Set
 import scala.jdk.CollectionConverters._
 
+class RLMScheduledThreadPool(poolSize: Int) extends Logging {
+
+  private val scheduledThreadPool: ScheduledThreadPoolExecutor = {
+    val threadPool = new ScheduledThreadPoolExecutor(poolSize)
+    threadPool.setRemoveOnCancelPolicy(true)
+    threadPool.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
+    threadPool.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
+    threadPool.setThreadFactory(new ThreadFactory {
+      private val sequence = new AtomicInteger()
+
+      override def newThread(r: Runnable): Thread = {
+        KafkaThread.daemon("kafka-rlm-thread-pool-" + sequence.incrementAndGet(), r)
+      }
+    })
+
+    threadPool
+  }
+
+  def resizePool(size: Int): Unit = {
+    info(s"Resizing pool from ${scheduledThreadPool.getCorePoolSize} to $size")
+    scheduledThreadPool.setCorePoolSize(size)
+  }
+
+  def poolSize(): Int = scheduledThreadPool.getCorePoolSize
+
+  def getIdlePercent(): Double = {
+    1 - scheduledThreadPool.getActiveCount().asInstanceOf[Double] / scheduledThreadPool.getCorePoolSize.asInstanceOf[Double]
+  }
+
+  def scheduleWithFixedDelay(runnable: Runnable, initialDelay: Long, delay: Long,
+                             timeUnit: TimeUnit): ScheduledFuture[_] = {
+    info(s"Scheduling runnable $runnable with initial delay: $initialDelay, fixed delay: $delay")
+    scheduledThreadPool.scheduleWithFixedDelay(runnable, initialDelay, delay, timeUnit)
+  }
+
+  def shutdown(): Boolean = {
+    info("Shutting down scheduled thread pool")
+    scheduledThreadPool.shutdownNow()
+    // waits up to 2 minutes for in-flight tasks to terminate
+    scheduledThreadPool.awaitTermination(2, TimeUnit.MINUTES)
+  }
+}
+
 /**
  * This class is responsible for
- *  - initializing `RemoteStorageManager` and `RemoteLogMetadataManager` instances.
+ *  - initializing `RemoteStorageManager` and `RemoteLogMetadataManager` instances
 *  - receives any leader and follower replica events and partition stop events and act on them
- *  - also provides APIs to fetch indexes, metadata about remote log segments.
+ *  - also provides APIs to fetch indexes, metadata about remote log segments
+ *  - copying log segments to remote storage
  *
 * @param rlmConfig Configuration required for remote logging subsystem(tiered storage) at the broker level.
  * @param brokerId  id of the current broker.
  * @param logDir    directory of Kafka log segments.
+ * @param time      Time instance.

Review Comment:
   This was existing Scala code from my other branch, but doing the Java migration now avoids having to review this code again later. I did the Java migration as you suggested and pushed it to the PR.
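
   For readers following along, below is a minimal sketch of what the Java-migrated pool could look like, translated directly from the Scala `RLMScheduledThreadPool` in the diff above. It is illustrative only and not the exact code pushed to the PR; in particular, the plain daemon `Thread` factory stands in for `KafkaThread.daemon`, and logging is omitted.

```java
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class RLMScheduledThreadPool {

    private final ScheduledThreadPoolExecutor pool;

    public RLMScheduledThreadPool(int poolSize) {
        ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(poolSize);
        // Mirror the Scala version: drop cancelled tasks and run nothing after shutdown.
        executor.setRemoveOnCancelPolicy(true);
        executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        AtomicInteger sequence = new AtomicInteger();
        executor.setThreadFactory(r -> {
            // The Scala code uses KafkaThread.daemon; a daemon Thread keeps this sketch self-contained.
            Thread t = new Thread(r, "kafka-rlm-thread-pool-" + sequence.incrementAndGet());
            t.setDaemon(true);
            return t;
        });
        this.pool = executor;
    }

    public void resizePool(int size) {
        pool.setCorePoolSize(size);
    }

    public double getIdlePercent() {
        // Fraction of core threads that are currently idle.
        return 1.0 - (double) pool.getActiveCount() / (double) pool.getCorePoolSize();
    }

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long initialDelay,
                                                     long delay, TimeUnit unit) {
        return pool.scheduleWithFixedDelay(task, initialDelay, delay, unit);
    }

    public boolean shutdown() throws InterruptedException {
        pool.shutdownNow();
        // Wait up to two minutes for in-flight tasks, as in the Scala version.
        return pool.awaitTermination(2, TimeUnit.MINUTES);
    }
}
```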


