[GitHub] [kafka] showuon commented on a diff in pull request #13487: KAFKA-9550 Copying log segments to tiered storage in RemoteLogManager

2023-04-10 Thread via GitHub


showuon commented on code in PR #13487:
URL: https://github.com/apache/kafka/pull/13487#discussion_r1161677863


##
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##
@@ -0,0 +1,736 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;
+
+import kafka.cluster.Partition;
+import kafka.log.LogSegment;
+import kafka.log.UnifiedLog;
+import kafka.server.KafkaConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
+import org.apache.kafka.common.record.RemoteLogInputStream;
+import org.apache.kafka.common.utils.ChildFirstClassLoader;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager;
+import org.apache.kafka.server.log.remote.storage.ClassLoaderAwareRemoteStorageManager;
+import org.apache.kafka.server.log.remote.storage.LogSegmentData;
+import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpoint;
+import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
+import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
+import org.apache.kafka.storage.internals.log.EpochEntry;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.JavaConverters;
+
+import java.io.BufferedWriter;
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.lang.reflect.InvocationTargetException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This class is responsible for
+ * - initializing `RemoteStorageManager` and `RemoteLogMetadataManager` instances
+ * - receiving leader and follower replica events and partition stop events, and acting on them
+ * - providing APIs to fetch indexes and metadata about remote log segments
+ * - copying log segments to remote storage
+ */
+public class RemoteLogManager implements Closeable {
+
+private static final Logger LOGGER = 

[GitHub] [kafka] showuon commented on a diff in pull request #13487: KAFKA-9550 Copying log segments to tiered storage in RemoteLogManager

2023-04-05 Thread via GitHub


showuon commented on code in PR #13487:
URL: https://github.com/apache/kafka/pull/13487#discussion_r1158212079


##
core/src/main/scala/kafka/log/remote/RemoteLogManager.scala:
##
@@ -272,15 +364,270 @@ class RemoteLogManager(rlmConfig: RemoteLogManagerConfig,
 None
   }
 
+  trait CancellableRunnable extends Runnable {
+@volatile private var cancelled = false
+
+def cancel(): Unit = {
+  cancelled = true
+}
+
+def isCancelled(): Boolean = {
+  cancelled
+}
+  }
+
+  /**
+   * Returns the leader epoch checkpoint by truncating with the given start[exclusive] and end[inclusive] offset
+   *
+   * @param log The actual log from where to take the leader-epoch checkpoint
+   * @param startOffset The start offset of the checkpoint file (exclusive in the truncation).
+   *If start offset is 6, then it will retain an entry at offset 6.
+   * @param endOffset   The end offset of the checkpoint file (inclusive in the truncation)
+   *If end offset is 100, then it will remove the entries greater than or equal to 100.
+   * @return the truncated leader epoch checkpoint
+   */
+  private[remote] def getLeaderEpochCheckpoint(log: UnifiedLog, startOffset: Long, endOffset: Long): InMemoryLeaderEpochCheckpoint = {
+val checkpoint = new InMemoryLeaderEpochCheckpoint()
+log.leaderEpochCache
+  .map(cache => cache.writeTo(checkpoint))
+  .foreach { x =>
+if (startOffset >= 0) {
+  x.truncateFromStart(startOffset)
+}
+x.truncateFromEnd(endOffset)
+  }
+checkpoint
+  }
+
+  private[remote] class RLMTask(tpId: TopicIdPartition) extends CancellableRunnable with Logging {
+this.logIdent = s"[RemoteLogManager=$brokerId partition=$tpId] "
+@volatile private var leaderEpoch: Int = -1
+
+private def isLeader(): Boolean = leaderEpoch >= 0
+
+// The readOffset is None initially for a new leader RLMTask, and needs to be fetched inside the task's run() method.
+@volatile private var copiedOffsetOption: Option[Long] = None
+
+def convertToLeader(leaderEpochVal: Int): Unit = {
+  if (leaderEpochVal < 0) {
+throw new KafkaException(s"leaderEpoch value for topic partition $tpId can not be negative")
+  }
+  if (this.leaderEpoch != leaderEpochVal) {
+leaderEpoch = leaderEpochVal
+  }
+  // Reset readOffset, so that it is set in next run of RLMTask
+  copiedOffsetOption = None
+}
+
+def convertToFollower(): Unit = {
+  leaderEpoch = -1
+}
+
+def handleCopyLogSegmentsToRemote(): Unit = {
+  if (isCancelled())
+return
+
+  def maybeUpdateReadOffset(): Unit = {
+if (copiedOffsetOption.isEmpty) {
+  info(s"Find the highest remote offset for partition: $tpId after 
becoming leader, leaderEpoch: $leaderEpoch")
+
+  // This is found by traversing from the latest leader epoch from 
leader epoch history and find the highest offset
+  // of a segment with that epoch copied into remote storage. If it 
can not find an entry then it checks for the
+  // previous leader epoch till it finds an entry, If there are no 
entries till the earliest leader epoch in leader
+  // epoch cache then it starts copying the segments from the earliest 
epoch entry’s offset.
+  copiedOffsetOption = Some(findHighestRemoteOffset(tpId))
+}
+  }
+
+  try {
+maybeUpdateReadOffset()
+val copiedOffset = copiedOffsetOption.get
+fetchLog(tpId.topicPartition()).foreach { log =>
+  // LSO indicates the offset below are ready to be consumed(high-watermark or committed)
+  val lso = log.lastStableOffset
+  if (lso < 0) {
+warn(s"lastStableOffset for partition $tpId is $lso, which should not be negative.")
+  } else if (lso > 0 && copiedOffset < lso) {
+// Copy segments only till the min of high-watermark or stable-offset as remote storage should contain
+// only committed/acked messages
+val toOffset = lso
+debug(s"Checking for segments to copy, copiedOffset: $copiedOffset and toOffset: $toOffset")
+val activeSegBaseOffset = log.activeSegment.baseOffset
+// log-start-offset can be ahead of the read-offset, when:
+// 1) log-start-offset gets incremented via delete-records API (or)
+// 2) enabling the remote log for the first time
+val fromOffset = Math.max(copiedOffset + 1, log.logStartOffset)
+val sortedSegments = log.logSegments(fromOffset, toOffset).toSeq.sortBy(_.baseOffset)
+val activeSegIndex: Int = sortedSegments.map(x => x.baseOffset).search(activeSegBaseOffset) match {
+  case Found(x) => x
+  case InsertionPoint(y) => y - 1
+}
+// sortedSegments becomes empty list when fromOffset and toOffset are 
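A side note for readers of the `search` call quoted just above: the Found/InsertionPoint handling picks the index of the last candidate segment relative to the active segment. A minimal standalone Scala sketch of that behaviour (plain Long offsets instead of real LogSegment objects; this is not code from the PR):

import scala.collection.Searching._

object SegmentSearchSketch extends App {
  // Sorted base offsets of the candidate segments (what sortBy(_.baseOffset) yields).
  val sortedBaseOffsets = IndexedSeq(0L, 100L, 250L, 400L)

  def activeSegIndex(activeSegBaseOffset: Long): Int =
    sortedBaseOffsets.search(activeSegBaseOffset) match {
      case Found(idx)          => idx      // the active segment's base offset is one of the candidates
      case InsertionPoint(idx) => idx - 1  // it would be inserted after the candidates; take the one before
    }

  println(activeSegIndex(400L)) // 3: exact match, index of that base offset
  println(activeSegIndex(500L)) // 3: would be inserted at 4, so the previous index is used
}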

[GitHub] [kafka] showuon commented on a diff in pull request #13487: KAFKA-9550 Copying log segments to tiered storage in RemoteLogManager

2023-04-04 Thread via GitHub


showuon commented on code in PR #13487:
URL: https://github.com/apache/kafka/pull/13487#discussion_r1157196981



[GitHub] [kafka] showuon commented on a diff in pull request #13487: KAFKA-9550 Copying log segments to tiered storage in RemoteLogManager

2023-04-04 Thread via GitHub


showuon commented on code in PR #13487:
URL: https://github.com/apache/kafka/pull/13487#discussion_r1157152232


##
core/src/main/scala/kafka/log/remote/RemoteLogManager.scala:
##
@@ -143,24 +216,44 @@ class RemoteLogManager(rlmConfig: RemoteLogManagerConfig,
  topicIds: util.Map[String, Uuid]): Unit = {
 debug(s"Received leadership changes for leaders: $partitionsBecomeLeader 
and followers: $partitionsBecomeFollower")
 
-// Partitions logs are available when this callback is invoked.
-// Compact topics and internal topics are filtered here as they are not 
supported with tiered storage.
-def filterPartitions(partitions: Set[Partition]): Set[TopicIdPartition] = {
+def filterPartitions(partitions: Set[Partition]): Set[Partition] = {
   // We are not specifically checking for internal topics etc here as 
`log.remoteLogEnabled()` already handles that.
   partitions.filter(partition => partition.log.exists(log => 
log.remoteLogEnabled()))
-.map(partition => new TopicIdPartition(topicIds.get(partition.topic), 
partition.topicPartition))
 }
 
-val followerTopicPartitions = filterPartitions(partitionsBecomeFollower)
-val leaderTopicPartitions = filterPartitions(partitionsBecomeLeader)
-debug(s"Effective topic partitions after filtering compact and internal 
topics, leaders: $leaderTopicPartitions " +
-  s"and followers: $followerTopicPartitions")
+val leaderPartitionsWithLeaderEpoch = 
filterPartitions(partitionsBecomeLeader)
+  .map(p => new TopicIdPartition(topicIds.get(p.topic), p.topicPartition) 
-> p.getLeaderEpoch).toMap
+val leaderPartitions = leaderPartitionsWithLeaderEpoch.keySet
 
-if (leaderTopicPartitions.nonEmpty || followerTopicPartitions.nonEmpty) {
-  leaderTopicPartitions.foreach(x => 
topicPartitionIds.put(x.topicPartition(), x.topicId()))
-  followerTopicPartitions.foreach(x => 
topicPartitionIds.put(x.topicPartition(), x.topicId()))
+val followerPartitions = filterPartitions(partitionsBecomeFollower)
+  .map(p => new TopicIdPartition(topicIds.get(p.topic), p.topicPartition))
+
+def cacheTopicPartitionIds(topicIdPartition: TopicIdPartition): Unit = {
+  val previousTopicId = 
topicPartitionIds.put(topicIdPartition.topicPartition(), 
topicIdPartition.topicId())
+  if (previousTopicId != null && previousTopicId != 
topicIdPartition.topicId()) {
+warn(s"Previous cached topic id $previousTopicId for 
${topicIdPartition.topicPartition()} does " +

Review Comment:
   This warning message seems not necessary IMO. Users cannot do anything when getting this info, right?
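For reference, the pattern under discussion is a put-and-compare on the ConcurrentHashMap. A standalone Scala sketch of it (names such as cachedTopicIds are made up here, and the debug-style println only stands in for whichever log level the PR settles on):

import java.util.concurrent.ConcurrentHashMap
import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}

object TopicIdCacheSketch extends App {
  // Hypothetical cache mirroring the topicPartitionIds map in the quoted diff.
  private val cachedTopicIds = new ConcurrentHashMap[TopicPartition, Uuid]()

  def cacheTopicPartitionId(tpId: TopicIdPartition): Unit = {
    // put() returns the previously mapped value (or null), which is how a changed
    // topic id for the same partition can be detected.
    val previous = cachedTopicIds.put(tpId.topicPartition(), tpId.topicId())
    if (previous != null && previous != tpId.topicId())
      println(s"debug: topic id for ${tpId.topicPartition()} changed from $previous to ${tpId.topicId()}")
  }

  val tp = new TopicPartition("demo-topic", 0)
  cacheTopicPartitionId(new TopicIdPartition(Uuid.randomUuid(), tp)) // first put, nothing logged
  cacheTopicPartitionId(new TopicIdPartition(Uuid.randomUuid(), tp)) // id changed, reported at debug level
}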



[GitHub] [kafka] showuon commented on a diff in pull request #13487: KAFKA-9550 Copying log segments to tiered storage in RemoteLogManager

2023-04-04 Thread via GitHub


showuon commented on code in PR #13487:
URL: https://github.com/apache/kafka/pull/13487#discussion_r1156860415


##
core/src/main/scala/kafka/log/remote/RemoteLogManager.scala:
##
@@ -17,39 +17,107 @@
 package kafka.log.remote
 
 import kafka.cluster.Partition
+import kafka.log.UnifiedLog
 import kafka.server.KafkaConfig
 import kafka.utils.Logging
 import org.apache.kafka.common._
+import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
 import org.apache.kafka.common.record.{RecordBatch, RemoteLogInputStream}
-import org.apache.kafka.common.utils.{ChildFirstClassLoader, Utils}
-import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager
+import org.apache.kafka.common.utils.{ChildFirstClassLoader, KafkaThread, Time, Utils}
+import org.apache.kafka.server.common.CheckpointFile.CheckpointWriteBuffer
+import org.apache.kafka.server.log.remote.metadata.storage.{ClassLoaderAwareRemoteLogMetadataManager, TopicBasedRemoteLogMetadataManagerConfig}
 import org.apache.kafka.server.log.remote.storage._
+import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpoint, LeaderEpochCheckpointFile}
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
+import org.apache.kafka.storage.internals.log.EpochEntry
 
-import java.io.{Closeable, InputStream}
+import java.io._
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
+import java.nio.file.Path
 import java.security.{AccessController, PrivilegedAction}
-import java.util
 import java.util.Optional
-import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
+import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicInteger
+import java.{lang, util}
+import scala.collection.Searching._
 import scala.collection.Set
 import scala.jdk.CollectionConverters._
 
+class RLMScheduledThreadPool(poolSize: Int) extends Logging {
+
+  private val scheduledThreadPool: ScheduledThreadPoolExecutor = {
+val threadPool = new ScheduledThreadPoolExecutor(poolSize)
+threadPool.setRemoveOnCancelPolicy(true)
+threadPool.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)

Review Comment:
   should we set `setContinueExistingPeriodicTasksAfterShutdownPolicy(false);` ?
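For context on that suggestion, here is a minimal standalone Scala sketch (not the PR code) of the three shutdown-related policies on a ScheduledThreadPoolExecutor; continue-existing-periodic-tasks defaults to false, so periodic tasks already stop after shutdown(), and setting it explicitly is mainly about documenting the intent:

import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit}

object ShutdownPolicySketch extends App {
  val pool = new ScheduledThreadPoolExecutor(1)
  // Cancelled tasks are removed from the work queue immediately.
  pool.setRemoveOnCancelPolicy(true)
  // One-shot delayed tasks that have not started yet are dropped on shutdown().
  pool.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
  // Periodic (fixed-rate/fixed-delay) tasks also stop on shutdown(); false is the default.
  pool.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)

  pool.scheduleWithFixedDelay(() => println("tick"), 0, 100, TimeUnit.MILLISECONDS)
  Thread.sleep(350)   // a few ticks run
  pool.shutdown()     // no further ticks are scheduled after this
  pool.awaitTermination(1, TimeUnit.SECONDS)
}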



##
core/src/main/scala/kafka/log/remote/RemoteLogManager.scala:
##
@@ -17,39 +17,107 @@
 package kafka.log.remote
 
 import kafka.cluster.Partition
+import kafka.log.UnifiedLog
 import kafka.server.KafkaConfig
 import kafka.utils.Logging
 import org.apache.kafka.common._
+import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
 import org.apache.kafka.common.record.{RecordBatch, RemoteLogInputStream}
-import org.apache.kafka.common.utils.{ChildFirstClassLoader, Utils}
-import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager
+import org.apache.kafka.common.utils.{ChildFirstClassLoader, KafkaThread, Time, Utils}
+import org.apache.kafka.server.common.CheckpointFile.CheckpointWriteBuffer
+import org.apache.kafka.server.log.remote.metadata.storage.{ClassLoaderAwareRemoteLogMetadataManager, TopicBasedRemoteLogMetadataManagerConfig}
 import org.apache.kafka.server.log.remote.storage._
+import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpoint, LeaderEpochCheckpointFile}
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
+import org.apache.kafka.storage.internals.log.EpochEntry
 
-import java.io.{Closeable, InputStream}
+import java.io._
+import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
+import java.nio.file.Path
 import java.security.{AccessController, PrivilegedAction}
-import java.util
 import java.util.Optional
-import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
+import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicInteger
+import java.{lang, util}
+import scala.collection.Searching._
 import scala.collection.Set
 import scala.jdk.CollectionConverters._
 
+class RLMScheduledThreadPool(poolSize: Int) extends Logging {
+
+  private val scheduledThreadPool: ScheduledThreadPoolExecutor = {
+val threadPool = new ScheduledThreadPoolExecutor(poolSize)
+threadPool.setRemoveOnCancelPolicy(true)
+threadPool.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
+threadPool.setThreadFactory(new ThreadFactory {
+  private val sequence = new AtomicInteger()
+
+  override def newThread(r: Runnable): Thread = {
+KafkaThread.daemon("kafka-rlm-thread-pool-" + sequence.incrementAndGet(), r)
+  }
+})
+
+threadPool
+  }
+
+  def resizePool(size: Int): Unit = {
+info(s"Resizing pool from ${scheduledThreadPool.getCorePoolSize} to $size")
+scheduledThreadPool.setCorePoolSize(size)
+  }
+
+  def poolSize(): Int = scheduledThreadPool.getCorePoolSize
+
+  def getIdlePercent(): Double = {
+  
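To connect the two pieces quoted in this thread, here is a minimal standalone Scala sketch (plain java.util.concurrent scheduling and a made-up 30-second delay; none of this is the PR's actual wiring) of running a cancellable task on such a pool and cancelling it when leadership is lost:

import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}

object CancellableTaskSketch extends App {
  // Same cancellation-flag pattern as the CancellableRunnable trait quoted earlier.
  class Task extends Runnable {
    @volatile private var cancelled = false
    def cancel(): Unit = cancelled = true
    override def run(): Unit = if (!cancelled) println("copying segments ...")
  }

  val pool = Executors.newScheduledThreadPool(1)
  val task = new Task
  // Run the task periodically, one schedule per leader partition.
  val future: ScheduledFuture[_] = pool.scheduleWithFixedDelay(task, 0, 30, TimeUnit.SECONDS)

  // On becoming follower / partition stop: flag the task and cancel the schedule,
  // so an in-flight run exits early and no new runs are queued.
  task.cancel()
  future.cancel(false)
  pool.shutdown()
}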

[GitHub] [kafka] showuon commented on a diff in pull request #13487: KAFKA-9550 Copying log segments to tiered storage in RemoteLogManager

2023-04-03 Thread via GitHub


showuon commented on code in PR #13487:
URL: https://github.com/apache/kafka/pull/13487#discussion_r1155880909


##
storage/src/main/java/org/apache/kafka/storage/internals/epoch/LeaderEpochFileCache.java:
##
@@ -359,6 +361,27 @@ public OptionalInt epochForOffset(long offset) {
 }
 }
 
+public LeaderEpochFileCache writeTo(LeaderEpochCheckpoint leaderEpochCheckpoint) {
+lock.readLock().lock();
+try {
+leaderEpochCheckpoint.write(epochEntries());
+return new LeaderEpochFileCache(topicPartition, leaderEpochCheckpoint);
+} finally {
+lock.readLock().unlock();
+}
+}
+
+public OptionalInt findPreviousEpoch(int epoch) {

Review Comment:
   Any reason we don't use the existing `previousEpoch` method?
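For readers following along: a previous-epoch lookup of this kind boils down to finding the largest epoch strictly smaller than the given one in the sorted epoch entries. A standalone Scala sketch of that lookup (modelling the cache as a sorted map keyed by epoch, which is an assumption of this sketch rather than a statement about how LeaderEpochFileCache stores its entries):

import java.util.OptionalInt
import scala.collection.immutable.TreeMap

object PreviousEpochSketch extends App {
  // Epoch -> start offset of that epoch, kept sorted by epoch.
  val epochs = TreeMap(0 -> 0L, 3 -> 120L, 7 -> 500L)

  // Largest epoch strictly smaller than the given one, if any.
  def previousEpoch(epoch: Int): OptionalInt = {
    val lower = epochs.keysIterator.takeWhile(_ < epoch).toSeq
    if (lower.isEmpty) OptionalInt.empty() else OptionalInt.of(lower.last)
  }

  println(previousEpoch(7)) // OptionalInt[3]
  println(previousEpoch(0)) // OptionalInt.empty
}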



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org