showuon commented on code in PR #13487:
URL: https://github.com/apache/kafka/pull/13487#discussion_r1156860415


##########
core/src/main/scala/kafka/log/remote/RemoteLogManager.scala:
##########
@@ -17,39 +17,107 @@
 package kafka.log.remote
 
 import kafka.cluster.Partition
+import kafka.log.UnifiedLog
 import kafka.server.KafkaConfig
 import kafka.utils.Logging
 import org.apache.kafka.common._
+import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
 import org.apache.kafka.common.record.{RecordBatch, RemoteLogInputStream}
-import org.apache.kafka.common.utils.{ChildFirstClassLoader, Utils}
-import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager
+import org.apache.kafka.common.utils.{ChildFirstClassLoader, KafkaThread, Time, Utils}
+import org.apache.kafka.server.common.CheckpointFile.CheckpointWriteBuffer
+import org.apache.kafka.server.log.remote.metadata.storage.{ClassLoaderAwareRemoteLogMetadataManager, TopicBasedRemoteLogMetadataManagerConfig}
 import org.apache.kafka.server.log.remote.storage._
+import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpoint, LeaderEpochCheckpointFile}
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
+import org.apache.kafka.storage.internals.log.EpochEntry
 
-import java.io.{Closeable, InputStream}
+import java.io._
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
+import java.nio.file.Path
 import java.security.{AccessController, PrivilegedAction}
-import java.util
 import java.util.Optional
-import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
+import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicInteger
+import java.{lang, util}
+import scala.collection.Searching._
 import scala.collection.Set
 import scala.jdk.CollectionConverters._
 
+class RLMScheduledThreadPool(poolSize: Int) extends Logging {
+
+  private val scheduledThreadPool: ScheduledThreadPoolExecutor = {
+    val threadPool = new ScheduledThreadPoolExecutor(poolSize)
+    threadPool.setRemoveOnCancelPolicy(true)
+    threadPool.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)

Review Comment:
   Should we also set `setContinueExistingPeriodicTasksAfterShutdownPolicy(false)`?
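   For context, `ScheduledThreadPoolExecutor` already defaults that policy to `false`, so setting it explicitly would mainly document the intent. A minimal sketch, assuming the same pool setup as in this constructor:
   ```scala
   import java.util.concurrent.ScheduledThreadPoolExecutor

   // Sketch: with both policies false, neither delayed one-shot tasks nor
   // periodic tasks survive shutdown(); shutdownNow() then only needs to
   // interrupt tasks that are actively running.
   val threadPool = new ScheduledThreadPoolExecutor(poolSize)
   threadPool.setRemoveOnCancelPolicy(true)
   threadPool.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
   threadPool.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
   ```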



##########
core/src/main/scala/kafka/log/remote/RemoteLogManager.scala:
##########
@@ -17,39 +17,107 @@
 package kafka.log.remote
 
 import kafka.cluster.Partition
+import kafka.log.UnifiedLog
 import kafka.server.KafkaConfig
 import kafka.utils.Logging
 import org.apache.kafka.common._
+import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
 import org.apache.kafka.common.record.{RecordBatch, RemoteLogInputStream}
-import org.apache.kafka.common.utils.{ChildFirstClassLoader, Utils}
-import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager
+import org.apache.kafka.common.utils.{ChildFirstClassLoader, KafkaThread, Time, Utils}
+import org.apache.kafka.server.common.CheckpointFile.CheckpointWriteBuffer
+import org.apache.kafka.server.log.remote.metadata.storage.{ClassLoaderAwareRemoteLogMetadataManager, TopicBasedRemoteLogMetadataManagerConfig}
 import org.apache.kafka.server.log.remote.storage._
+import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpoint, LeaderEpochCheckpointFile}
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
+import org.apache.kafka.storage.internals.log.EpochEntry
 
-import java.io.{Closeable, InputStream}
+import java.io._
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
+import java.nio.file.Path
 import java.security.{AccessController, PrivilegedAction}
-import java.util
 import java.util.Optional
-import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
+import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicInteger
+import java.{lang, util}
+import scala.collection.Searching._
 import scala.collection.Set
 import scala.jdk.CollectionConverters._
 
+class RLMScheduledThreadPool(poolSize: Int) extends Logging {
+
+  private val scheduledThreadPool: ScheduledThreadPoolExecutor = {
+    val threadPool = new ScheduledThreadPoolExecutor(poolSize)
+    threadPool.setRemoveOnCancelPolicy(true)
+    threadPool.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
+    threadPool.setThreadFactory(new ThreadFactory {
+      private val sequence = new AtomicInteger()
+
+      override def newThread(r: Runnable): Thread = {
+        KafkaThread.daemon("kafka-rlm-thread-pool-" + sequence.incrementAndGet(), r)
+      }
+    })
+
+    threadPool
+  }
+
+  def resizePool(size: Int): Unit = {
+    info(s"Resizing pool from ${scheduledThreadPool.getCorePoolSize} to $size")
+    scheduledThreadPool.setCorePoolSize(size)
+  }
+
+  def poolSize(): Int = scheduledThreadPool.getCorePoolSize
+
+  def getIdlePercent(): Double = {
+    1 - scheduledThreadPool.getActiveCount().asInstanceOf[Double] / scheduledThreadPool.getCorePoolSize.asInstanceOf[Double]
+  }
+
+  def scheduleWithFixedDelay(runnable: Runnable, initialDelay: Long, delay: Long,
+                             timeUnit: TimeUnit): ScheduledFuture[_] = {
+    info(s"Scheduling runnable $runnable with initial delay: $initialDelay, fixed delay: $delay")
+    scheduledThreadPool.scheduleWithFixedDelay(runnable, initialDelay, delay, timeUnit)
+  }
+
+  def shutdown(): Boolean = {
+    info("Shutting down scheduled thread pool")
+    scheduledThreadPool.shutdownNow()
+    //waits for 2 mins to terminate the current tasks

Review Comment:
   This comment is not needed.
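   If the two-minute wait is worth conveying at all, a named constant would carry the intent without the comment. A hypothetical variant (`shutdownTimeoutMinutes` is a made-up name, same surrounding class assumed):
   ```scala
   // Hypothetical: express the timeout as a named constant instead of a comment.
   private val shutdownTimeoutMinutes = 2L

   def shutdown(): Boolean = {
     info("Shutting down scheduled thread pool")
     scheduledThreadPool.shutdownNow()
     // awaitTermination returns true if all tasks finished within the timeout
     scheduledThreadPool.awaitTermination(shutdownTimeoutMinutes, TimeUnit.MINUTES)
   }
   ```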



##########
core/src/main/scala/kafka/log/remote/RemoteLogManager.scala:
##########
@@ -17,39 +17,107 @@
 package kafka.log.remote
 
 import kafka.cluster.Partition
+import kafka.log.UnifiedLog
 import kafka.server.KafkaConfig
 import kafka.utils.Logging
 import org.apache.kafka.common._
+import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
 import org.apache.kafka.common.record.{RecordBatch, RemoteLogInputStream}
-import org.apache.kafka.common.utils.{ChildFirstClassLoader, Utils}
-import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager
+import org.apache.kafka.common.utils.{ChildFirstClassLoader, KafkaThread, Time, Utils}
+import org.apache.kafka.server.common.CheckpointFile.CheckpointWriteBuffer
+import org.apache.kafka.server.log.remote.metadata.storage.{ClassLoaderAwareRemoteLogMetadataManager, TopicBasedRemoteLogMetadataManagerConfig}
 import org.apache.kafka.server.log.remote.storage._
+import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpoint, LeaderEpochCheckpointFile}
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
+import org.apache.kafka.storage.internals.log.EpochEntry
 
-import java.io.{Closeable, InputStream}
+import java.io._
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
+import java.nio.file.Path
 import java.security.{AccessController, PrivilegedAction}
-import java.util
 import java.util.Optional
-import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
+import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicInteger
+import java.{lang, util}
+import scala.collection.Searching._
 import scala.collection.Set
 import scala.jdk.CollectionConverters._
 
+class RLMScheduledThreadPool(poolSize: Int) extends Logging {
+
+  private val scheduledThreadPool: ScheduledThreadPoolExecutor = {
+    val threadPool = new ScheduledThreadPoolExecutor(poolSize)
+    threadPool.setRemoveOnCancelPolicy(true)
+    threadPool.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
+    threadPool.setThreadFactory(new ThreadFactory {
+      private val sequence = new AtomicInteger()
+
+      override def newThread(r: Runnable): Thread = {
+        KafkaThread.daemon("kafka-rlm-thread-pool-" + sequence.incrementAndGet(), r)
+      }
+    })
+
+    threadPool
+  }
+
+  def resizePool(size: Int): Unit = {
+    info(s"Resizing pool from ${scheduledThreadPool.getCorePoolSize} to $size")
+    scheduledThreadPool.setCorePoolSize(size)
+  }
+
+  def poolSize(): Int = scheduledThreadPool.getCorePoolSize
+
+  def getIdlePercent(): Double = {
+    1 - scheduledThreadPool.getActiveCount().asInstanceOf[Double] / scheduledThreadPool.getCorePoolSize.asInstanceOf[Double]
+  }
+
+  def scheduleWithFixedDelay(runnable: Runnable, initialDelay: Long, delay: Long,
+                             timeUnit: TimeUnit): ScheduledFuture[_] = {
+    info(s"Scheduling runnable $runnable with initial delay: $initialDelay, fixed delay: $delay")
+    scheduledThreadPool.scheduleWithFixedDelay(runnable, initialDelay, delay, timeUnit)
+  }
+
+  def shutdown(): Boolean = {
+    info("Shutting down scheduled thread pool")
+    scheduledThreadPool.shutdownNow()
+    //waits for 2 mins to terminate the current tasks
+    scheduledThreadPool.awaitTermination(2, TimeUnit.MINUTES)
+  }
+}
+
+trait CancellableRunnable extends Runnable {
+  @volatile private var cancelled = false
+
+  def cancel(): Unit = {
+    cancelled = true
+  }
+
+  def isCancelled(): Boolean = {
+    cancelled
+  }
+}
+
 /**
  * This class is responsible for
- *  - initializing `RemoteStorageManager` and `RemoteLogMetadataManager` instances.
+ *  - initializing `RemoteStorageManager` and `RemoteLogMetadataManager` instances
  *  - receives any leader and follower replica events and partition stop events and act on them
- *  - also provides APIs to fetch indexes, metadata about remote log segments.
+ *  - also provides APIs to fetch indexes, metadata about remote log segments
+ *  - copying log segments to remote storage
  *
  * @param rlmConfig Configuration required for remote logging subsystem(tiered storage) at the broker level.
  * @param brokerId  id of the current broker.
  * @param logDir    directory of Kafka log segments.

Review Comment:
   Please remember to update the javadoc.
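   e.g. the description bullets could fold in the new responsibility with parallel phrasing (a suggestion only, not the final wording):
   ```scala
   /**
    * This class is responsible for
    *  - initializing `RemoteStorageManager` and `RemoteLogMetadataManager` instances
    *  - receiving leader and follower replica events and partition stop events, and acting on them
    *  - providing APIs to fetch indexes and metadata about remote log segments
    *  - copying log segments to remote storage
    */
   ```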



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -520,6 +522,11 @@ class UnifiedLog(@volatile var logStartOffset: Long,
       localLog.updateRecoveryPoint(offset)
     }
   }
+  def updateHighestOffsetInRemoteStorage(offset: Long): Unit = {
+    if (!remoteLogEnabled())
+      warn(s"Received update for highest offset with remote index as: $offset, 
the existing value: $highestOffsetInRemoteStorage")

Review Comment:
   It's unclear what we're trying to warn about. Maybe something like this:
   `warn(s"Unable to update the highest offset with remote index as: $offset, the existing value: $highestOffsetInRemoteStorage since remote storage is not enabled.")`
   



##########
core/src/main/scala/kafka/log/remote/RemoteLogManager.scala:
##########
@@ -272,15 +310,326 @@ class RemoteLogManager(rlmConfig: RemoteLogManagerConfig,
     None
   }
 
+  trait CancellableRunnable extends Runnable {

Review Comment:
   Why do we declare `CancellableRunnable` twice? It's already defined at the top level of this file.
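   For what it's worth, the single top-level trait covers both uses: a task mixes it in and polls `isCancelled()` between units of work. A hypothetical example (`RLMCopyTask` is a made-up name):
   ```scala
   // Hypothetical task built on the top-level CancellableRunnable trait:
   // checking isCancelled() lets cancel() take effect before the next
   // unit of work starts.
   class RLMCopyTask extends CancellableRunnable {
     override def run(): Unit = {
       if (!isCancelled()) {
         // copy the next eligible segment to remote storage (elided)
       }
     }
   }
   ```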



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
