kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r602052452



##########
File path: core/src/main/scala/kafka/log/LocalLog.scala
##########
@@ -0,0 +1,1561 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log
+
+import java.io.{File, IOException}
+import java.lang.{Long => JLong}
+import java.nio.file.{Files, NoSuchFileException}
+import java.text.NumberFormat
+import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
+import java.util.concurrent.atomic.AtomicLong
+import java.util.Map.{Entry => JEntry}
+import java.util.regex.Pattern
+
+import kafka.common.LogSegmentOffsetOverflowException
+import kafka.metrics.KafkaMetricsGroup
+import kafka.server.epoch.LeaderEpochFileCache
+import kafka.server.{FetchDataInfo, LogDirFailureChannel, LogOffsetMetadata}
+import kafka.utils.{CoreUtils, Logging, Scheduler, threadsafe}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.errors.{InvalidOffsetException, 
KafkaStorageException, OffsetOutOfRangeException}
+import org.apache.kafka.common.message.FetchResponseData
+import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, 
RecordVersion, Records}
+import org.apache.kafka.common.utils.{Time, Utils}
+
+import scala.jdk.CollectionConverters._
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import scala.collection.{Seq, Set, mutable}
+
+// Used to define pre/post roll actions to be performed.
+case class RollAction(preRollAction: Long => Unit, postRollAction: 
(LogSegment, Option[LogSegment]) => Unit)
+
+// Used to hold the result of splitting a segment into one or more segments, 
see LocalLog#splitOverflowedSegment
+case class SplitSegmentResult(deletedSegments: Seq[LogSegment], newSegments: 
Seq[LogSegment])
+
+/**
+ * An append-only log for storing messages locally.
+ * The log is a sequence of LogSegments, each with a base offset denoting the 
first message in the segment.
+ * New log segments are created according to a configurable policy that 
controls the size in bytes or time interval
+ * for a given segment.
+ *
+ * NOTE: this class is not thread-safe, and it relies on the thread safety 
provided by the Log class.
+ *
+ * @param _dir The directory in which log segments are created.
+ * @param config The log configuration settings
+ * @param recoveryPoint The offset at which to begin recovery i.e. the first 
offset which has not been flushed to disk
+ * @param scheduler The thread pool scheduler used for background actions
+ * @param time The time instance used for checking the clock
+ * @param topicPartition The topic partition associated with this log
+ * @param logDirFailureChannel The LogDirFailureChannel instance to 
asynchronously handle Log dir failure
+ * @param hadCleanShutdown boolean flag to indicate if the Log had a 
clean/graceful shutdown last time. true means
+ *                         clean shutdown whereas false means a crash.
+ */
+class LocalLog(@volatile private var _dir: File,
+               @volatile var config: LogConfig,
+               @volatile var recoveryPoint: Long,
+               scheduler: Scheduler,
+               val time: Time,
+               val topicPartition: TopicPartition,
+               logDirFailureChannel: LogDirFailureChannel,
+               private val hadCleanShutdown: Boolean = true) extends Logging 
with KafkaMetricsGroup {
+
+  import kafka.log.LocalLog._
+
+  this.logIdent = s"[Log partition=$topicPartition, dir=${dir.getParent}] "
+
+  // The memory mapped buffer for index files of this log will be closed with 
either delete() or closeHandlers()
+  // After memory mapped buffer is closed, no disk IO operation should be 
performed for this log
+  @volatile private var isMemoryMappedBufferClosed = false
+
+  // Cache value of parent directory to avoid allocations in hot paths like 
ReplicaManager.checkpointHighWatermarks
+  @volatile private var _parentDir: String = dir.getParent
+
+  /* last time the log was flushed */
+  private val lastFlushedTime = new AtomicLong(time.milliseconds)
+
+  // The offset where the next message could be appended
+  @volatile private var nextOffsetMetadata: LogOffsetMetadata = _
+
+  // Log dir failure is handled asynchronously we need to prevent threads
+  // from reading inconsistent state caused by a failure in another thread
+  @volatile private[log] var logDirOffline = false
+
+  // The actual segments of the log
+  private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = 
new ConcurrentSkipListMap[java.lang.Long, LogSegment]
+
+  locally {
+    // Create the log directory if it doesn't exist
+    Files.createDirectories(dir.toPath)
+  }
+
+  private[log] def dir: File = _dir
+
+  private[log] def name = dir.getName()
+
+  private[log] def parentDir: String = _parentDir
+
+  private[log] def parentDirFile: File = new File(_parentDir)
+
+  private[log] def isFuture: Boolean = dir.getName.endsWith(FutureDirSuffix)
+
+  private[log] def initFileSize: Int = {
+    if (config.preallocate)
+      config.segmentSize
+    else
+      0
+  }
+
+  /**
+   * Rename the directory of the log
+   * @param name the new dir name
+   * @throws KafkaStorageException if rename fails
+   */
+  private[log] def renameDir(name: String): Boolean = {
+    maybeHandleIOException(s"Error while renaming dir for $topicPartition in 
log dir ${dir.getParent}") {
+      val renamedDir = new File(dir.getParent, name)
+      Utils.atomicMoveWithFallback(dir.toPath, renamedDir.toPath)
+      if (renamedDir != dir) {
+        _dir = renamedDir
+        _parentDir = renamedDir.getParent
+        logSegments.foreach(_.updateParentDir(renamedDir))
+        true
+      } else {
+        false
+      }
+    }
+  }
+
+  private[log] def updateConfig(newConfig: LogConfig): Unit = {
+    val oldConfig = this.config
+    this.config = newConfig
+    val oldRecordVersion = oldConfig.messageFormatVersion.recordVersion
+    val newRecordVersion = newConfig.messageFormatVersion.recordVersion
+    if (newRecordVersion.precedes(oldRecordVersion))
+      warn(s"Record format version has been downgraded from $oldRecordVersion 
to $newRecordVersion.")
+  }
+
+  private[log] def checkIfMemoryMappedBufferClosed(): Unit = {
+    if (isMemoryMappedBufferClosed)
+      throw new KafkaStorageException(s"The memory mapped buffer for log of 
$topicPartition is already closed")
+  }
+
+  private[log] def checkForLogDirFailure(): Unit = {
+    if (logDirOffline) {
+      throw new KafkaStorageException(s"The log dir $parentDir is offline due 
to a previous IO exception.")
+    }
+  }
+
+  private[log] def updateRecoveryPoint(newRecoveryPoint: Long): Unit = {
+    recoveryPoint = newRecoveryPoint
+  }
+
+  /**
+   * Update recoveryPoint to provided offset and mark the log as flushed, if 
the offset is greater
+   * than the existing recoveryPoint.
+   *
+   * @param offset the offset to be updated
+   */
+  private[log] def markFlushed(offset: Long): Unit = {
+    checkIfMemoryMappedBufferClosed()
+    if (offset > recoveryPoint) {
+      updateRecoveryPoint(offset)
+      lastFlushedTime.set(time.milliseconds)
+    }
+  }
+
+  /**
+   * The time this log is last known to have been fully flushed to disk
+   */
+  private[log] def lastFlushTime: Long = lastFlushedTime.get
+
+  /**
+   * The offset metadata of the next message that will be appended to the log
+   */
+  private[log] def logEndOffsetMetadata: LogOffsetMetadata = nextOffsetMetadata
+
+  /**
+   * The offset of the next message that will be appended to the log
+   */
+  private[log] def logEndOffset: Long = nextOffsetMetadata.messageOffset
+
+  /**
+   * Update end offset of the log, and the recoveryPoint.
+   *
+   * @param endOffset the new end offset of the log
+   */
+  private[log] def updateLogEndOffset(endOffset: Long): Unit = {
+    nextOffsetMetadata = LogOffsetMetadata(endOffset, 
activeSegment.baseOffset, activeSegment.size)
+    if (recoveryPoint > endOffset) {
+      updateRecoveryPoint(endOffset)
+    }
+  }
+
+  /**
+   * @return the base offset of the first local segment, if it exists
+   */
+  private[log] def firstSegmentBaseOffset: Option[Long] = 
Option(segments.firstEntry).map(_.getValue.baseOffset)
+
+  /**
+   * The active segment that is currently taking appends
+   */
+  private[log] def activeSegment = segments.lastEntry.getValue
+
+  /**
+   * The number of segments in the log.
+   * Take care! this is an O(n) operation.
+   */
+  private[log] def numberOfSegments: Int = segments.size
+
+  /**
+   * The size of the log in bytes
+   */
+  private[log] def size: Long = LocalLog.sizeInBytes(logSegments)
+
+  /**
+   * All the log segments in this log ordered from oldest to newest
+   */
+  private[log] def logSegments: Iterable[LogSegment] = segments.values.asScala
+
+  /**
+   * Get all segments beginning with the segment that includes "from" and 
ending with the segment
+   * that includes up to "to-1" or the end of the log (if to > logEndOffset).
+   */
+  private[log] def logSegments(from: Long, to: Long): Iterable[LogSegment] = {
+    if (from == to) {
+      // Handle non-segment-aligned empty sets
+      List.empty[LogSegment]
+    } else if (to < from) {
+      throw new IllegalArgumentException(s"Invalid log segment range: 
requested segments in $topicPartition " +
+        s"from offset $from which is greater than limit offset $to")
+    } else {
+      val view = Option(segments.floorKey(from)).map { floor =>
+        segments.subMap(floor, to)
+      }.getOrElse(segments.headMap(to))
+      view.values.asScala
+    }
+  }
+
+  /**
+   * Return all non-active log segments beginning with the segment that 
includes "from".
+   *
+   * @param from the from offset
+   */
+  private[log] def nonActiveLogSegmentsFrom(from: Long): Iterable[LogSegment] 
= {
+    if (from > activeSegment.baseOffset)
+      Seq.empty
+    else
+      logSegments(from, activeSegment.baseOffset)
+  }
+
+  private[log]  def recordVersion: RecordVersion = 
config.messageFormatVersion.recordVersion
+
+  private[log]  def lowerSegment(offset: Long): Option[LogSegment] =
+    Option(segments.lowerEntry(offset)).map(_.getValue)
+
+  /**
+   * Get the largest log segment with a base offset less than or equal to the 
given offset, if one exists.
+   * @return the optional log segment
+   */
+  private[log]  def floorLogSegment(offset: Long): Option[LogSegment] = {
+    Option(segments.floorEntry(offset)).map(_.getValue)
+  }
+
+  /**
+   * Add the given segment to the segments in this log. If this segment 
replaces an existing segment, delete it.
+   * @param segment The segment to add
+   */
+  @threadsafe
+  private[log] def addSegment(segment: LogSegment): LogSegment = 
this.segments.put(segment.baseOffset, segment)
+
+  /**
+   * Clears all segments
+   */
+  private[log] def clearSegments(): Unit = segments.clear()
+
+  /**
+   * Closes all segments
+   */
+  private[log] def closeSegments(): Unit = {
+    logSegments.foreach(_.close())
+  }
+
+  /**
+   * Close file handlers used by log but don't write to disk. This is called 
if the log directory is offline
+   */
+  private[log] def closeHandlers(): Unit = {
+    logSegments.foreach(_.closeHandlers())
+    isMemoryMappedBufferClosed = true
+  }
+
+  /**
+   * Closes the log.
+   */
+  private[log] def close(): Unit = {
+    maybeHandleIOException(s"Error while deleting log for $topicPartition in 
dir ${dir.getParent}") {
+      checkIfMemoryMappedBufferClosed()
+      closeSegments()
+    }
+  }
+
+  /**
+   * Completely delete this log directory and all contents from the file 
system with no delay
+   */
+  private[log] def delete(): Seq[LogSegment] = {
+    maybeHandleIOException(s"Error while deleting log for $topicPartition in 
dir ${dir.getParent}") {
+      checkIfMemoryMappedBufferClosed()
+      val deleted = logSegments.toSeq
+      removeAndDeleteSegments(logSegments, asyncDelete = false, LogDeletion)
+      Utils.delete(dir)
+      // File handlers will be closed if this log is deleted
+      isMemoryMappedBufferClosed = true
+      deleted
+    }
+  }
+
+  /**
+   * Load the log segments from the log files on disk and update the next 
offset.
+   * This method does not need to convert IOException to KafkaStorageException 
because it is usually called before all logs
+   * are loaded.
+   *
+   * @param logStartOffset the log start offset
+   * @param maxProducerIdExpirationMs The maximum amount of time to wait 
before a producer id is considered expired
+   * @param producerStateManager The ProducerStateManager instance
+   * @param leaderEpochCache The LeaderEpochFileCache instance
+   *
+   * @return the list of deleted segments
+   *
+   * @throws LogSegmentOffsetOverflowException if we encounter a .swap file 
with messages that overflow index offset; or when
+   *                                           we find an unexpected number of 
.log files with overflow
+   */
+  private[log] def loadSegments(logStartOffset: Long,
+                                maxProducerIdExpirationMs: Int,
+                                producerStateManager: ProducerStateManager,
+                                leaderEpochCache: 
Option[LeaderEpochFileCache]): Seq[LogSegment] = {
+    // first do a pass through the files in the log directory and remove any 
temporary files
+    // and find any interrupted swap operations
+    val swapFiles = removeTempFilesAndCollectSwapFiles()
+
+    // Now do a second pass and load all the log and index files.
+    // We might encounter legacy log segments with offset overflow 
(KAFKA-6264). We need to split such segments. When
+    // this happens, restart loading segment files from scratch.
+    retryOnOffsetOverflow({
+      // In case we encounter a segment with offset overflow, the retry logic 
will split it after which we need to retry
+      // loading of segments. In that case, we also need to close all segments 
that could have been left open in previous
+      // call to loadSegmentFiles().
+      logSegments.foreach(_.close())
+      segments.clear()
+      loadSegmentFiles(logStartOffset, maxProducerIdExpirationMs)
+    })
+
+    val deletedSegments = ListBuffer[LogSegment]()
+
+    // Finally, complete any interrupted swap operations. To be crash-safe,
+    // log files that are replaced by the swap segment should be renamed to 
.deleted
+    // before the swap file is restored as the new segment file.
+    deletedSegments ++= completeSwapOperations(swapFiles, logStartOffset, 
maxProducerIdExpirationMs)
+
+    if (!dir.getAbsolutePath.endsWith(DeleteDirSuffix)) {
+      val (deleted, nextOffset) = retryOnOffsetOverflow(
+        {
+          recoverLog(logStartOffset,
+                     maxProducerIdExpirationMs,
+                     producerStateManager,
+                     leaderEpochCache)
+        })
+      deletedSegments ++= deleted
+
+      // reset the index size of the currently active log segment to allow 
more entries
+      activeSegment.resizeIndexes(config.maxIndexSize)
+      updateLogEndOffset(nextOffset)
+    } else {
+      if (logSegments.isEmpty) {
+        addSegment(LogSegment.open(dir = dir,
+          baseOffset = 0,
+          config,
+          time = time,
+          initFileSize = this.initFileSize))
+      }
+      updateLogEndOffset(0)
+    }
+    deletedSegments.toSeq
+  }
+
+  /**
+   * Recover the log segments and return the next offset after recovery.
+   * This method does not need to convert IOException to KafkaStorageException 
because it is usually called before all
+   * logs are loaded.
+   *
+   * @param logStartOffset the log start offset
+   * @param maxProducerIdExpirationMs The maximum amount of time to wait 
before a producer id is considered expired
+   * @param producerStateManager The ProducerStateManager instance
+   * @param leaderEpochCache The LeaderEpochFileCache instance
+   *
+   * @return the list of deleted segments and the next offset
+   *
+   * @throws LogSegmentOffsetOverflowException if we encountered a legacy 
segment with offset overflow
+   */
+  private[log] def recoverLog(logStartOffset: Long,
+                              maxProducerIdExpirationMs: Int,
+                              producerStateManager: ProducerStateManager,
+                              leaderEpochCache: Option[LeaderEpochFileCache]): 
(List[LogSegment], Long) = {
+    val deleted = scala.collection.mutable.ListBuffer[LogSegment]()
+    /** return the log end offset if valid */
+    def deleteSegmentsIfLogStartGreaterThanLogEnd(): Option[Long] = {
+      if (logSegments.nonEmpty) {
+        val logEndOffset = activeSegment.readNextOffset
+        if (logEndOffset >= logStartOffset)
+          Some(logEndOffset)
+        else {
+          warn(s"Deleting all segments because logEndOffset ($logEndOffset) is 
smaller than logStartOffset ($logStartOffset). " +
+            "This could happen if segment files were deleted from the file 
system.")
+          val toDelete = logSegments.toList
+          removeAndDeleteSegments(logSegments, asyncDelete = true, LogRecovery)
+          deleted ++= toDelete
+          leaderEpochCache.foreach(_.clearAndFlush())
+          producerStateManager.truncateFullyAndStartAt(logStartOffset)
+          None
+        }
+      } else None
+    }
+
+    // if we have the clean shutdown marker, skip recovery
+    if (!hadCleanShutdown) {
+      val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator
+      var truncated = false
+
+      while (unflushed.hasNext && !truncated) {
+        val segment = unflushed.next()
+        info(s"Recovering unflushed segment ${segment.baseOffset}")
+        val truncatedBytes =
+          try {
+            recoverSegment(logStartOffset, segment, maxProducerIdExpirationMs, 
leaderEpochCache)
+          } catch {
+            case _: InvalidOffsetException =>
+              val startOffset = segment.baseOffset
+              warn("Found invalid offset during recovery. Deleting the corrupt 
segment and " +
+                s"creating an empty one with starting offset $startOffset")
+              segment.truncateTo(startOffset)
+          }
+        if (truncatedBytes > 0) {
+          // we had an invalid message, delete all remaining log
+          warn(s"Corruption found in segment ${segment.baseOffset}, truncating 
to offset ${segment.readNextOffset}")
+          val toDelete = unflushed.toList
+          removeAndDeleteSegments(toDelete,
+                                  asyncDelete = true,
+                                  reason = LogRecovery)
+          deleted ++= toDelete
+          truncated = true
+        }
+      }
+    }
+
+    val logEndOffsetOption = deleteSegmentsIfLogStartGreaterThanLogEnd()
+
+    if (logSegments.isEmpty) {
+      // no existing segments, create a new mutable segment beginning at 
logStartOffset
+      addSegment(LogSegment.open(dir = dir,
+        baseOffset = logStartOffset,
+        config,
+        time = time,
+        initFileSize = this.initFileSize,
+        preallocate = config.preallocate))
+    }
+
+    // Update the recovery point if there was a clean shutdown and did not 
perform any changes to
+    // the segment. Otherwise, we just ensure that the recovery point is not 
ahead of the log end
+    // offset. To ensure correctness and to make it easier to reason about, 
it's best to only advance
+    // the recovery point in flush(Long). If we advanced the recovery point 
here, we could skip recovery for
+    // unflushed segments if the broker crashed after we checkpoint the 
recovery point and before we flush the
+    // segment.
+    (hadCleanShutdown, logEndOffsetOption) match {
+      case (true, Some(logEndOffset)) =>
+        updateRecoveryPoint(logEndOffset)
+        (deleted.toList, logEndOffset)
+      case _ =>
+        val logEndOffset = 
logEndOffsetOption.getOrElse(activeSegment.readNextOffset)
+        updateRecoveryPoint(Math.min(recoveryPoint, logEndOffset))
+        (deleted.toList, logEndOffset)
+    }
+  }
+
+  /**
+   * This method does not need to convert IOException to KafkaStorageException 
because it is only called before all logs are loaded
+   * It is possible that we encounter a segment with index offset overflow in 
which case the LogSegmentOffsetOverflowException
+   * will be thrown. Note that any segments that were opened before we 
encountered the exception will remain open and the
+   * caller is responsible for closing them appropriately, if needed.
+   *
+   * @param logStartOffset the log start offset
+   * @param maxProducerIdExpirationMs The maximum amount of time to wait 
before a producer id is considered expired
+   *
+   * @throws LogSegmentOffsetOverflowException if the log directory contains a 
segment with messages that overflow the index offset
+   */
+  private[log] def loadSegmentFiles(logStartOffset: Long, 
maxProducerIdExpirationMs: Int): Unit = {
+    // load segments in ascending order because transactional data from one 
segment may depend on the
+    // segments that come before it
+    for (file <- dir.listFiles.sortBy(_.getName) if file.isFile) {
+      if (isIndexFile(file)) {
+        // if it is an index file, make sure it has a corresponding .log file
+        val offset = offsetFromFile(file)
+        val logFile = LocalLog.logFile(dir, offset)
+        if (!logFile.exists) {
+          warn(s"Found an orphaned index file ${file.getAbsolutePath}, with no 
corresponding log file.")
+          Files.deleteIfExists(file.toPath)
+        }
+      } else if (isLogFile(file)) {
+        // if it's a log file, load the corresponding log segment
+        val baseOffset = offsetFromFile(file)
+        val timeIndexFileNewlyCreated = !Log.timeIndexFile(dir, 
baseOffset).exists()
+        val segment = LogSegment.open(dir = dir,
+          baseOffset = baseOffset,
+          config,
+          time = time,
+          fileAlreadyExists = true)
+
+        try segment.sanityCheck(timeIndexFileNewlyCreated)
+        catch {
+          case _: NoSuchFileException =>
+            error(s"Could not find offset index file corresponding to log file 
${segment.log.file.getAbsolutePath}, " +
+              "recovering segment and rebuilding index files...")
+            recoverSegment(logStartOffset, segment, maxProducerIdExpirationMs)
+          case e: CorruptIndexException =>
+            warn(s"Found a corrupted index file corresponding to log file 
${segment.log.file.getAbsolutePath} due " +
+              s"to ${e.getMessage}}, recovering segment and rebuilding index 
files...")
+            recoverSegment(logStartOffset, segment, maxProducerIdExpirationMs)
+        }
+        addSegment(segment)
+      }
+    }
+  }
+
+  /**
+   * Recover the given segment.
+   *
+   * @param logStartOffset the log start offset
+   * @param segment Segment to recover
+   * @param maxProducerIdExpirationMs The maximum amount of time to wait 
before a producer id is considered expired
+   * @param leaderEpochCache Optional cache for updating the leader epoch 
during recovery
+   *
+   * @return The number of bytes truncated from the segment
+   *
+   * @throws LogSegmentOffsetOverflowException if the segment contains 
messages that cause index offset overflow
+   */
+  private[log] def recoverSegment(logStartOffset: Long,
+                                  segment: LogSegment,
+                                  maxProducerIdExpirationMs: Int,
+                                  leaderEpochCache: 
Option[LeaderEpochFileCache] = None): Int = {
+    val producerStateManager = new ProducerStateManager(topicPartition, dir, 
maxProducerIdExpirationMs)
+    rebuildProducerState(logStartOffset, segment.baseOffset, 
reloadFromCleanShutdown = false, producerStateManager)
+    val bytesTruncated = segment.recover(producerStateManager, 
leaderEpochCache)
+    // once we have recovered the segment's data, take a snapshot to ensure 
that we won't
+    // need to reload the same segment again while recovering another segment.
+    producerStateManager.takeSnapshot()
+    bytesTruncated
+  }
+
+  /**
+   * This method does not need to convert IOException to KafkaStorageException 
because it is only called before all logs
+   * are loaded.
+   * @throws LogSegmentOffsetOverflowException if the swap file contains 
messages that cause the log segment offset to
+   *                                           overflow. Note that this is 
currently a fatal exception as we do not have
+   *                                           a way to deal with it. The 
exception is propagated all the way up to
+   *                                           KafkaServer#startup which will 
cause the broker to shut down if we are in
+   *                                           this situation. This is 
expected to be an extremely rare scenario in practice,
+   *                                           and manual intervention might 
be required to get out of it.
+   */
+  private[log] def completeSwapOperations(swapFiles: Set[File],
+                                          logStartOffset: Long,
+                                          maxProducerIdExpirationMs: Int): 
Seq[LogSegment] = {
+    val deletedSegments = ListBuffer[LogSegment]()
+    for (swapFile <- swapFiles) {
+      val logFile = new File(CoreUtils.replaceSuffix(swapFile.getPath, 
SwapFileSuffix, ""))
+      val baseOffset = offsetFromFile(logFile)
+      val swapSegment = LogSegment.open(swapFile.getParentFile,
+        baseOffset = baseOffset,
+        config,
+        time = time,
+        fileSuffix = SwapFileSuffix)
+      info(s"Found log file ${swapFile.getPath} from interrupted swap 
operation, repairing.")
+      recoverSegment(logStartOffset, swapSegment, maxProducerIdExpirationMs)
+
+      // We create swap files for two cases:
+      // (1) Log cleaning where multiple segments are merged into one, and
+      // (2) Log splitting where one segment is split into multiple.
+      //
+      // Both of these mean that the resultant swap segments be composed of 
the original set, i.e. the swap segment
+      // must fall within the range of existing segment(s). If we cannot find 
such a segment, it means the deletion
+      // of that segment was successful. In such an event, we should simply 
rename the .swap to .log without having to
+      // do a replace with an existing segment.
+      val oldSegments = logSegments(swapSegment.baseOffset, 
swapSegment.readNextOffset).filter { segment =>
+        segment.readNextOffset > swapSegment.baseOffset
+      }
+      val deleted = replaceSegments(Seq(swapSegment), oldSegments.toSeq, 
isRecoveredSwapFile = true)
+      deletedSegments ++= deleted
+    }
+    deletedSegments.toSeq
+  }
+
+  /**
+   * Removes any temporary files found in log directory, and creates a list of 
all .swap files which could be swapped
+   * in place of existing segment(s). For log splitting, we know that any 
.swap file whose base offset is higher than
+   * the smallest offset .clean file could be part of an incomplete split 
operation. Such .swap files are also deleted
+   * by this method.
+   *
+   * @return Set of .swap files that are valid to be swapped in as segment 
files
+   */
+  private[log] def removeTempFilesAndCollectSwapFiles(): Set[File] = {
+
+    def deleteIndicesIfExist(baseFile: File, suffix: String = ""): Unit = {
+      info(s"Deleting index files with suffix $suffix for baseFile $baseFile")
+      val offset = offsetFromFile(baseFile)
+      Files.deleteIfExists(Log.offsetIndexFile(dir, offset, suffix).toPath)
+      Files.deleteIfExists(Log.timeIndexFile(dir, offset, suffix).toPath)
+      Files.deleteIfExists(Log.transactionIndexFile(dir, offset, 
suffix).toPath)
+    }
+
+    val swapFiles = mutable.Set[File]()
+    val cleanFiles = mutable.Set[File]()
+    var minCleanedFileOffset = Long.MaxValue
+
+    for (file <- dir.listFiles if file.isFile) {
+      if (!file.canRead)
+        throw new IOException(s"Could not read file $file")
+      val filename = file.getName
+      if (filename.endsWith(DeletedFileSuffix)) {
+        debug(s"Deleting stray temporary file ${file.getAbsolutePath}")
+        Files.deleteIfExists(file.toPath)
+      } else if (filename.endsWith(CleanedFileSuffix)) {
+        minCleanedFileOffset = Math.min(offsetFromFileName(filename), 
minCleanedFileOffset)
+        cleanFiles += file
+      } else if (filename.endsWith(SwapFileSuffix)) {
+        // we crashed in the middle of a swap operation, to recover:
+        // if a log, delete the index files, complete the swap operation later
+        // if an index just delete the index files, they will be rebuilt
+        val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, 
SwapFileSuffix, ""))
+        info(s"Found file ${file.getAbsolutePath} from interrupted swap 
operation.")
+        if (isIndexFile(baseFile)) {
+          deleteIndicesIfExist(baseFile)
+        } else if (isLogFile(baseFile)) {
+          deleteIndicesIfExist(baseFile)
+          swapFiles += file
+        }
+      }
+    }
+
+    // KAFKA-6264: Delete all .swap files whose base offset is greater than 
the minimum .cleaned segment offset. Such .swap
+    // files could be part of an incomplete split operation that could not 
complete. See LocalLog#splitOverflowedSegment
+    // for more details about the split operation.
+    val (invalidSwapFiles, validSwapFiles) = swapFiles.partition(file => 
offsetFromFile(file) >= minCleanedFileOffset)
+    invalidSwapFiles.foreach { file =>
+      debug(s"Deleting invalid swap file ${file.getAbsoluteFile} 
minCleanedFileOffset: $minCleanedFileOffset")
+      val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, 
SwapFileSuffix, ""))
+      deleteIndicesIfExist(baseFile, SwapFileSuffix)
+      Files.deleteIfExists(file.toPath)
+    }
+
+    // Now that we have deleted all .swap files that constitute an incomplete 
split operation, let's delete all .clean files
+    cleanFiles.foreach { file =>
+      debug(s"Deleting stray .clean file ${file.getAbsolutePath}")
+      Files.deleteIfExists(file.toPath)
+    }
+
+    validSwapFiles
+  }
+
+  private[log] def retryOnOffsetOverflow[T](fn: => T): T = {
+    while (true) {
+      try {
+        return fn
+      } catch {
+        case e: LogSegmentOffsetOverflowException =>
+          info(s"Caught segment overflow error: ${e.getMessage}. Split segment 
and retry.")
+          splitOverflowedSegment(e.segment)
+      }
+    }
+    throw new IllegalStateException()
+  }
+
+  private[log] def maybeHandleIOException[T](msg: => String)(fun: => T): T = {
+    try {
+      checkForLogDirFailure()
+      fun
+    } catch {
+      case e: IOException =>
+        logDirOffline = true
+        logDirFailureChannel.maybeAddOfflineLogDir(dir.getParent, msg, e)
+        throw new KafkaStorageException(msg, e)
+    }
+  }
+
+  /**
+   * Split a segment into one or more segments such that there is no offset 
overflow in any of them. The
+   * resulting segments will contain the exact same messages that are present 
in the input segment. On successful
+   * completion of this method, the input segment will be deleted and will be 
replaced by the resulting new segments.
+   * See replaceSegments for recovery logic, in case the broker dies in the 
middle of this operation.
+   * <p>Note that this method assumes we have already determined that the 
segment passed in contains records that cause
+   * offset overflow.</p>
+   * <p>The split logic overloads the use of .clean files that LogCleaner 
typically uses to make the process of replacing
+   * the input segment with multiple new segments atomic and recoverable in 
the event of a crash. See replaceSegments
+   * and completeSwapOperations for the implementation to make this operation 
recoverable on crashes.</p>
+   *
+   * @param segment Segment to split
+   *
+   * @return a result instance containing list of new segments that replace 
the input segment and deleted segments (if any)
+   */
+  private[log] def splitOverflowedSegment(segment: LogSegment): 
SplitSegmentResult = {
+    require(isLogFile(segment.log.file), s"Cannot split file 
${segment.log.file.getAbsoluteFile}")
+    require(segment.hasOverflow, "Split operation is only permitted for 
segments with overflow")
+
+    info(s"Splitting overflowed segment $segment")
+
+    val newSegments = ListBuffer[LogSegment]()
+    try {
+      var position = 0
+      val sourceRecords = segment.log
+
+      while (position < sourceRecords.sizeInBytes) {
+        val firstBatch = sourceRecords.batchesFrom(position).asScala.head
+        val newSegment = LogCleaner.createNewCleanedSegment(dir, config, 
firstBatch.baseOffset)
+        newSegments += newSegment
+
+        val bytesAppended = newSegment.appendFromFile(sourceRecords, position)
+        if (bytesAppended == 0)
+          throw new IllegalStateException(s"Failed to append records from 
position $position in $segment")
+
+        position += bytesAppended
+      }
+
+      // prepare new segments
+      var totalSizeOfNewSegments = 0
+      newSegments.foreach { splitSegment =>
+        splitSegment.onBecomeInactiveSegment()
+        splitSegment.flush()
+        splitSegment.lastModified = segment.lastModified
+        totalSizeOfNewSegments += splitSegment.log.sizeInBytes
+      }
+      // size of all the new segments combined must equal size of the original 
segment
+      if (totalSizeOfNewSegments != segment.log.sizeInBytes)
+        throw new IllegalStateException("Inconsistent segment sizes after 
split" +
+          s" before: ${segment.log.sizeInBytes} after: 
$totalSizeOfNewSegments")
+
+      // replace old segment with new ones
+      info(s"Replacing overflowed segment $segment with split segments 
$newSegments")
+      val toAdd = newSegments.toSeq
+      val deletedSegments = replaceSegments(newSegments.toSeq, List(segment))
+      SplitSegmentResult(deletedSegments.toSeq, toAdd)
+    } catch {
+      case e: Exception =>
+        newSegments.foreach { splitSegment =>
+          splitSegment.close()
+          splitSegment.deleteIfExists()
+        }
+        throw e
+    }
+  }
+
+  /**
+   * Swap one or more new segment in place and delete one or more existing 
segments in a crash-safe manner. The old
+   * segments will be asynchronously deleted.
+   *
+   * This method does not need to convert IOException to KafkaStorageException 
because it is either called before all logs are loaded
+   * or the caller will catch and handle IOException
+   *
+   * The sequence of operations is:
+   * <ol>
+   *   <li> Cleaner creates one or more new segments with suffix .cleaned and 
invokes replaceSegments().
+   *        If broker crashes at this point, the clean-and-swap operation is 
aborted and
+   *        the .cleaned files are deleted on recovery in loadSegments().
+   *   <li> New segments are renamed .swap. If the broker crashes before all 
segments were renamed to .swap, the
+   *        clean-and-swap operation is aborted - .cleaned as well as .swap 
files are deleted on recovery in
+   *        loadSegments(). We detect this situation by maintaining a specific 
order in which files are renamed from
+   *        .cleaned to .swap. Basically, files are renamed in descending 
order of offsets. On recovery, all .swap files
+   *        whose offset is greater than the minimum-offset .clean file are 
deleted.
+   *   <li> If the broker crashes after all new segments were renamed to 
.swap, the operation is completed, the swap
+   *        operation is resumed on recovery as described in the next step.
+   *   <li> Old segment files are renamed to .deleted and asynchronous delete 
is scheduled.
+   *        If the broker crashes, any .deleted files left behind are deleted 
on recovery in loadSegments().
+   *        replaceSegments() is then invoked to complete the swap with 
newSegment recreated from
+   *        the .swap file and oldSegments containing segments which were not 
renamed before the crash.
+   *   <li> Swap segment(s) are renamed to replace the existing segments, 
completing this operation.
+   *        If the broker crashes, any .deleted files which may be left behind 
are deleted
+   *        on recovery in loadSegments().
+   * </ol>
+   *
+   * @param newSegments The new log segment to add to the log
+   * @param oldSegments The old log segments to delete from the log
+   * @param isRecoveredSwapFile true if the new segment was created from a 
swap file during recovery after a crash
+   *
+   * @return segments which were deleted but not replaced
+   */
+  private[log] def replaceSegments(newSegments: Seq[LogSegment], oldSegments: 
Seq[LogSegment], isRecoveredSwapFile: Boolean = false): Seq[LogSegment] = {
+    val sortedNewSegments = newSegments.sortBy(_.baseOffset)
+    // Some old segments may have been removed from index and scheduled for 
async deletion after the caller reads segments
+    // but before this method is executed. We want to filter out those 
segments to avoid calling asyncDeleteSegment()
+    // multiple times for the same segment.
+    val sortedOldSegments = oldSegments.filter(seg => 
segments.containsKey(seg.baseOffset)).sortBy(_.baseOffset)
+
+    checkIfMemoryMappedBufferClosed()
+    // need to do this in two phases to be crash safe AND do the delete 
asynchronously
+    // if we crash in the middle of this we complete the swap in loadSegments()
+    if (!isRecoveredSwapFile)
+      
sortedNewSegments.reverse.foreach(_.changeFileSuffixes(Log.CleanedFileSuffix, 
Log.SwapFileSuffix))
+    sortedNewSegments.reverse.foreach(addSegment(_))
+    val newSegmentBaseOffsets = sortedNewSegments.map(_.baseOffset).toSet
+
+    // delete the old files
+    val deletedNotReplaced = sortedOldSegments.map { seg =>
+      // remove the index entry
+      if (seg.baseOffset != sortedNewSegments.head.baseOffset)
+        segments.remove(seg.baseOffset)
+      deleteSegmentFiles(List(seg), asyncDelete = true)
+      if (newSegmentBaseOffsets.contains(seg.baseOffset)) Option.empty else 
Some(seg)
+    }.filter(item => item.isDefined).map(item => item.get)
+    // okay we are safe now, remove the swap suffix
+    sortedNewSegments.foreach(_.changeFileSuffixes(Log.SwapFileSuffix, ""))
+    deletedNotReplaced
+  }
+
+  /**
+   * Find segments starting from the oldest until the user-supplied predicate 
is false or the segment
+   * containing the current high watermark is reached. We do not delete 
segments with offsets at or beyond
+   * the high watermark to ensure that the log start offset can never exceed 
it. If the high watermark
+   * has not yet been initialized, no segments are eligible for deletion.
+   *
+   * A final segment that is empty will never be returned (since we would just 
end up re-creating it).
+   *
+   * @param predicate A function that takes in a candidate log segment and the 
next higher segment
+   *                  (if there is one), logEndOffset and returns true iff it 
is deletable
+   * @return the segments ready to be deleted
+   */
+  private[log] def deletableSegments(predicate: (LogSegment, 
Option[LogSegment], Long) => Boolean): Iterable[LogSegment] = {
+    if (segments.isEmpty) {
+      Seq.empty
+    } else {
+      val deletable = ArrayBuffer.empty[LogSegment]
+      var segmentEntry = segments.firstEntry
+      while (segmentEntry != null) {
+        val segment = segmentEntry.getValue
+        val nextSegmentEntry = segments.higherEntry(segmentEntry.getKey)
+        val (nextSegment, isLastSegmentAndEmpty) = if (nextSegmentEntry != 
null)
+          (nextSegmentEntry.getValue, false)
+        else
+          (null, segment.size == 0)
+
+        if (predicate(segment, Option(nextSegment), logEndOffset) && 
!isLastSegmentAndEmpty) {
+          deletable += segment
+          segmentEntry = nextSegmentEntry
+        } else {
+          segmentEntry = null
+        }
+      }
+      deletable
+    }
+  }
+
+  /**
+   * Perform physical deletion for the given segments. Allows the segments to 
be deleted asynchronously or synchronously.
+   *
+   * This method assumes that the segment exists and the method is not 
thread-safe.
+   *
+   * This method does not need to convert IOException (thrown from 
changeFileSuffixes) to KafkaStorageException because

Review comment:
       The method only converts `IOException` to `KafkaStorageException` inside 
the internal `deleteSegments()` helper function. The comment is referring to 
only `changeFileSuffixes`. I've improved the `@throws` doc to refer to both 
exceptions now. Please let me know if it needs further improvement.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to