showuon commented on code in PR #13487:
URL: https://github.com/apache/kafka/pull/13487#discussion_r1157152232
##########
core/src/main/scala/kafka/log/remote/RemoteLogManager.scala:
##########
@@ -143,24 +216,44 @@ class RemoteLogManager(rlmConfig: RemoteLogManagerConfig,
                            topicIds: util.Map[String, Uuid]): Unit = {
     debug(s"Received leadership changes for leaders: $partitionsBecomeLeader and followers: $partitionsBecomeFollower")
-    // Partitions logs are available when this callback is invoked.
-    // Compact topics and internal topics are filtered here as they are not supported with tiered storage.
-    def filterPartitions(partitions: Set[Partition]): Set[TopicIdPartition] = {
+    def filterPartitions(partitions: Set[Partition]): Set[Partition] = {
       // We are not specifically checking for internal topics etc here as `log.remoteLogEnabled()` already handles that.
       partitions.filter(partition => partition.log.exists(log => log.remoteLogEnabled()))
-        .map(partition => new TopicIdPartition(topicIds.get(partition.topic), partition.topicPartition))
     }
-    val followerTopicPartitions = filterPartitions(partitionsBecomeFollower)
-    val leaderTopicPartitions = filterPartitions(partitionsBecomeLeader)
-    debug(s"Effective topic partitions after filtering compact and internal topics, leaders: $leaderTopicPartitions " +
-      s"and followers: $followerTopicPartitions")
+    val leaderPartitionsWithLeaderEpoch = filterPartitions(partitionsBecomeLeader)
+      .map(p => new TopicIdPartition(topicIds.get(p.topic), p.topicPartition) -> p.getLeaderEpoch).toMap
+    val leaderPartitions = leaderPartitionsWithLeaderEpoch.keySet
-    if (leaderTopicPartitions.nonEmpty || followerTopicPartitions.nonEmpty) {
-      leaderTopicPartitions.foreach(x => topicPartitionIds.put(x.topicPartition(), x.topicId()))
-      followerTopicPartitions.foreach(x => topicPartitionIds.put(x.topicPartition(), x.topicId()))
+    val followerPartitions = filterPartitions(partitionsBecomeFollower)
+      .map(p => new TopicIdPartition(topicIds.get(p.topic), p.topicPartition))
+
+    def cacheTopicPartitionIds(topicIdPartition: TopicIdPartition): Unit = {
+      val previousTopicId = topicPartitionIds.put(topicIdPartition.topicPartition(), topicIdPartition.topicId())
+      if (previousTopicId != null && previousTopicId != topicIdPartition.topicId()) {
+        warn(s"Previous cached topic id $previousTopicId for ${topicIdPartition.topicPartition()} does " +
Review Comment:
This warning message seems unnecessary, IMO. Users cannot do anything with this info, right?
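
For reference, a minimal standalone sketch of the put-and-compare pattern under discussion (the map mirrors the `topicPartitionIds` field; the topic name and the delete-and-recreate scenario are illustrative assumptions, not the PR's actual wiring):

```scala
import java.util.concurrent.ConcurrentHashMap
import org.apache.kafka.common.{TopicPartition, Uuid}

object TopicIdCacheSketch extends App {
  // Stands in for the topicPartitionIds cache in the diff above.
  val topicPartitionIds = new ConcurrentHashMap[TopicPartition, Uuid]()

  def cacheTopicPartitionIds(tp: TopicPartition, topicId: Uuid): Unit = {
    val previousTopicId = topicPartitionIds.put(tp, topicId)
    // The id can only differ if the topic was deleted and recreated under the
    // same name, i.e. the broker is now dealing with a logically new topic.
    if (previousTopicId != null && previousTopicId != topicId)
      println(s"cached topic id changed for $tp: $previousTopicId -> $topicId")
  }

  val tp = new TopicPartition("payments", 0)          // hypothetical topic
  cacheTopicPartitionIds(tp, Uuid.randomUuid())       // first leadership callback
  cacheTopicPartitionIds(tp, Uuid.randomUuid())       // after delete + recreate: ids differ
}
```

If the delete-and-recreate reading is right, the log line arguably does flag something real (remote state keyed by the old topic id), but whether operators can act on it is exactly the question raised here.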
##########
core/src/main/scala/kafka/log/remote/RemoteLogManager.scala:
##########
@@ -272,15 +364,270 @@ class RemoteLogManager(rlmConfig: RemoteLogManagerConfig,
       None
   }
+  trait CancellableRunnable extends Runnable {
+    @volatile private var cancelled = false
+
+    def cancel(): Unit = {
+      cancelled = true
+    }
+
+    def isCancelled(): Boolean = {
+      cancelled
+    }
+  }
+
+  /**
+   * Returns the leader epoch checkpoint by truncating with the given start[exclusive] and end[inclusive] offset
+   *
+   * @param log         The actual log from where to take the leader-epoch checkpoint
+   * @param startOffset The start offset of the checkpoint file (exclusive in the truncation).
+   *                    If start offset is 6, then it will retain an entry at offset 6.
+   * @param endOffset   The end offset of the checkpoint file (inclusive in the truncation)
+   *                    If end offset is 100, then it will remove the entries greater than or equal to 100.
+   * @return the truncated leader epoch checkpoint
+   */
+  private[remote] def getLeaderEpochCheckpoint(log: UnifiedLog, startOffset: Long, endOffset: Long): InMemoryLeaderEpochCheckpoint = {
+    val checkpoint = new InMemoryLeaderEpochCheckpoint()
+    log.leaderEpochCache
+      .map(cache => cache.writeTo(checkpoint))
+      .foreach { x =>
+        if (startOffset >= 0) {
+          x.truncateFromStart(startOffset)
+        }
+        x.truncateFromEnd(endOffset)
+      }
+    checkpoint
+  }
+
+  private[remote] class RLMTask(tpId: TopicIdPartition) extends CancellableRunnable with Logging {
+    this.logIdent = s"[RemoteLogManager=$brokerId partition=$tpId] "
+    @volatile private var leaderEpoch: Int = -1
+
+    private def isLeader(): Boolean = leaderEpoch >= 0
+
+    // The readOffset is None initially for a new leader RLMTask, and needs to be fetched inside the task's run() method.
+    @volatile private var copiedOffsetOption: Option[Long] = None
+
+    def convertToLeader(leaderEpochVal: Int): Unit = {
+      if (leaderEpochVal < 0) {
+        throw new KafkaException(s"leaderEpoch value for topic partition $tpId can not be negative")
+      }
+      if (this.leaderEpoch != leaderEpochVal) {
+        leaderEpoch = leaderEpochVal
+      }
+      // Reset readOffset, so that it is set in next run of RLMTask
+      copiedOffsetOption = None
+    }
+
+    def convertToFollower(): Unit = {
+      leaderEpoch = -1
+    }
+
+    def handleCopyLogSegmentsToRemote(): Unit = {
+      if (isCancelled())
+        return
+
+      def maybeUpdateReadOffset(): Unit = {
+        if (copiedOffsetOption.isEmpty) {
+          info(s"Find the highest remote offset for partition: $tpId after becoming leader, leaderEpoch: $leaderEpoch")
+
+          // This is found by traversing from the latest leader epoch from leader epoch history and find the highest offset
+          // of a segment with that epoch copied into remote storage. If it can not find an entry then it checks for the
+          // previous leader epoch till it finds an entry, If there are no entries till the earliest leader epoch in leader
+          // epoch cache then it starts copying the segments from the earliest epoch entry’s offset.
+          copiedOffsetOption = Some(findHighestRemoteOffset(tpId))
+        }
+      }
+
+      try {
+        maybeUpdateReadOffset()
+        val copiedOffset = copiedOffsetOption.get
+        fetchLog(tpId.topicPartition()).foreach { log =>
+          // LSO indicates the offset below are ready to be consumed(high-watermark or committed)
+          val lso = log.lastStableOffset
+          if (lso < 0) {
+            warn(s"lastStableOffset for partition $tpId is $lso, which should not be negative.")
+          } else if (lso > 0 && copiedOffset < lso) {
+            // Copy segments only till the min of high-watermark or stable-offset as remote storage should contain
+            // only committed/acked messages
+            val toOffset = lso
Review Comment:
Where do we set the offset with `min of high-watermark or stable-offset`?
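
For context, `UnifiedLog.lastStableOffset` already folds the high-watermark bound in, which may be what the comment means by the min; a paraphrased sketch of that definition (names approximate, not the exact upstream code):

```scala
// Paraphrased: the LSO is the first unstable (open-transaction) offset if one
// exists below the high-watermark, otherwise the high-watermark itself, so
// lso <= high-watermark always holds and `toOffset = lso` is already the min.
def lastStableOffset(firstUnstableOffset: Option[Long], highWatermark: Long): Long =
  firstUnstableOffset match {
    case Some(offset) if offset < highWatermark => offset
    case _ => highWatermark
  }
```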
##########
core/src/main/scala/kafka/log/remote/RemoteLogManager.scala:
##########
@@ -272,15 +364,270 @@ class RemoteLogManager(rlmConfig: RemoteLogManagerConfig,
       None
   }
+  trait CancellableRunnable extends Runnable {
+    @volatile private var cancelled = false
+
+    def cancel(): Unit = {
+      cancelled = true
+    }
+
+    def isCancelled(): Boolean = {
+      cancelled
+    }
+  }
+
+  /**
+   * Returns the leader epoch checkpoint by truncating with the given start[exclusive] and end[inclusive] offset
+   *
+   * @param log         The actual log from where to take the leader-epoch checkpoint
+   * @param startOffset The start offset of the checkpoint file (exclusive in the truncation).
+   *                    If start offset is 6, then it will retain an entry at offset 6.
+   * @param endOffset   The end offset of the checkpoint file (inclusive in the truncation)
+   *                    If end offset is 100, then it will remove the entries greater than or equal to 100.
+   * @return the truncated leader epoch checkpoint
+   */
+  private[remote] def getLeaderEpochCheckpoint(log: UnifiedLog, startOffset: Long, endOffset: Long): InMemoryLeaderEpochCheckpoint = {
+    val checkpoint = new InMemoryLeaderEpochCheckpoint()
+    log.leaderEpochCache
+      .map(cache => cache.writeTo(checkpoint))
+      .foreach { x =>
+        if (startOffset >= 0) {
+          x.truncateFromStart(startOffset)
+        }
+        x.truncateFromEnd(endOffset)
+      }
+    checkpoint
+  }
+
+  private[remote] class RLMTask(tpId: TopicIdPartition) extends CancellableRunnable with Logging {
+    this.logIdent = s"[RemoteLogManager=$brokerId partition=$tpId] "
+    @volatile private var leaderEpoch: Int = -1
+
+    private def isLeader(): Boolean = leaderEpoch >= 0
+
+    // The readOffset is None initially for a new leader RLMTask, and needs to be fetched inside the task's run() method.
+    @volatile private var copiedOffsetOption: Option[Long] = None
+
+    def convertToLeader(leaderEpochVal: Int): Unit = {
+      if (leaderEpochVal < 0) {
+        throw new KafkaException(s"leaderEpoch value for topic partition $tpId can not be negative")
+      }
+      if (this.leaderEpoch != leaderEpochVal) {
+        leaderEpoch = leaderEpochVal
+      }
+      // Reset readOffset, so that it is set in next run of RLMTask
+      copiedOffsetOption = None
+    }
+
+    def convertToFollower(): Unit = {
+      leaderEpoch = -1
+    }
+
+    def handleCopyLogSegmentsToRemote(): Unit = {
+      if (isCancelled())
+        return
+
+      def maybeUpdateReadOffset(): Unit = {
+        if (copiedOffsetOption.isEmpty) {
+          info(s"Find the highest remote offset for partition: $tpId after becoming leader, leaderEpoch: $leaderEpoch")
+
+          // This is found by traversing from the latest leader epoch from leader epoch history and find the highest offset
+          // of a segment with that epoch copied into remote storage. If it can not find an entry then it checks for the
+          // previous leader epoch till it finds an entry, If there are no entries till the earliest leader epoch in leader
+          // epoch cache then it starts copying the segments from the earliest epoch entry’s offset.
+          copiedOffsetOption = Some(findHighestRemoteOffset(tpId))
+        }
+      }
+
+      try {
+        maybeUpdateReadOffset()
+        val copiedOffset = copiedOffsetOption.get
+        fetchLog(tpId.topicPartition()).foreach { log =>
+          // LSO indicates the offset below are ready to be consumed(high-watermark or committed)
+          val lso = log.lastStableOffset
+          if (lso < 0) {
+            warn(s"lastStableOffset for partition $tpId is $lso, which should not be negative.")
+          } else if (lso > 0 && copiedOffset < lso) {
+            // Copy segments only till the min of high-watermark or stable-offset as remote storage should contain
+            // only committed/acked messages
+            val toOffset = lso
+            debug(s"Checking for segments to copy, copiedOffset: $copiedOffset and toOffset: $toOffset")
+            val activeSegBaseOffset = log.activeSegment.baseOffset
+            // log-start-offset can be ahead of the read-offset, when:
+            // 1) log-start-offset gets incremented via delete-records API (or)
+            // 2) enabling the remote log for the first time
+            val fromOffset = Math.max(copiedOffset + 1, log.logStartOffset)
+            val sortedSegments = log.logSegments(fromOffset, toOffset).toSeq.sortBy(_.baseOffset)
+            val activeSegIndex: Int = sortedSegments.map(x => x.baseOffset).search(activeSegBaseOffset) match {
+              case Found(x) => x
+              case InsertionPoint(y) => y - 1
+            }
+            // sortedSegments becomes empty list when fromOffset and toOffset are same, and activeSegIndex becomes -1
+            if (activeSegIndex < 0) {
+              debug(s"No segments found to be copied for partition $tpId with copiedOffset: $copiedOffset and " +
+                s"active segment's base-offset: $activeSegBaseOffset")
+            } else {
+              sortedSegments.slice(0, activeSegIndex).foreach { segment =>
+                if (isCancelled() || !isLeader()) {
+                  info(s"Skipping copying log segments as the current task state is changed, cancelled: " +
+                    s"${isCancelled()} leader:${isLeader()}")
+                  return
+                }
+
+                val logFile = segment.log.file()
+                val logFileName = logFile.getName
+
+                info(s"Copying $logFileName to remote storage.")
+                val id = new RemoteLogSegmentId(tpId, Uuid.randomUuid())
+
+                val nextOffset = segment.readNextOffset
+                val endOffset = nextOffset - 1
+                val producerIdSnapshotFile: File = log.producerStateManager.fetchSnapshot(nextOffset).orElse(null)
+
+                val epochEntries = getLeaderEpochCheckpoint(log, segment.baseOffset, nextOffset).read()
+                val segmentLeaderEpochs: util.Map[Integer, lang.Long] = new util.HashMap[Integer, lang.Long](epochEntries.size())
Review Comment:
Should we use `lang.Integer` here?
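
For what it's worth, Scala imports all of `java.lang` by default, so the unqualified `Integer` already denotes `java.lang.Integer`; `lang.Long` needs the qualifier only because a bare `Long` would be `scala.Long`, while there is no `scala.Integer` to shadow. A tiny snippet showing the two spellings are the same type:

```scala
// `java.lang` is in scope by default in Scala, so these are the same type.
val a: Integer = Integer.valueOf(7)
val b: java.lang.Integer = a // compiles with no conversion: Integer == java.lang.Integer
```

So the suggestion is purely about stylistic symmetry with `lang.Long` on the same line.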
##########
core/src/main/scala/kafka/log/remote/RemoteLogManager.scala:
##########
@@ -272,15 +364,270 @@ class RemoteLogManager(rlmConfig: RemoteLogManagerConfig,
       None
   }
+  trait CancellableRunnable extends Runnable {
+    @volatile private var cancelled = false
+
+    def cancel(): Unit = {
+      cancelled = true
+    }
+
+    def isCancelled(): Boolean = {
+      cancelled
+    }
+  }
+
+  /**
+   * Returns the leader epoch checkpoint by truncating with the given start[exclusive] and end[inclusive] offset
+   *
+   * @param log         The actual log from where to take the leader-epoch checkpoint
+   * @param startOffset The start offset of the checkpoint file (exclusive in the truncation).
+   *                    If start offset is 6, then it will retain an entry at offset 6.
+   * @param endOffset   The end offset of the checkpoint file (inclusive in the truncation)
+   *                    If end offset is 100, then it will remove the entries greater than or equal to 100.
+   * @return the truncated leader epoch checkpoint
+   */
+  private[remote] def getLeaderEpochCheckpoint(log: UnifiedLog, startOffset: Long, endOffset: Long): InMemoryLeaderEpochCheckpoint = {
+    val checkpoint = new InMemoryLeaderEpochCheckpoint()
+    log.leaderEpochCache
+      .map(cache => cache.writeTo(checkpoint))
+      .foreach { x =>
+        if (startOffset >= 0) {
+          x.truncateFromStart(startOffset)
+        }
+        x.truncateFromEnd(endOffset)
+      }
+    checkpoint
+  }
+
+  private[remote] class RLMTask(tpId: TopicIdPartition) extends CancellableRunnable with Logging {
+    this.logIdent = s"[RemoteLogManager=$brokerId partition=$tpId] "
+    @volatile private var leaderEpoch: Int = -1
+
+    private def isLeader(): Boolean = leaderEpoch >= 0
+
+    // The readOffset is None initially for a new leader RLMTask, and needs to be fetched inside the task's run() method.
+    @volatile private var copiedOffsetOption: Option[Long] = None
+
+    def convertToLeader(leaderEpochVal: Int): Unit = {
+      if (leaderEpochVal < 0) {
+        throw new KafkaException(s"leaderEpoch value for topic partition $tpId can not be negative")
+      }
+      if (this.leaderEpoch != leaderEpochVal) {
+        leaderEpoch = leaderEpochVal
+      }
+      // Reset readOffset, so that it is set in next run of RLMTask
+      copiedOffsetOption = None
+    }
+
+    def convertToFollower(): Unit = {
+      leaderEpoch = -1
+    }
+
+    def handleCopyLogSegmentsToRemote(): Unit = {
+      if (isCancelled())
+        return
+
+      def maybeUpdateReadOffset(): Unit = {
+        if (copiedOffsetOption.isEmpty) {
+          info(s"Find the highest remote offset for partition: $tpId after becoming leader, leaderEpoch: $leaderEpoch")
+
+          // This is found by traversing from the latest leader epoch from leader epoch history and find the highest offset
+          // of a segment with that epoch copied into remote storage. If it can not find an entry then it checks for the
+          // previous leader epoch till it finds an entry, If there are no entries till the earliest leader epoch in leader
+          // epoch cache then it starts copying the segments from the earliest epoch entry’s offset.
+          copiedOffsetOption = Some(findHighestRemoteOffset(tpId))
+        }
+      }
+
+      try {
+        maybeUpdateReadOffset()
+        val copiedOffset = copiedOffsetOption.get
+        fetchLog(tpId.topicPartition()).foreach { log =>
+          // LSO indicates the offset below are ready to be consumed(high-watermark or committed)
+          val lso = log.lastStableOffset
+          if (lso < 0) {
+            warn(s"lastStableOffset for partition $tpId is $lso, which should not be negative.")
+          } else if (lso > 0 && copiedOffset < lso) {
+            // Copy segments only till the min of high-watermark or stable-offset as remote storage should contain
+            // only committed/acked messages
+            val toOffset = lso
+            debug(s"Checking for segments to copy, copiedOffset: $copiedOffset and toOffset: $toOffset")
+            val activeSegBaseOffset = log.activeSegment.baseOffset
+            // log-start-offset can be ahead of the read-offset, when:
+            // 1) log-start-offset gets incremented via delete-records API (or)
+            // 2) enabling the remote log for the first time
+            val fromOffset = Math.max(copiedOffset + 1, log.logStartOffset)
+            val sortedSegments = log.logSegments(fromOffset, toOffset).toSeq.sortBy(_.baseOffset)
+            val activeSegIndex: Int = sortedSegments.map(x => x.baseOffset).search(activeSegBaseOffset) match {
+              case Found(x) => x
+              case InsertionPoint(y) => y - 1
+            }
+            // sortedSegments becomes empty list when fromOffset and toOffset are same, and activeSegIndex becomes -1
+            if (activeSegIndex < 0) {
+              debug(s"No segments found to be copied for partition $tpId with copiedOffset: $copiedOffset and " +
+                s"active segment's base-offset: $activeSegBaseOffset")
+            } else {
+              sortedSegments.slice(0, activeSegIndex).foreach { segment =>
+                if (isCancelled() || !isLeader()) {
+                  info(s"Skipping copying log segments as the current task state is changed, cancelled: " +
+                    s"${isCancelled()} leader:${isLeader()}")
+                  return
+                }
+
+                val logFile = segment.log.file()
+                val logFileName = logFile.getName
+
+                info(s"Copying $logFileName to remote storage.")
+                val id = new RemoteLogSegmentId(tpId, Uuid.randomUuid())
+
+                val nextOffset = segment.readNextOffset
+                val endOffset = nextOffset - 1
+                val producerIdSnapshotFile: File = log.producerStateManager.fetchSnapshot(nextOffset).orElse(null)
+
+                val epochEntries = getLeaderEpochCheckpoint(log, segment.baseOffset, nextOffset).read()
+                val segmentLeaderEpochs: util.Map[Integer, lang.Long] = new util.HashMap[Integer, lang.Long](epochEntries.size())
+                epochEntries.forEach(entry =>
+                  segmentLeaderEpochs.put(Integer.valueOf(entry.epoch), lang.Long.valueOf(entry.startOffset))
+                )
+
+                val remoteLogSegmentMetadata = new RemoteLogSegmentMetadata(id, segment.baseOffset, endOffset,
Review Comment:
Could we rename this variable to a more meaningful name like `copySegmentStartedMetadata`?
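
One more aside for readers of the hunk above: the `search` call on the sorted base offsets is `scala.collection.Searching` (a plain method on `Seq` in Scala 2.13), returning `Found` for an exact hit and `InsertionPoint` otherwise. A minimal standalone example with made-up offsets:

```scala
import scala.collection.Searching.{Found, InsertionPoint}

val baseOffsets = Vector(0L, 100L, 250L) // sorted segment base offsets (made up)

// Mirrors the activeSegIndex computation: an exact hit keeps its index, a miss
// steps back one slot, and an empty sequence yields InsertionPoint(0), i.e. -1.
def activeSegIndex(activeSegBaseOffset: Long): Int =
  baseOffsets.search(activeSegBaseOffset) match {
    case Found(i) => i
    case InsertionPoint(i) => i - 1
  }

assert(activeSegIndex(250L) == 2) // only indices 0 and 1 (the older segments) get copied
assert(Vector.empty[Long].search(5L) == InsertionPoint(0))
```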