kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r602041672



##########
File path: core/src/main/scala/kafka/log/LocalLog.scala
##########
@@ -0,0 +1,1561 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log
+
+import java.io.{File, IOException}
+import java.lang.{Long => JLong}
+import java.nio.file.{Files, NoSuchFileException}
+import java.text.NumberFormat
+import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
+import java.util.concurrent.atomic.AtomicLong
+import java.util.Map.{Entry => JEntry}
+import java.util.regex.Pattern
+
+import kafka.common.LogSegmentOffsetOverflowException
+import kafka.metrics.KafkaMetricsGroup
+import kafka.server.epoch.LeaderEpochFileCache
+import kafka.server.{FetchDataInfo, LogDirFailureChannel, LogOffsetMetadata}
+import kafka.utils.{CoreUtils, Logging, Scheduler, threadsafe}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.errors.{InvalidOffsetException, KafkaStorageException, OffsetOutOfRangeException}
+import org.apache.kafka.common.message.FetchResponseData
+import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, RecordVersion, Records}
+import org.apache.kafka.common.utils.{Time, Utils}
+
+import scala.jdk.CollectionConverters._
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import scala.collection.{Seq, Set, mutable}
+
+// Used to define pre/post roll actions to be performed.
+case class RollAction(preRollAction: Long => Unit, postRollAction: (LogSegment, Option[LogSegment]) => Unit)
+
+// Used to hold the result of splitting a segment into one or more segments, see LocalLog#splitOverflowedSegment
+case class SplitSegmentResult(deletedSegments: Seq[LogSegment], newSegments: Seq[LogSegment])
+
+/**
+ * An append-only log for storing messages locally.
+ * The log is a sequence of LogSegments, each with a base offset denoting the first message in the segment.
+ * New log segments are created according to a configurable policy that controls the size in bytes or time interval
+ * for a given segment.
+ *
+ * NOTE: this class is not thread-safe, and it relies on the thread safety provided by the Log class.
+ *
+ * @param _dir The directory in which log segments are created.
+ * @param config The log configuration settings
+ * @param recoveryPoint The offset at which to begin recovery i.e. the first offset which has not been flushed to disk
+ * @param scheduler The thread pool scheduler used for background actions
+ * @param time The time instance used for checking the clock
+ * @param topicPartition The topic partition associated with this log
+ * @param logDirFailureChannel The LogDirFailureChannel instance to asynchronously handle Log dir failure
+ * @param hadCleanShutdown boolean flag to indicate if the Log had a clean/graceful shutdown last time. true means
+ *                         clean shutdown whereas false means a crash.
+ */
+class LocalLog(@volatile private var _dir: File,
+               @volatile var config: LogConfig,
+               @volatile var recoveryPoint: Long,
+               scheduler: Scheduler,
+               val time: Time,
+               val topicPartition: TopicPartition,
+               logDirFailureChannel: LogDirFailureChannel,
+               private val hadCleanShutdown: Boolean = true) extends Logging with KafkaMetricsGroup {
+
+  import kafka.log.LocalLog._
+
+  this.logIdent = s"[Log partition=$topicPartition, dir=${dir.getParent}] "
+
+  // The memory mapped buffer for index files of this log will be closed with either delete() or closeHandlers()
+  // After memory mapped buffer is closed, no disk IO operation should be performed for this log
+  @volatile private var isMemoryMappedBufferClosed = false
+
+  // Cache value of parent directory to avoid allocations in hot paths like ReplicaManager.checkpointHighWatermarks
+  @volatile private var _parentDir: String = dir.getParent
+
+  /* last time the log was flushed */
+  private val lastFlushedTime = new AtomicLong(time.milliseconds)
+
+  // The offset where the next message could be appended
+  @volatile private var nextOffsetMetadata: LogOffsetMetadata = _
+
+  // Log dir failure is handled asynchronously, so we need to prevent threads
+  // from reading inconsistent state caused by a failure in another thread
+  @volatile private[log] var logDirOffline = false
+
+  // The actual segments of the log
+  private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
+
+  locally {
+    // Create the log directory if it doesn't exist
+    Files.createDirectories(dir.toPath)
+  }
+
+  private[log] def dir: File = _dir
+
+  private[log] def name = dir.getName()
+
+  private[log] def parentDir: String = _parentDir
+
+  private[log] def parentDirFile: File = new File(_parentDir)
+
+  private[log] def isFuture: Boolean = dir.getName.endsWith(FutureDirSuffix)
+
+  private[log] def initFileSize: Int = {
+    if (config.preallocate)
+      config.segmentSize
+    else
+      0
+  }
+
+  /**
+   * Rename the directory of the log
+   * @param name the new dir name
+   * @throws KafkaStorageException if rename fails
+   */
+  private[log] def renameDir(name: String): Boolean = {
+    maybeHandleIOException(s"Error while renaming dir for $topicPartition in 
log dir ${dir.getParent}") {
+      val renamedDir = new File(dir.getParent, name)
+      Utils.atomicMoveWithFallback(dir.toPath, renamedDir.toPath)
+      if (renamedDir != dir) {
+        _dir = renamedDir
+        _parentDir = renamedDir.getParent
+        logSegments.foreach(_.updateParentDir(renamedDir))
+        true
+      } else {
+        false
+      }
+    }
+  }
+
+  private[log] def updateConfig(newConfig: LogConfig): Unit = {
+    val oldConfig = this.config
+    this.config = newConfig
+    val oldRecordVersion = oldConfig.messageFormatVersion.recordVersion
+    val newRecordVersion = newConfig.messageFormatVersion.recordVersion
+    if (newRecordVersion.precedes(oldRecordVersion))
+      warn(s"Record format version has been downgraded from $oldRecordVersion 
to $newRecordVersion.")
+  }
+
+  private[log] def checkIfMemoryMappedBufferClosed(): Unit = {
+    if (isMemoryMappedBufferClosed)
+      throw new KafkaStorageException(s"The memory mapped buffer for log of 
$topicPartition is already closed")
+  }
+
+  private[log] def checkForLogDirFailure(): Unit = {
+    if (logDirOffline) {
+      throw new KafkaStorageException(s"The log dir $parentDir is offline due 
to a previous IO exception.")
+    }
+  }
+
+  private[log] def updateRecoveryPoint(newRecoveryPoint: Long): Unit = {
+    recoveryPoint = newRecoveryPoint
+  }
+
+  /**
+   * Update recoveryPoint to provided offset and mark the log as flushed, if the offset is greater
+   * than the existing recoveryPoint.
+   *
+   * @param offset the offset to be updated
+   */
+  private[log] def markFlushed(offset: Long): Unit = {
+    checkIfMemoryMappedBufferClosed()
+    if (offset > recoveryPoint) {
+      updateRecoveryPoint(offset)
+      lastFlushedTime.set(time.milliseconds)
+    }
+  }
+
+  /**
+   * The time this log is last known to have been fully flushed to disk
+   */
+  private[log] def lastFlushTime: Long = lastFlushedTime.get
+
+  /**
+   * The offset metadata of the next message that will be appended to the log
+   */
+  private[log] def logEndOffsetMetadata: LogOffsetMetadata = nextOffsetMetadata
+
+  /**
+   * The offset of the next message that will be appended to the log
+   */
+  private[log] def logEndOffset: Long = nextOffsetMetadata.messageOffset
+
+  /**
+   * Update end offset of the log, and the recoveryPoint.
+   *
+   * @param endOffset the new end offset of the log
+   */
+  private[log] def updateLogEndOffset(endOffset: Long): Unit = {
+    nextOffsetMetadata = LogOffsetMetadata(endOffset, activeSegment.baseOffset, activeSegment.size)
+    if (recoveryPoint > endOffset) {
+      updateRecoveryPoint(endOffset)
+    }
+  }
+
+  /**
+   * @return the base offset of the first local segment, if it exists
+   */
+  private[log] def firstSegmentBaseOffset: Option[Long] = Option(segments.firstEntry).map(_.getValue.baseOffset)
+
+  /**
+   * The active segment that is currently taking appends
+   */
+  private[log] def activeSegment = segments.lastEntry.getValue
+
+  /**
+   * The number of segments in the log.
+   * Take care! this is an O(n) operation.
+   */
+  private[log] def numberOfSegments: Int = segments.size
+
+  /**
+   * The size of the log in bytes
+   */
+  private[log] def size: Long = LocalLog.sizeInBytes(logSegments)
+
+  /**
+   * All the log segments in this log ordered from oldest to newest
+   */
+  private[log] def logSegments: Iterable[LogSegment] = segments.values.asScala
+
+  /**
+   * Get all segments beginning with the segment that includes "from" and 
ending with the segment
+   * that includes up to "to-1" or the end of the log (if to > logEndOffset).
+   */
+  private[log] def logSegments(from: Long, to: Long): Iterable[LogSegment] = {
+    if (from == to) {
+      // Handle non-segment-aligned empty sets
+      List.empty[LogSegment]
+    } else if (to < from) {
+      throw new IllegalArgumentException(s"Invalid log segment range: 
requested segments in $topicPartition " +
+        s"from offset $from which is greater than limit offset $to")
+    } else {
+      val view = Option(segments.floorKey(from)).map { floor =>
+        segments.subMap(floor, to)
+      }.getOrElse(segments.headMap(to))
+      view.values.asScala
+    }
+  }
+
+  /**
+   * Return all non-active log segments beginning with the segment that includes "from".
+   *
+   * @param from the from offset
+   */
+  private[log] def nonActiveLogSegmentsFrom(from: Long): Iterable[LogSegment] = {
+    if (from > activeSegment.baseOffset)
+      Seq.empty
+    else
+      logSegments(from, activeSegment.baseOffset)
+  }
+
+  private[log]  def recordVersion: RecordVersion = config.messageFormatVersion.recordVersion
+
+  private[log]  def lowerSegment(offset: Long): Option[LogSegment] =
+    Option(segments.lowerEntry(offset)).map(_.getValue)
+
+  /**
+   * Get the largest log segment with a base offset less than or equal to the given offset, if one exists.
+   * @return the optional log segment
+   */
+  private[log]  def floorLogSegment(offset: Long): Option[LogSegment] = {
+    Option(segments.floorEntry(offset)).map(_.getValue)
+  }
+
+  /**
+   * Add the given segment to the segments in this log. If this segment replaces an existing segment, delete it.
+   * @param segment The segment to add
+   */
+  @threadsafe
+  private[log] def addSegment(segment: LogSegment): LogSegment = this.segments.put(segment.baseOffset, segment)
+
+  /**
+   * Clears all segments
+   */
+  private[log] def clearSegments(): Unit = segments.clear()
+
+  /**
+   * Closes all segments
+   */
+  private[log] def closeSegments(): Unit = {

Review comment:
       Done. I've moved it into `close()` now. Good point.
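
       For readers following along, a minimal, hypothetical sketch (not the actual PR code) of what folding the old `closeSegments()` loop into `close()` could look like, reusing the `maybeHandleIOException` and `checkIfMemoryMappedBufferClosed` helpers quoted above:

       ```scala
       // Hypothetical sketch only -- the real close() in this PR may differ.
       private[log] def close(): Unit = {
         maybeHandleIOException(s"Error while closing log for $topicPartition in dir ${dir.getParent}") {
           checkIfMemoryMappedBufferClosed()
           // Formerly the body of closeSegments(): close each segment's log and index files.
           segments.values.asScala.foreach(_.close())
         }
       }
       ```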




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

