[GitHub] [kafka] kowshik commented on a change in pull request #10280: KIP-405: Log layer refactor

2021-03-26 Thread GitBox


kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r602062784



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -1816,8 +1292,12 @@ class Log(@volatile private var _dir: File,
*/
   private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => 
Boolean,
 reason: SegmentDeletionReason): Int = {
+def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment], 
logEndOffset: Long): Boolean = {
+  highWatermark >= 
nextSegmentOpt.map(_.baseOffset).getOrElse(logEndOffset) &&

Review comment:
   This is to accommodate the high watermark (hwm) check that was previously happening in 
`Log#deletableSegments` in [this 
line](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L1872).
 The `deletableSegments` method has now moved to `LocalLog`, but we can't do 
the hwm check inside `LocalLog` since hwm is still owned by `Log`. So we 
piggyback on the predicate here to additionally attach the hwm check.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #10280: KIP-405: Log layer refactor

2021-03-26 Thread GitBox


kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r602071574



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -836,178 +581,15 @@ class Log(@volatile private var _dir: File,
   updateHighWatermark(offset)
 }
 
-if (this.recoveryPoint < offset) {
-  this.recoveryPoint = offset
-}
-  }
-
-  /**
-   * Recover the log segments and return the next offset after recovery.
-   * This method does not need to convert IOException to KafkaStorageException 
because it is only called before all
-   * logs are loaded.
-   * @throws LogSegmentOffsetOverflowException if we encountered a legacy 
segment with offset overflow
-   */
-  private[log] def recoverLog(): Long = {
-/** return the log end offset if valid */
-def deleteSegmentsIfLogStartGreaterThanLogEnd(): Option[Long] = {
-  if (logSegments.nonEmpty) {
-val logEndOffset = activeSegment.readNextOffset
-if (logEndOffset >= logStartOffset)
-  Some(logEndOffset)
-else {
-  warn(s"Deleting all segments because logEndOffset ($logEndOffset) is 
smaller than logStartOffset ($logStartOffset). " +
-"This could happen if segment files were deleted from the file 
system.")
-  removeAndDeleteSegments(logSegments, asyncDelete = true, LogRecovery)
-  leaderEpochCache.foreach(_.clearAndFlush())
-  producerStateManager.truncateFullyAndStartAt(logStartOffset)
-  None
-}
-  } else None
-}
-
-// if we have the clean shutdown marker, skip recovery
-if (!hadCleanShutdown) {
-  val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).iterator
-  var truncated = false
-
-  while (unflushed.hasNext && !truncated) {
-val segment = unflushed.next()
-info(s"Recovering unflushed segment ${segment.baseOffset}")
-val truncatedBytes =
-  try {
-recoverSegment(segment, leaderEpochCache)
-  } catch {
-case _: InvalidOffsetException =>
-  val startOffset = segment.baseOffset
-  warn("Found invalid offset during recovery. Deleting the corrupt 
segment and " +
-s"creating an empty one with starting offset $startOffset")
-  segment.truncateTo(startOffset)
-  }
-if (truncatedBytes > 0) {
-  // we had an invalid message, delete all remaining log
-  warn(s"Corruption found in segment ${segment.baseOffset}, truncating 
to offset ${segment.readNextOffset}")
-  removeAndDeleteSegments(unflushed.toList,
-asyncDelete = true,
-reason = LogRecovery)
-  truncated = true
-}
-  }
-}
-
-val logEndOffsetOption = deleteSegmentsIfLogStartGreaterThanLogEnd()
-
-if (logSegments.isEmpty) {
-  // no existing segments, create a new mutable segment beginning at 
logStartOffset
-  addSegment(LogSegment.open(dir = dir,
-baseOffset = logStartOffset,
-config,
-time = time,
-initFileSize = this.initFileSize,
-preallocate = config.preallocate))
-}
-
-// Update the recovery point if there was a clean shutdown and did not 
perform any changes to
-// the segment. Otherwise, we just ensure that the recovery point is not 
ahead of the log end
-// offset. To ensure correctness and to make it easier to reason about, 
it's best to only advance
-// the recovery point in flush(Long). If we advanced the recovery point 
here, we could skip recovery for
-// unflushed segments if the broker crashed after we checkpoint the 
recovery point and before we flush the
-// segment.
-(hadCleanShutdown, logEndOffsetOption) match {
-  case (true, Some(logEndOffset)) =>
-recoveryPoint = logEndOffset
-logEndOffset
-  case _ =>
-val logEndOffset = 
logEndOffsetOption.getOrElse(activeSegment.readNextOffset)
-recoveryPoint = Math.min(recoveryPoint, logEndOffset)
-logEndOffset
-}
-  }
-
-  // Rebuild producer state until lastOffset. This method may be called from 
the recovery code path, and thus must be
-  // free of all side-effects, i.e. it must not update any log-specific state.
-  private def rebuildProducerState(lastOffset: Long,
-   reloadFromCleanShutdown: Boolean,
-   producerStateManager: 
ProducerStateManager): Unit = lock synchronized {
-checkIfMemoryMappedBufferClosed()
-val segments = logSegments
-val offsetsToSnapshot =
-  if (segments.nonEmpty) {
-val nextLatestSegmentBaseOffset = 
lowerSegment(segments.last.baseOffset).map(_.baseOffset)
-Seq(nextLatestSegmentBaseOffset, Some(segments.last.baseOffset), 
Some(lastOffset))
-  } else {
-Seq(Some(lastOffset))
-  }
-info(s"Loading producer state till offset $lastOffset with message format 
version ${r

[GitHub] [kafka] kowshik commented on a change in pull request #10280: KIP-405: Log layer refactor

2021-03-26 Thread GitBox


kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r602052452



##
File path: core/src/main/scala/kafka/log/LocalLog.scala
##
@@ -0,0 +1,1561 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log
+
+import java.io.{File, IOException}
+import java.lang.{Long => JLong}
+import java.nio.file.{Files, NoSuchFileException}
+import java.text.NumberFormat
+import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
+import java.util.concurrent.atomic.AtomicLong
+import java.util.Map.{Entry => JEntry}
+import java.util.regex.Pattern
+
+import kafka.common.LogSegmentOffsetOverflowException
+import kafka.metrics.KafkaMetricsGroup
+import kafka.server.epoch.LeaderEpochFileCache
+import kafka.server.{FetchDataInfo, LogDirFailureChannel, LogOffsetMetadata}
+import kafka.utils.{CoreUtils, Logging, Scheduler, threadsafe}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.errors.{InvalidOffsetException, 
KafkaStorageException, OffsetOutOfRangeException}
+import org.apache.kafka.common.message.FetchResponseData
+import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, 
RecordVersion, Records}
+import org.apache.kafka.common.utils.{Time, Utils}
+
+import scala.jdk.CollectionConverters._
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import scala.collection.{Seq, Set, mutable}
+
+// Used to define pre/post roll actions to be performed.
+case class RollAction(preRollAction: Long => Unit, postRollAction: 
(LogSegment, Option[LogSegment]) => Unit)
+
+// Used to hold the result of splitting a segment into one or more segments, 
see LocalLog#splitOverflowedSegment
+case class SplitSegmentResult(deletedSegments: Seq[LogSegment], newSegments: 
Seq[LogSegment])
+
+/**
+ * An append-only log for storing messages locally.
+ * The log is a sequence of LogSegments, each with a base offset denoting the 
first message in the segment.
+ * New log segments are created according to a configurable policy that 
controls the size in bytes or time interval
+ * for a given segment.
+ *
+ * NOTE: this class is not thread-safe, and it relies on the thread safety 
provided by the Log class.
+ *
+ * @param _dir The directory in which log segments are created.
+ * @param config The log configuration settings
+ * @param recoveryPoint The offset at which to begin recovery i.e. the first 
offset which has not been flushed to disk
+ * @param scheduler The thread pool scheduler used for background actions
+ * @param time The time instance used for checking the clock
+ * @param topicPartition The topic partition associated with this log
+ * @param logDirFailureChannel The LogDirFailureChannel instance to 
asynchronously handle Log dir failure
+ * @param hadCleanShutdown boolean flag to indicate if the Log had a 
clean/graceful shutdown last time. true means
+ * clean shutdown whereas false means a crash.
+ */
+class LocalLog(@volatile private var _dir: File,
+   @volatile var config: LogConfig,
+   @volatile var recoveryPoint: Long,
+   scheduler: Scheduler,
+   val time: Time,
+   val topicPartition: TopicPartition,
+   logDirFailureChannel: LogDirFailureChannel,
+   private val hadCleanShutdown: Boolean = true) extends Logging 
with KafkaMetricsGroup {
+
+  import kafka.log.LocalLog._
+
+  this.logIdent = s"[Log partition=$topicPartition, dir=${dir.getParent}] "
+
+  // The memory mapped buffer for index files of this log will be closed with 
either delete() or closeHandlers()
+  // After memory mapped buffer is closed, no disk IO operation should be 
performed for this log
+  @volatile private var isMemoryMappedBufferClosed = false
+
+  // Cache value of parent directory to avoid allocations in hot paths like 
ReplicaManager.checkpointHighWatermarks
+  @volatile private var _parentDir: String = dir.getParent
+
+  /* last time the log was flushed */
+  private val lastFlushedTime = new AtomicLong(time.milliseconds)
+
+  // The offset where the next message could be appended
+  @volatile private var nextOffsetMetadata

[GitHub] [kafka] kowshik commented on a change in pull request #10280: KIP-405: Log layer refactor

2021-03-26 Thread GitBox


kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r602062784



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -1816,8 +1292,12 @@ class Log(@volatile private var _dir: File,
*/
   private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => 
Boolean,
 reason: SegmentDeletionReason): Int = {
+def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment], 
logEndOffset: Long): Boolean = {
+  highWatermark >= 
nextSegmentOpt.map(_.baseOffset).getOrElse(logEndOffset) &&

Review comment:
   This is to accommodate the high watermark (hwm) check that was previously happening in 
`Log#deletableSegments` in this 
[line](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L1872).
 The `deletableSegments` method has now moved to `LocalLog`, but we can't do 
the hwm check inside `LocalLog` since hwm is still owned by `Log`. So we 
piggyback on the predicate here to additionally attach the hwm check.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #10280: KIP-405: Log layer refactor

2021-03-26 Thread GitBox


kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r602062784



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -1816,8 +1292,12 @@ class Log(@volatile private var _dir: File,
*/
   private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => 
Boolean,
 reason: SegmentDeletionReason): Int = {
+def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment], 
logEndOffset: Long): Boolean = {
+  highWatermark >= 
nextSegmentOpt.map(_.baseOffset).getOrElse(logEndOffset) &&

Review comment:
   This is to accommodate the high watermark (hwm) check that was previously happening in 
`Log#deletableSegments` in this 
[line](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L1872).
 The `deletableSegments` method has now moved to `LocalLog`, so we piggyback on 
the predicate to additionally attach the hwm check.

##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -1816,8 +1292,12 @@ class Log(@volatile private var _dir: File,
*/
   private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => 
Boolean,
 reason: SegmentDeletionReason): Int = {
+def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment], 
logEndOffset: Long): Boolean = {
+  highWatermark >= 
nextSegmentOpt.map(_.baseOffset).getOrElse(logEndOffset) &&

Review comment:
   This is to accommodate the high watermark (hwm) check that was previously happening in 
`Log#deletableSegments` in this 
[line](https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L1872).
 The `deletableSegments` method has now moved to `LocalLog`, so we piggyback on 
the predicate to additionally attach the hwm check.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #10280: KIP-405: Log layer refactor

2021-03-26 Thread GitBox


kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r602056487



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -1794,20 +1269,21 @@ class Log(@volatile private var _dir: File,
 ret.toSeq.sortBy(-_)
   }
 
-  /**
-* Given a message offset, find its corresponding offset metadata in the 
log.
-* If the message offset is out of range, throw an OffsetOutOfRangeException
-*/
-  private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata 
= {
-val fetchDataInfo = read(offset,
-  maxLength = 1,
-  isolation = FetchLogEnd,
-  minOneMessage = false)
-fetchDataInfo.fetchOffsetMetadata
+  def convertToOffsetMetadata(offset: Long): Option[LogOffsetMetadata] = {

Review comment:
   Done. Removed now. Good catch.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #10280: KIP-405: Log layer refactor

2021-03-26 Thread GitBox


kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r602056301



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -213,32 +208,38 @@ case object SnapshotGenerated extends 
LogStartOffsetIncrementReason {
 }
 
 /**
- * An append-only log for storing messages.
+ * A log which presents a unified view of local and tiered log segments.
  *
- * The log is a sequence of LogSegments, each with a base offset denoting the 
first message in the segment.
+ * The log consists of tiered and local segments with the tiered portion of 
the log being optional. There could be an
+ * overlap between the tiered and local segments. The active segment is always 
guaranteed to be local. If tiered segments
+ * are present, they always appear at the head of the log, followed by an 
optional region of overlap, followed by the local
+ * segments including the active segment.
  *
- * New log segments are created according to a configurable policy that 
controls the size in bytes or time interval
- * for a given segment.
+ * NOTE: this class handles state and behavior specific to tiered segments as 
well as any behavior combining both tiered
+ * and local segments. The state and behavior specific to local segments is 
handled by the encapsulated LocalLog instance.

Review comment:
   Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #10280: KIP-405: Log layer refactor

2021-03-26 Thread GitBox


kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r602056245



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -213,32 +208,38 @@ case object SnapshotGenerated extends 
LogStartOffsetIncrementReason {
 }
 
 /**
- * An append-only log for storing messages.
+ * A log which presents a unified view of local and tiered log segments.
  *
- * The log is a sequence of LogSegments, each with a base offset denoting the 
first message in the segment.
+ * The log consists of tiered and local segments with the tiered portion of 
the log being optional. There could be an
+ * overlap between the tiered and local segments. The active segment is always 
guaranteed to be local. If tiered segments
+ * are present, they always appear at the head of the log, followed by an 
optional region of overlap, followed by the local

Review comment:
   Done. Good point. I've fixed the doc to refer to `beginning of log`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #10280: KIP-405: Log layer refactor

2021-03-26 Thread GitBox


kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r602055675



##
File path: core/src/main/scala/kafka/log/LocalLog.scala
##
@@ -0,0 +1,1561 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log
+
+import java.io.{File, IOException}
+import java.lang.{Long => JLong}
+import java.nio.file.{Files, NoSuchFileException}
+import java.text.NumberFormat
+import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
+import java.util.concurrent.atomic.AtomicLong
+import java.util.Map.{Entry => JEntry}
+import java.util.regex.Pattern
+
+import kafka.common.LogSegmentOffsetOverflowException
+import kafka.metrics.KafkaMetricsGroup
+import kafka.server.epoch.LeaderEpochFileCache
+import kafka.server.{FetchDataInfo, LogDirFailureChannel, LogOffsetMetadata}
+import kafka.utils.{CoreUtils, Logging, Scheduler, threadsafe}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.errors.{InvalidOffsetException, 
KafkaStorageException, OffsetOutOfRangeException}
+import org.apache.kafka.common.message.FetchResponseData
+import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, 
RecordVersion, Records}
+import org.apache.kafka.common.utils.{Time, Utils}
+
+import scala.jdk.CollectionConverters._
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import scala.collection.{Seq, Set, mutable}
+
+// Used to define pre/post roll actions to be performed.
+case class RollAction(preRollAction: Long => Unit, postRollAction: 
(LogSegment, Option[LogSegment]) => Unit)
+
+// Used to hold the result of splitting a segment into one or more segments, 
see LocalLog#splitOverflowedSegment
+case class SplitSegmentResult(deletedSegments: Seq[LogSegment], newSegments: 
Seq[LogSegment])
+
+/**
+ * An append-only log for storing messages locally.
+ * The log is a sequence of LogSegments, each with a base offset denoting the 
first message in the segment.
+ * New log segments are created according to a configurable policy that 
controls the size in bytes or time interval
+ * for a given segment.
+ *
+ * NOTE: this class is not thread-safe, and it relies on the thread safety 
provided by the Log class.
+ *
+ * @param _dir The directory in which log segments are created.
+ * @param config The log configuration settings
+ * @param recoveryPoint The offset at which to begin recovery i.e. the first 
offset which has not been flushed to disk
+ * @param scheduler The thread pool scheduler used for background actions
+ * @param time The time instance used for checking the clock
+ * @param topicPartition The topic partition associated with this log
+ * @param logDirFailureChannel The LogDirFailureChannel instance to 
asynchronously handle Log dir failure
+ * @param hadCleanShutdown boolean flag to indicate if the Log had a 
clean/graceful shutdown last time. true means
+ * clean shutdown whereas false means a crash.
+ */
+class LocalLog(@volatile private var _dir: File,
+   @volatile var config: LogConfig,
+   @volatile var recoveryPoint: Long,
+   scheduler: Scheduler,
+   val time: Time,
+   val topicPartition: TopicPartition,
+   logDirFailureChannel: LogDirFailureChannel,
+   private val hadCleanShutdown: Boolean = true) extends Logging 
with KafkaMetricsGroup {
+
+  import kafka.log.LocalLog._
+
+  this.logIdent = s"[Log partition=$topicPartition, dir=${dir.getParent}] "
+
+  // The memory mapped buffer for index files of this log will be closed with 
either delete() or closeHandlers()
+  // After memory mapped buffer is closed, no disk IO operation should be 
performed for this log
+  @volatile private var isMemoryMappedBufferClosed = false
+
+  // Cache value of parent directory to avoid allocations in hot paths like 
ReplicaManager.checkpointHighWatermarks
+  @volatile private var _parentDir: String = dir.getParent
+
+  /* last time the log was flushed */
+  private val lastFlushedTime = new AtomicLong(time.milliseconds)
+
+  // The offset where the next message could be appended
+  @volatile private var nextOffsetMetadata

[GitHub] [kafka] kowshik commented on a change in pull request #10280: KIP-405: Log layer refactor

2021-03-26 Thread GitBox


kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r602055217



##
File path: core/src/main/scala/kafka/log/LocalLog.scala
##
@@ -0,0 +1,1561 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log
+
+import java.io.{File, IOException}
+import java.lang.{Long => JLong}
+import java.nio.file.{Files, NoSuchFileException}
+import java.text.NumberFormat
+import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
+import java.util.concurrent.atomic.AtomicLong
+import java.util.Map.{Entry => JEntry}
+import java.util.regex.Pattern
+
+import kafka.common.LogSegmentOffsetOverflowException
+import kafka.metrics.KafkaMetricsGroup
+import kafka.server.epoch.LeaderEpochFileCache
+import kafka.server.{FetchDataInfo, LogDirFailureChannel, LogOffsetMetadata}
+import kafka.utils.{CoreUtils, Logging, Scheduler, threadsafe}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.errors.{InvalidOffsetException, 
KafkaStorageException, OffsetOutOfRangeException}
+import org.apache.kafka.common.message.FetchResponseData
+import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, 
RecordVersion, Records}
+import org.apache.kafka.common.utils.{Time, Utils}
+
+import scala.jdk.CollectionConverters._
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import scala.collection.{Seq, Set, mutable}
+
+// Used to define pre/post roll actions to be performed.
+case class RollAction(preRollAction: Long => Unit, postRollAction: 
(LogSegment, Option[LogSegment]) => Unit)
+
+// Used to hold the result of splitting a segment into one or more segments, 
see LocalLog#splitOverflowedSegment
+case class SplitSegmentResult(deletedSegments: Seq[LogSegment], newSegments: 
Seq[LogSegment])
+
+/**
+ * An append-only log for storing messages locally.
+ * The log is a sequence of LogSegments, each with a base offset denoting the 
first message in the segment.
+ * New log segments are created according to a configurable policy that 
controls the size in bytes or time interval
+ * for a given segment.
+ *
+ * NOTE: this class is not thread-safe, and it relies on the thread safety 
provided by the Log class.
+ *
+ * @param _dir The directory in which log segments are created.
+ * @param config The log configuration settings
+ * @param recoveryPoint The offset at which to begin recovery i.e. the first 
offset which has not been flushed to disk
+ * @param scheduler The thread pool scheduler used for background actions
+ * @param time The time instance used for checking the clock
+ * @param topicPartition The topic partition associated with this log
+ * @param logDirFailureChannel The LogDirFailureChannel instance to 
asynchronously handle Log dir failure
+ * @param hadCleanShutdown boolean flag to indicate if the Log had a 
clean/graceful shutdown last time. true means
+ * clean shutdown whereas false means a crash.
+ */
+class LocalLog(@volatile private var _dir: File,
+   @volatile var config: LogConfig,
+   @volatile var recoveryPoint: Long,
+   scheduler: Scheduler,
+   val time: Time,
+   val topicPartition: TopicPartition,
+   logDirFailureChannel: LogDirFailureChannel,
+   private val hadCleanShutdown: Boolean = true) extends Logging 
with KafkaMetricsGroup {
+
+  import kafka.log.LocalLog._
+
+  this.logIdent = s"[Log partition=$topicPartition, dir=${dir.getParent}] "
+
+  // The memory mapped buffer for index files of this log will be closed with 
either delete() or closeHandlers()
+  // After memory mapped buffer is closed, no disk IO operation should be 
performed for this log
+  @volatile private var isMemoryMappedBufferClosed = false
+
+  // Cache value of parent directory to avoid allocations in hot paths like 
ReplicaManager.checkpointHighWatermarks
+  @volatile private var _parentDir: String = dir.getParent
+
+  /* last time the log was flushed */
+  private val lastFlushedTime = new AtomicLong(time.milliseconds)
+
+  // The offset where the next message could be appended
+  @volatile private var nextOffsetMetadata

[GitHub] [kafka] kowshik commented on a change in pull request #10280: KIP-405: Log layer refactor

2021-03-26 Thread GitBox


kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r602055217



##
File path: core/src/main/scala/kafka/log/LocalLog.scala
##
@@ -0,0 +1,1561 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log
+
+import java.io.{File, IOException}
+import java.lang.{Long => JLong}
+import java.nio.file.{Files, NoSuchFileException}
+import java.text.NumberFormat
+import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
+import java.util.concurrent.atomic.AtomicLong
+import java.util.Map.{Entry => JEntry}
+import java.util.regex.Pattern
+
+import kafka.common.LogSegmentOffsetOverflowException
+import kafka.metrics.KafkaMetricsGroup
+import kafka.server.epoch.LeaderEpochFileCache
+import kafka.server.{FetchDataInfo, LogDirFailureChannel, LogOffsetMetadata}
+import kafka.utils.{CoreUtils, Logging, Scheduler, threadsafe}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.errors.{InvalidOffsetException, 
KafkaStorageException, OffsetOutOfRangeException}
+import org.apache.kafka.common.message.FetchResponseData
+import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, 
RecordVersion, Records}
+import org.apache.kafka.common.utils.{Time, Utils}
+
+import scala.jdk.CollectionConverters._
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import scala.collection.{Seq, Set, mutable}
+
+// Used to define pre/post roll actions to be performed.
+case class RollAction(preRollAction: Long => Unit, postRollAction: 
(LogSegment, Option[LogSegment]) => Unit)
+
+// Used to hold the result of splitting a segment into one or more segments, 
see LocalLog#splitOverflowedSegment
+case class SplitSegmentResult(deletedSegments: Seq[LogSegment], newSegments: 
Seq[LogSegment])
+
+/**
+ * An append-only log for storing messages locally.
+ * The log is a sequence of LogSegments, each with a base offset denoting the 
first message in the segment.
+ * New log segments are created according to a configurable policy that 
controls the size in bytes or time interval
+ * for a given segment.
+ *
+ * NOTE: this class is not thread-safe, and it relies on the thread safety 
provided by the Log class.
+ *
+ * @param _dir The directory in which log segments are created.
+ * @param config The log configuration settings
+ * @param recoveryPoint The offset at which to begin recovery i.e. the first 
offset which has not been flushed to disk
+ * @param scheduler The thread pool scheduler used for background actions
+ * @param time The time instance used for checking the clock
+ * @param topicPartition The topic partition associated with this log
+ * @param logDirFailureChannel The LogDirFailureChannel instance to 
asynchronously handle Log dir failure
+ * @param hadCleanShutdown boolean flag to indicate if the Log had a 
clean/graceful shutdown last time. true means
+ * clean shutdown whereas false means a crash.
+ */
+class LocalLog(@volatile private var _dir: File,
+   @volatile var config: LogConfig,
+   @volatile var recoveryPoint: Long,
+   scheduler: Scheduler,
+   val time: Time,
+   val topicPartition: TopicPartition,
+   logDirFailureChannel: LogDirFailureChannel,
+   private val hadCleanShutdown: Boolean = true) extends Logging 
with KafkaMetricsGroup {
+
+  import kafka.log.LocalLog._
+
+  this.logIdent = s"[Log partition=$topicPartition, dir=${dir.getParent}] "
+
+  // The memory mapped buffer for index files of this log will be closed with 
either delete() or closeHandlers()
+  // After memory mapped buffer is closed, no disk IO operation should be 
performed for this log
+  @volatile private var isMemoryMappedBufferClosed = false
+
+  // Cache value of parent directory to avoid allocations in hot paths like 
ReplicaManager.checkpointHighWatermarks
+  @volatile private var _parentDir: String = dir.getParent
+
+  /* last time the log was flushed */
+  private val lastFlushedTime = new AtomicLong(time.milliseconds)
+
+  // The offset where the next message could be appended
+  @volatile private var nextOffsetMetadata

[GitHub] [kafka] kowshik commented on a change in pull request #10280: KIP-405: Log layer refactor

2021-03-26 Thread GitBox


kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r602052452



##
File path: core/src/main/scala/kafka/log/LocalLog.scala
##
@@ -0,0 +1,1561 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log
+
+import java.io.{File, IOException}
+import java.lang.{Long => JLong}
+import java.nio.file.{Files, NoSuchFileException}
+import java.text.NumberFormat
+import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
+import java.util.concurrent.atomic.AtomicLong
+import java.util.Map.{Entry => JEntry}
+import java.util.regex.Pattern
+
+import kafka.common.LogSegmentOffsetOverflowException
+import kafka.metrics.KafkaMetricsGroup
+import kafka.server.epoch.LeaderEpochFileCache
+import kafka.server.{FetchDataInfo, LogDirFailureChannel, LogOffsetMetadata}
+import kafka.utils.{CoreUtils, Logging, Scheduler, threadsafe}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.errors.{InvalidOffsetException, 
KafkaStorageException, OffsetOutOfRangeException}
+import org.apache.kafka.common.message.FetchResponseData
+import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, 
RecordVersion, Records}
+import org.apache.kafka.common.utils.{Time, Utils}
+
+import scala.jdk.CollectionConverters._
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import scala.collection.{Seq, Set, mutable}
+
+// Used to define pre/post roll actions to be performed.
+case class RollAction(preRollAction: Long => Unit, postRollAction: 
(LogSegment, Option[LogSegment]) => Unit)
+
+// Used to hold the result of splitting a segment into one or more segments, 
see LocalLog#splitOverflowedSegment
+case class SplitSegmentResult(deletedSegments: Seq[LogSegment], newSegments: 
Seq[LogSegment])
+
+/**
+ * An append-only log for storing messages locally.
+ * The log is a sequence of LogSegments, each with a base offset denoting the 
first message in the segment.
+ * New log segments are created according to a configurable policy that 
controls the size in bytes or time interval
+ * for a given segment.
+ *
+ * NOTE: this class is not thread-safe, and it relies on the thread safety 
provided by the Log class.
+ *
+ * @param _dir The directory in which log segments are created.
+ * @param config The log configuration settings
+ * @param recoveryPoint The offset at which to begin recovery i.e. the first 
offset which has not been flushed to disk
+ * @param scheduler The thread pool scheduler used for background actions
+ * @param time The time instance used for checking the clock
+ * @param topicPartition The topic partition associated with this log
+ * @param logDirFailureChannel The LogDirFailureChannel instance to 
asynchronously handle Log dir failure
+ * @param hadCleanShutdown boolean flag to indicate if the Log had a 
clean/graceful shutdown last time. true means
+ * clean shutdown whereas false means a crash.
+ */
+class LocalLog(@volatile private var _dir: File,
+   @volatile var config: LogConfig,
+   @volatile var recoveryPoint: Long,
+   scheduler: Scheduler,
+   val time: Time,
+   val topicPartition: TopicPartition,
+   logDirFailureChannel: LogDirFailureChannel,
+   private val hadCleanShutdown: Boolean = true) extends Logging 
with KafkaMetricsGroup {
+
+  import kafka.log.LocalLog._
+
+  this.logIdent = s"[Log partition=$topicPartition, dir=${dir.getParent}] "
+
+  // The memory mapped buffer for index files of this log will be closed with 
either delete() or closeHandlers()
+  // After memory mapped buffer is closed, no disk IO operation should be 
performed for this log
+  @volatile private var isMemoryMappedBufferClosed = false
+
+  // Cache value of parent directory to avoid allocations in hot paths like 
ReplicaManager.checkpointHighWatermarks
+  @volatile private var _parentDir: String = dir.getParent
+
+  /* last time the log was flushed */
+  private val lastFlushedTime = new AtomicLong(time.milliseconds)
+
+  // The offset where the next message could be appended
+  @volatile private var nextOffsetMetadata

[GitHub] [kafka] kowshik commented on a change in pull request #10280: KIP-405: Log layer refactor

2021-03-25 Thread GitBox


kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r602052393



##
File path: core/src/main/scala/kafka/log/LocalLog.scala
##
@@ -0,0 +1,1561 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log
+
+import java.io.{File, IOException}
+import java.lang.{Long => JLong}
+import java.nio.file.{Files, NoSuchFileException}
+import java.text.NumberFormat
+import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
+import java.util.concurrent.atomic.AtomicLong
+import java.util.Map.{Entry => JEntry}
+import java.util.regex.Pattern
+
+import kafka.common.LogSegmentOffsetOverflowException
+import kafka.metrics.KafkaMetricsGroup
+import kafka.server.epoch.LeaderEpochFileCache
+import kafka.server.{FetchDataInfo, LogDirFailureChannel, LogOffsetMetadata}
+import kafka.utils.{CoreUtils, Logging, Scheduler, threadsafe}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.errors.{InvalidOffsetException, 
KafkaStorageException, OffsetOutOfRangeException}
+import org.apache.kafka.common.message.FetchResponseData
+import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, 
RecordVersion, Records}
+import org.apache.kafka.common.utils.{Time, Utils}
+
+import scala.jdk.CollectionConverters._
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import scala.collection.{Seq, Set, mutable}
+
+// Used to define pre/post roll actions to be performed.
+case class RollAction(preRollAction: Long => Unit, postRollAction: 
(LogSegment, Option[LogSegment]) => Unit)
+
+// Used to hold the result of splitting a segment into one or more segments, 
see LocalLog#splitOverflowedSegment
+case class SplitSegmentResult(deletedSegments: Seq[LogSegment], newSegments: 
Seq[LogSegment])
+
+/**
+ * An append-only log for storing messages locally.
+ * The log is a sequence of LogSegments, each with a base offset denoting the 
first message in the segment.
+ * New log segments are created according to a configurable policy that 
controls the size in bytes or time interval
+ * for a given segment.
+ *
+ * NOTE: this class is not thread-safe, and it relies on the thread safety 
provided by the Log class.
+ *
+ * @param _dir The directory in which log segments are created.
+ * @param config The log configuration settings
+ * @param recoveryPoint The offset at which to begin recovery i.e. the first 
offset which has not been flushed to disk
+ * @param scheduler The thread pool scheduler used for background actions
+ * @param time The time instance used for checking the clock
+ * @param topicPartition The topic partition associated with this log
+ * @param logDirFailureChannel The LogDirFailureChannel instance to 
asynchronously handle Log dir failure
+ * @param hadCleanShutdown boolean flag to indicate if the Log had a 
clean/graceful shutdown last time. true means
+ * clean shutdown whereas false means a crash.
+ */
+class LocalLog(@volatile private var _dir: File,
+   @volatile var config: LogConfig,
+   @volatile var recoveryPoint: Long,
+   scheduler: Scheduler,
+   val time: Time,
+   val topicPartition: TopicPartition,
+   logDirFailureChannel: LogDirFailureChannel,
+   private val hadCleanShutdown: Boolean = true) extends Logging 
with KafkaMetricsGroup {
+
+  import kafka.log.LocalLog._
+
+  this.logIdent = s"[Log partition=$topicPartition, dir=${dir.getParent}] "
+
+  // The memory mapped buffer for index files of this log will be closed with 
either delete() or closeHandlers()
+  // After memory mapped buffer is closed, no disk IO operation should be 
performed for this log
+  @volatile private var isMemoryMappedBufferClosed = false
+
+  // Cache value of parent directory to avoid allocations in hot paths like 
ReplicaManager.checkpointHighWatermarks
+  @volatile private var _parentDir: String = dir.getParent
+
+  /* last time the log was flushed */
+  private val lastFlushedTime = new AtomicLong(time.milliseconds)
+
+  // The offset where the next message could be appended
+  @volatile private var nextOffsetMetadata

[GitHub] [kafka] kowshik commented on a change in pull request #10280: KIP-405: Log layer refactor

2021-03-25 Thread GitBox


kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r602049695



##
File path: core/src/main/scala/kafka/log/LocalLog.scala
##
@@ -0,0 +1,1561 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log
+
+import java.io.{File, IOException}
+import java.lang.{Long => JLong}
+import java.nio.file.{Files, NoSuchFileException}
+import java.text.NumberFormat
+import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
+import java.util.concurrent.atomic.AtomicLong
+import java.util.Map.{Entry => JEntry}
+import java.util.regex.Pattern
+
+import kafka.common.LogSegmentOffsetOverflowException
+import kafka.metrics.KafkaMetricsGroup
+import kafka.server.epoch.LeaderEpochFileCache
+import kafka.server.{FetchDataInfo, LogDirFailureChannel, LogOffsetMetadata}
+import kafka.utils.{CoreUtils, Logging, Scheduler, threadsafe}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.errors.{InvalidOffsetException, 
KafkaStorageException, OffsetOutOfRangeException}
+import org.apache.kafka.common.message.FetchResponseData
+import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, 
RecordVersion, Records}
+import org.apache.kafka.common.utils.{Time, Utils}
+
+import scala.jdk.CollectionConverters._
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import scala.collection.{Seq, Set, mutable}
+
+// Used to define pre/post roll actions to be performed.
+case class RollAction(preRollAction: Long => Unit, postRollAction: 
(LogSegment, Option[LogSegment]) => Unit)
+
+// Used to hold the result of splitting a segment into one or more segments, 
see LocalLog#splitOverflowedSegment
+case class SplitSegmentResult(deletedSegments: Seq[LogSegment], newSegments: 
Seq[LogSegment])
+
+/**
+ * An append-only log for storing messages locally.
+ * The log is a sequence of LogSegments, each with a base offset denoting the 
first message in the segment.
+ * New log segments are created according to a configurable policy that 
controls the size in bytes or time interval
+ * for a given segment.
+ *
+ * NOTE: this class is not thread-safe, and it relies on the thread safety 
provided by the Log class.
+ *
+ * @param _dir The directory in which log segments are created.
+ * @param config The log configuration settings
+ * @param recoveryPoint The offset at which to begin recovery i.e. the first 
offset which has not been flushed to disk
+ * @param scheduler The thread pool scheduler used for background actions
+ * @param time The time instance used for checking the clock
+ * @param topicPartition The topic partition associated with this log
+ * @param logDirFailureChannel The LogDirFailureChannel instance to 
asynchronously handle Log dir failure
+ * @param hadCleanShutdown boolean flag to indicate if the Log had a 
clean/graceful shutdown last time. true means
+ * clean shutdown whereas false means a crash.
+ */
+class LocalLog(@volatile private var _dir: File,
+   @volatile var config: LogConfig,
+   @volatile var recoveryPoint: Long,
+   scheduler: Scheduler,
+   val time: Time,
+   val topicPartition: TopicPartition,
+   logDirFailureChannel: LogDirFailureChannel,
+   private val hadCleanShutdown: Boolean = true) extends Logging 
with KafkaMetricsGroup {
+
+  import kafka.log.LocalLog._
+
+  this.logIdent = s"[Log partition=$topicPartition, dir=${dir.getParent}] "
+
+  // The memory mapped buffer for index files of this log will be closed with 
either delete() or closeHandlers()
+  // After memory mapped buffer is closed, no disk IO operation should be 
performed for this log
+  @volatile private var isMemoryMappedBufferClosed = false
+
+  // Cache value of parent directory to avoid allocations in hot paths like 
ReplicaManager.checkpointHighWatermarks
+  @volatile private var _parentDir: String = dir.getParent
+
+  /* last time the log was flushed */
+  private val lastFlushedTime = new AtomicLong(time.milliseconds)
+
+  // The offset where the next message could be appended
+  @volatile private var nextOffsetMetadata

[GitHub] [kafka] kowshik commented on a change in pull request #10280: KIP-405: Log layer refactor

2021-03-25 Thread GitBox


kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r602049230



##
File path: core/src/main/scala/kafka/log/LocalLog.scala
##
@@ -0,0 +1,1561 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log
+
+import java.io.{File, IOException}
+import java.lang.{Long => JLong}
+import java.nio.file.{Files, NoSuchFileException}
+import java.text.NumberFormat
+import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
+import java.util.concurrent.atomic.AtomicLong
+import java.util.Map.{Entry => JEntry}
+import java.util.regex.Pattern
+
+import kafka.common.LogSegmentOffsetOverflowException
+import kafka.metrics.KafkaMetricsGroup
+import kafka.server.epoch.LeaderEpochFileCache
+import kafka.server.{FetchDataInfo, LogDirFailureChannel, LogOffsetMetadata}
+import kafka.utils.{CoreUtils, Logging, Scheduler, threadsafe}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.errors.{InvalidOffsetException, 
KafkaStorageException, OffsetOutOfRangeException}
+import org.apache.kafka.common.message.FetchResponseData
+import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, 
RecordVersion, Records}
+import org.apache.kafka.common.utils.{Time, Utils}
+
+import scala.jdk.CollectionConverters._
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import scala.collection.{Seq, Set, mutable}
+
+// Used to define pre/post roll actions to be performed.
+case class RollAction(preRollAction: Long => Unit, postRollAction: 
(LogSegment, Option[LogSegment]) => Unit)
+
+// Used to hold the result of splitting a segment into one or more segments, 
see LocalLog#splitOverflowedSegment
+case class SplitSegmentResult(deletedSegments: Seq[LogSegment], newSegments: 
Seq[LogSegment])
+
+/**
+ * An append-only log for storing messages locally.
+ * The log is a sequence of LogSegments, each with a base offset denoting the 
first message in the segment.
+ * New log segments are created according to a configurable policy that 
controls the size in bytes or time interval
+ * for a given segment.
+ *
+ * NOTE: this class is not thread-safe, and it relies on the thread safety 
provided by the Log class.
+ *
+ * @param _dir The directory in which log segments are created.
+ * @param config The log configuration settings
+ * @param recoveryPoint The offset at which to begin recovery i.e. the first 
offset which has not been flushed to disk
+ * @param scheduler The thread pool scheduler used for background actions
+ * @param time The time instance used for checking the clock
+ * @param topicPartition The topic partition associated with this log
+ * @param logDirFailureChannel The LogDirFailureChannel instance to 
asynchronously handle Log dir failure
+ * @param hadCleanShutdown boolean flag to indicate if the Log had a 
clean/graceful shutdown last time. true means
+ * clean shutdown whereas false means a crash.
+ */
+class LocalLog(@volatile private var _dir: File,
+   @volatile var config: LogConfig,
+   @volatile var recoveryPoint: Long,
+   scheduler: Scheduler,
+   val time: Time,
+   val topicPartition: TopicPartition,
+   logDirFailureChannel: LogDirFailureChannel,
+   private val hadCleanShutdown: Boolean = true) extends Logging 
with KafkaMetricsGroup {
+
+  import kafka.log.LocalLog._
+
+  this.logIdent = s"[Log partition=$topicPartition, dir=${dir.getParent}] "
+
+  // The memory mapped buffer for index files of this log will be closed with 
either delete() or closeHandlers()
+  // After memory mapped buffer is closed, no disk IO operation should be 
performed for this log
+  @volatile private var isMemoryMappedBufferClosed = false
+
+  // Cache value of parent directory to avoid allocations in hot paths like 
ReplicaManager.checkpointHighWatermarks
+  @volatile private var _parentDir: String = dir.getParent
+
+  /* last time the log was flushed */
+  private val lastFlushedTime = new AtomicLong(time.milliseconds)
+
+  // The offset where the next message could be appended
+  @volatile private var nextOffsetMetadata

[GitHub] [kafka] kowshik commented on a change in pull request #10280: KIP-405: Log layer refactor

2021-03-25 Thread GitBox


kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r602049062



##
File path: core/src/main/scala/kafka/log/LocalLog.scala
##
@@ -0,0 +1,1561 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log
+
+import java.io.{File, IOException}
+import java.lang.{Long => JLong}
+import java.nio.file.{Files, NoSuchFileException}
+import java.text.NumberFormat
+import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
+import java.util.concurrent.atomic.AtomicLong
+import java.util.Map.{Entry => JEntry}
+import java.util.regex.Pattern
+
+import kafka.common.LogSegmentOffsetOverflowException
+import kafka.metrics.KafkaMetricsGroup
+import kafka.server.epoch.LeaderEpochFileCache
+import kafka.server.{FetchDataInfo, LogDirFailureChannel, LogOffsetMetadata}
+import kafka.utils.{CoreUtils, Logging, Scheduler, threadsafe}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.errors.{InvalidOffsetException, 
KafkaStorageException, OffsetOutOfRangeException}
+import org.apache.kafka.common.message.FetchResponseData
+import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, 
RecordVersion, Records}
+import org.apache.kafka.common.utils.{Time, Utils}
+
+import scala.jdk.CollectionConverters._
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import scala.collection.{Seq, Set, mutable}
+
+// Used to define pre/post roll actions to be performed.
+case class RollAction(preRollAction: Long => Unit, postRollAction: 
(LogSegment, Option[LogSegment]) => Unit)
+
+// Used to hold the result of splitting a segment into one or more segments, 
see LocalLog#splitOverflowedSegment
+case class SplitSegmentResult(deletedSegments: Seq[LogSegment], newSegments: 
Seq[LogSegment])
+
+/**
+ * An append-only log for storing messages locally.
+ * The log is a sequence of LogSegments, each with a base offset denoting the 
first message in the segment.
+ * New log segments are created according to a configurable policy that 
controls the size in bytes or time interval
+ * for a given segment.
+ *
+ * NOTE: this class is not thread-safe, and it relies on the thread safety 
provided by the Log class.
+ *
+ * @param _dir The directory in which log segments are created.
+ * @param config The log configuration settings
+ * @param recoveryPoint The offset at which to begin recovery i.e. the first 
offset which has not been flushed to disk
+ * @param scheduler The thread pool scheduler used for background actions
+ * @param time The time instance used for checking the clock
+ * @param topicPartition The topic partition associated with this log
+ * @param logDirFailureChannel The LogDirFailureChannel instance to 
asynchronously handle Log dir failure
+ * @param hadCleanShutdown boolean flag to indicate if the Log had a 
clean/graceful shutdown last time. true means
+ * clean shutdown whereas false means a crash.
+ */
+class LocalLog(@volatile private var _dir: File,
+   @volatile var config: LogConfig,
+   @volatile var recoveryPoint: Long,
+   scheduler: Scheduler,
+   val time: Time,
+   val topicPartition: TopicPartition,
+   logDirFailureChannel: LogDirFailureChannel,
+   private val hadCleanShutdown: Boolean = true) extends Logging 
with KafkaMetricsGroup {
+
+  import kafka.log.LocalLog._
+
+  this.logIdent = s"[Log partition=$topicPartition, dir=${dir.getParent}] "
+
+  // The memory mapped buffer for index files of this log will be closed with 
either delete() or closeHandlers()
+  // After memory mapped buffer is closed, no disk IO operation should be 
performed for this log
+  @volatile private var isMemoryMappedBufferClosed = false
+
+  // Cache value of parent directory to avoid allocations in hot paths like 
ReplicaManager.checkpointHighWatermarks
+  @volatile private var _parentDir: String = dir.getParent
+
+  /* last time the log was flushed */
+  private val lastFlushedTime = new AtomicLong(time.milliseconds)
+
+  // The offset where the next message could be appended
+  @volatile private var nextOffsetMetadata

[GitHub] [kafka] kowshik commented on a change in pull request #10280: KIP-405: Log layer refactor

2021-03-25 Thread GitBox


kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r602048606



##
File path: core/src/main/scala/kafka/log/LocalLog.scala
##
@@ -0,0 +1,1561 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log
+
+import java.io.{File, IOException}
+import java.lang.{Long => JLong}
+import java.nio.file.{Files, NoSuchFileException}
+import java.text.NumberFormat
+import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
+import java.util.concurrent.atomic.AtomicLong
+import java.util.Map.{Entry => JEntry}
+import java.util.regex.Pattern
+
+import kafka.common.LogSegmentOffsetOverflowException
+import kafka.metrics.KafkaMetricsGroup
+import kafka.server.epoch.LeaderEpochFileCache
+import kafka.server.{FetchDataInfo, LogDirFailureChannel, LogOffsetMetadata}
+import kafka.utils.{CoreUtils, Logging, Scheduler, threadsafe}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.errors.{InvalidOffsetException, 
KafkaStorageException, OffsetOutOfRangeException}
+import org.apache.kafka.common.message.FetchResponseData
+import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, 
RecordVersion, Records}
+import org.apache.kafka.common.utils.{Time, Utils}
+
+import scala.jdk.CollectionConverters._
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import scala.collection.{Seq, Set, mutable}
+
+// Used to define pre/post roll actions to be performed.
+case class RollAction(preRollAction: Long => Unit, postRollAction: 
(LogSegment, Option[LogSegment]) => Unit)
+
+// Used to hold the result of splitting a segment into one or more segments, 
see LocalLog#splitOverflowedSegment
+case class SplitSegmentResult(deletedSegments: Seq[LogSegment], newSegments: 
Seq[LogSegment])
+
+/**
+ * An append-only log for storing messages locally.
+ * The log is a sequence of LogSegments, each with a base offset denoting the 
first message in the segment.
+ * New log segments are created according to a configurable policy that 
controls the size in bytes or time interval
+ * for a given segment.
+ *
+ * NOTE: this class is not thread-safe, and it relies on the thread safety 
provided by the Log class.
+ *
+ * @param _dir The directory in which log segments are created.
+ * @param config The log configuration settings
+ * @param recoveryPoint The offset at which to begin recovery i.e. the first 
offset which has not been flushed to disk
+ * @param scheduler The thread pool scheduler used for background actions
+ * @param time The time instance used for checking the clock
+ * @param topicPartition The topic partition associated with this log
+ * @param logDirFailureChannel The LogDirFailureChannel instance to 
asynchronously handle Log dir failure
+ * @param hadCleanShutdown boolean flag to indicate if the Log had a 
clean/graceful shutdown last time. true means
+ * clean shutdown whereas false means a crash.
+ */
+class LocalLog(@volatile private var _dir: File,
+   @volatile var config: LogConfig,
+   @volatile var recoveryPoint: Long,
+   scheduler: Scheduler,
+   val time: Time,
+   val topicPartition: TopicPartition,
+   logDirFailureChannel: LogDirFailureChannel,
+   private val hadCleanShutdown: Boolean = true) extends Logging 
with KafkaMetricsGroup {
+
+  import kafka.log.LocalLog._
+
+  this.logIdent = s"[Log partition=$topicPartition, dir=${dir.getParent}] "
+
+  // The memory mapped buffer for index files of this log will be closed with 
either delete() or closeHandlers()
+  // After memory mapped buffer is closed, no disk IO operation should be 
performed for this log
+  @volatile private var isMemoryMappedBufferClosed = false
+
+  // Cache value of parent directory to avoid allocations in hot paths like 
ReplicaManager.checkpointHighWatermarks
+  @volatile private var _parentDir: String = dir.getParent
+
+  /* last time the log was flushed */
+  private val lastFlushedTime = new AtomicLong(time.milliseconds)
+
+  // The offset where the next message could be appended
+  @volatile private var nextOffsetMetadata

[GitHub] [kafka] kowshik commented on a change in pull request #10280: KIP-405: Log layer refactor

2021-03-25 Thread GitBox


kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r602046133



##
File path: core/src/main/scala/kafka/log/LocalLog.scala
##
@@ -0,0 +1,1561 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log
+
+import java.io.{File, IOException}
+import java.lang.{Long => JLong}
+import java.nio.file.{Files, NoSuchFileException}
+import java.text.NumberFormat
+import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
+import java.util.concurrent.atomic.AtomicLong
+import java.util.Map.{Entry => JEntry}
+import java.util.regex.Pattern
+
+import kafka.common.LogSegmentOffsetOverflowException
+import kafka.metrics.KafkaMetricsGroup
+import kafka.server.epoch.LeaderEpochFileCache
+import kafka.server.{FetchDataInfo, LogDirFailureChannel, LogOffsetMetadata}
+import kafka.utils.{CoreUtils, Logging, Scheduler, threadsafe}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.errors.{InvalidOffsetException, KafkaStorageException, OffsetOutOfRangeException}
+import org.apache.kafka.common.message.FetchResponseData
+import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, RecordVersion, Records}
+import org.apache.kafka.common.utils.{Time, Utils}
+
+import scala.jdk.CollectionConverters._
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import scala.collection.{Seq, Set, mutable}
+
+// Used to define pre/post roll actions to be performed.
+case class RollAction(preRollAction: Long => Unit, postRollAction: (LogSegment, Option[LogSegment]) => Unit)
+
+// Used to hold the result of splitting a segment into one or more segments, see LocalLog#splitOverflowedSegment
+case class SplitSegmentResult(deletedSegments: Seq[LogSegment], newSegments: Seq[LogSegment])
+
+/**
+ * An append-only log for storing messages locally.
+ * The log is a sequence of LogSegments, each with a base offset denoting the first message in the segment.
+ * New log segments are created according to a configurable policy that controls the size in bytes or time interval
+ * for a given segment.
+ *
+ * NOTE: this class is not thread-safe, and it relies on the thread safety provided by the Log class.
+ *
+ * @param _dir The directory in which log segments are created.
+ * @param config The log configuration settings
+ * @param recoveryPoint The offset at which to begin recovery i.e. the first offset which has not been flushed to disk
+ * @param scheduler The thread pool scheduler used for background actions
+ * @param time The time instance used for checking the clock
+ * @param topicPartition The topic partition associated with this log
+ * @param logDirFailureChannel The LogDirFailureChannel instance to asynchronously handle Log dir failure
+ * @param hadCleanShutdown boolean flag to indicate if the Log had a clean/graceful shutdown last time. true means
+ * clean shutdown whereas false means a crash.
+ */
+class LocalLog(@volatile private var _dir: File,
+   @volatile var config: LogConfig,
+   @volatile var recoveryPoint: Long,
+   scheduler: Scheduler,
+   val time: Time,
+   val topicPartition: TopicPartition,
+   logDirFailureChannel: LogDirFailureChannel,
+   private val hadCleanShutdown: Boolean = true) extends Logging with KafkaMetricsGroup {
+
+  import kafka.log.LocalLog._
+
+  this.logIdent = s"[Log partition=$topicPartition, dir=${dir.getParent}] "
+
+  // The memory mapped buffer for index files of this log will be closed with either delete() or closeHandlers()
+  // After memory mapped buffer is closed, no disk IO operation should be performed for this log
+  @volatile private var isMemoryMappedBufferClosed = false
+
+  // Cache value of parent directory to avoid allocations in hot paths like ReplicaManager.checkpointHighWatermarks
+  @volatile private var _parentDir: String = dir.getParent
+
+  /* last time the log was flushed */
+  private val lastFlushedTime = new AtomicLong(time.milliseconds)
+
+  // The offset where the next message could be appended
+  @volatile private var nextOffsetMetadata

[GitHub] [kafka] kowshik commented on a change in pull request #10280: KIP-405: Log layer refactor

2021-03-25 Thread GitBox


kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r602046050



##
File path: core/src/main/scala/kafka/log/LocalLog.scala
##
@@ -0,0 +1,1561 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log
+
+import java.io.{File, IOException}
+import java.lang.{Long => JLong}
+import java.nio.file.{Files, NoSuchFileException}
+import java.text.NumberFormat
+import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
+import java.util.concurrent.atomic.AtomicLong
+import java.util.Map.{Entry => JEntry}
+import java.util.regex.Pattern
+
+import kafka.common.LogSegmentOffsetOverflowException
+import kafka.metrics.KafkaMetricsGroup
+import kafka.server.epoch.LeaderEpochFileCache
+import kafka.server.{FetchDataInfo, LogDirFailureChannel, LogOffsetMetadata}
+import kafka.utils.{CoreUtils, Logging, Scheduler, threadsafe}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.errors.{InvalidOffsetException, KafkaStorageException, OffsetOutOfRangeException}
+import org.apache.kafka.common.message.FetchResponseData
+import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, RecordVersion, Records}
+import org.apache.kafka.common.utils.{Time, Utils}
+
+import scala.jdk.CollectionConverters._
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import scala.collection.{Seq, Set, mutable}
+
+// Used to define pre/post roll actions to be performed.
+case class RollAction(preRollAction: Long => Unit, postRollAction: (LogSegment, Option[LogSegment]) => Unit)
+
+// Used to hold the result of splitting a segment into one or more segments, see LocalLog#splitOverflowedSegment
+case class SplitSegmentResult(deletedSegments: Seq[LogSegment], newSegments: Seq[LogSegment])
+
+/**
+ * An append-only log for storing messages locally.
+ * The log is a sequence of LogSegments, each with a base offset denoting the first message in the segment.
+ * New log segments are created according to a configurable policy that controls the size in bytes or time interval
+ * for a given segment.
+ *
+ * NOTE: this class is not thread-safe, and it relies on the thread safety provided by the Log class.
+ *
+ * @param _dir The directory in which log segments are created.
+ * @param config The log configuration settings
+ * @param recoveryPoint The offset at which to begin recovery i.e. the first offset which has not been flushed to disk
+ * @param scheduler The thread pool scheduler used for background actions
+ * @param time The time instance used for checking the clock
+ * @param topicPartition The topic partition associated with this log
+ * @param logDirFailureChannel The LogDirFailureChannel instance to asynchronously handle Log dir failure
+ * @param hadCleanShutdown boolean flag to indicate if the Log had a clean/graceful shutdown last time. true means
+ * clean shutdown whereas false means a crash.
+ */
+class LocalLog(@volatile private var _dir: File,
+   @volatile var config: LogConfig,
+   @volatile var recoveryPoint: Long,
+   scheduler: Scheduler,
+   val time: Time,
+   val topicPartition: TopicPartition,
+   logDirFailureChannel: LogDirFailureChannel,
+   private val hadCleanShutdown: Boolean = true) extends Logging with KafkaMetricsGroup {
+
+  import kafka.log.LocalLog._
+
+  this.logIdent = s"[Log partition=$topicPartition, dir=${dir.getParent}] "
+
+  // The memory mapped buffer for index files of this log will be closed with either delete() or closeHandlers()
+  // After memory mapped buffer is closed, no disk IO operation should be performed for this log
+  @volatile private var isMemoryMappedBufferClosed = false
+
+  // Cache value of parent directory to avoid allocations in hot paths like ReplicaManager.checkpointHighWatermarks
+  @volatile private var _parentDir: String = dir.getParent
+
+  /* last time the log was flushed */
+  private val lastFlushedTime = new AtomicLong(time.milliseconds)
+
+  // The offset where the next message could be appended
+  @volatile private var nextOffsetMetadata

[GitHub] [kafka] kowshik commented on a change in pull request #10280: KIP-405: Log layer refactor

2021-03-25 Thread GitBox


kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r602041672



##
File path: core/src/main/scala/kafka/log/LocalLog.scala
##
@@ -0,0 +1,1561 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log
+
+import java.io.{File, IOException}
+import java.lang.{Long => JLong}
+import java.nio.file.{Files, NoSuchFileException}
+import java.text.NumberFormat
+import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
+import java.util.concurrent.atomic.AtomicLong
+import java.util.Map.{Entry => JEntry}
+import java.util.regex.Pattern
+
+import kafka.common.LogSegmentOffsetOverflowException
+import kafka.metrics.KafkaMetricsGroup
+import kafka.server.epoch.LeaderEpochFileCache
+import kafka.server.{FetchDataInfo, LogDirFailureChannel, LogOffsetMetadata}
+import kafka.utils.{CoreUtils, Logging, Scheduler, threadsafe}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.errors.{InvalidOffsetException, KafkaStorageException, OffsetOutOfRangeException}
+import org.apache.kafka.common.message.FetchResponseData
+import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, RecordVersion, Records}
+import org.apache.kafka.common.utils.{Time, Utils}
+
+import scala.jdk.CollectionConverters._
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import scala.collection.{Seq, Set, mutable}
+
+// Used to define pre/post roll actions to be performed.
+case class RollAction(preRollAction: Long => Unit, postRollAction: (LogSegment, Option[LogSegment]) => Unit)
+
+// Used to hold the result of splitting a segment into one or more segments, see LocalLog#splitOverflowedSegment
+case class SplitSegmentResult(deletedSegments: Seq[LogSegment], newSegments: Seq[LogSegment])
+
+/**
+ * An append-only log for storing messages locally.
+ * The log is a sequence of LogSegments, each with a base offset denoting the first message in the segment.
+ * New log segments are created according to a configurable policy that controls the size in bytes or time interval
+ * for a given segment.
+ *
+ * NOTE: this class is not thread-safe, and it relies on the thread safety provided by the Log class.
+ *
+ * @param _dir The directory in which log segments are created.
+ * @param config The log configuration settings
+ * @param recoveryPoint The offset at which to begin recovery i.e. the first offset which has not been flushed to disk
+ * @param scheduler The thread pool scheduler used for background actions
+ * @param time The time instance used for checking the clock
+ * @param topicPartition The topic partition associated with this log
+ * @param logDirFailureChannel The LogDirFailureChannel instance to asynchronously handle Log dir failure
+ * @param hadCleanShutdown boolean flag to indicate if the Log had a clean/graceful shutdown last time. true means
+ * clean shutdown whereas false means a crash.
+ */
+class LocalLog(@volatile private var _dir: File,
+   @volatile var config: LogConfig,
+   @volatile var recoveryPoint: Long,
+   scheduler: Scheduler,
+   val time: Time,
+   val topicPartition: TopicPartition,
+   logDirFailureChannel: LogDirFailureChannel,
+   private val hadCleanShutdown: Boolean = true) extends Logging with KafkaMetricsGroup {
+
+  import kafka.log.LocalLog._
+
+  this.logIdent = s"[Log partition=$topicPartition, dir=${dir.getParent}] "
+
+  // The memory mapped buffer for index files of this log will be closed with either delete() or closeHandlers()
+  // After memory mapped buffer is closed, no disk IO operation should be performed for this log
+  @volatile private var isMemoryMappedBufferClosed = false
+
+  // Cache value of parent directory to avoid allocations in hot paths like ReplicaManager.checkpointHighWatermarks
+  @volatile private var _parentDir: String = dir.getParent
+
+  /* last time the log was flushed */
+  private val lastFlushedTime = new AtomicLong(time.milliseconds)
+
+  // The offset where the next message could be appended
+  @volatile private var nextOffsetMetadata

[GitHub] [kafka] kowshik commented on a change in pull request #10280: KIP-405: Log layer refactor

2021-03-25 Thread GitBox


kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r602041366



##
File path: core/src/main/scala/kafka/log/LocalLog.scala
##
@@ -0,0 +1,1561 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log
+
+import java.io.{File, IOException}
+import java.lang.{Long => JLong}
+import java.nio.file.{Files, NoSuchFileException}
+import java.text.NumberFormat
+import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
+import java.util.concurrent.atomic.AtomicLong
+import java.util.Map.{Entry => JEntry}
+import java.util.regex.Pattern
+
+import kafka.common.LogSegmentOffsetOverflowException
+import kafka.metrics.KafkaMetricsGroup
+import kafka.server.epoch.LeaderEpochFileCache
+import kafka.server.{FetchDataInfo, LogDirFailureChannel, LogOffsetMetadata}
+import kafka.utils.{CoreUtils, Logging, Scheduler, threadsafe}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.errors.{InvalidOffsetException, KafkaStorageException, OffsetOutOfRangeException}
+import org.apache.kafka.common.message.FetchResponseData
+import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, RecordVersion, Records}
+import org.apache.kafka.common.utils.{Time, Utils}
+
+import scala.jdk.CollectionConverters._
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import scala.collection.{Seq, Set, mutable}
+
+// Used to define pre/post roll actions to be performed.
+case class RollAction(preRollAction: Long => Unit, postRollAction: (LogSegment, Option[LogSegment]) => Unit)
+
+// Used to hold the result of splitting a segment into one or more segments, see LocalLog#splitOverflowedSegment
+case class SplitSegmentResult(deletedSegments: Seq[LogSegment], newSegments: Seq[LogSegment])
+
+/**
+ * An append-only log for storing messages locally.
+ * The log is a sequence of LogSegments, each with a base offset denoting the first message in the segment.
+ * New log segments are created according to a configurable policy that controls the size in bytes or time interval
+ * for a given segment.
+ *
+ * NOTE: this class is not thread-safe, and it relies on the thread safety provided by the Log class.
+ *
+ * @param _dir The directory in which log segments are created.
+ * @param config The log configuration settings
+ * @param recoveryPoint The offset at which to begin recovery i.e. the first offset which has not been flushed to disk
+ * @param scheduler The thread pool scheduler used for background actions
+ * @param time The time instance used for checking the clock
+ * @param topicPartition The topic partition associated with this log
+ * @param logDirFailureChannel The LogDirFailureChannel instance to asynchronously handle Log dir failure
+ * @param hadCleanShutdown boolean flag to indicate if the Log had a clean/graceful shutdown last time. true means
+ * clean shutdown whereas false means a crash.
+ */
+class LocalLog(@volatile private var _dir: File,
+   @volatile var config: LogConfig,
+   @volatile var recoveryPoint: Long,
+   scheduler: Scheduler,
+   val time: Time,
+   val topicPartition: TopicPartition,
+   logDirFailureChannel: LogDirFailureChannel,
+   private val hadCleanShutdown: Boolean = true) extends Logging with KafkaMetricsGroup {
+
+  import kafka.log.LocalLog._
+
+  this.logIdent = s"[Log partition=$topicPartition, dir=${dir.getParent}] "
+
+  // The memory mapped buffer for index files of this log will be closed with either delete() or closeHandlers()
+  // After memory mapped buffer is closed, no disk IO operation should be performed for this log
+  @volatile private var isMemoryMappedBufferClosed = false
+
+  // Cache value of parent directory to avoid allocations in hot paths like ReplicaManager.checkpointHighWatermarks
+  @volatile private var _parentDir: String = dir.getParent
+
+  /* last time the log was flushed */
+  private val lastFlushedTime = new AtomicLong(time.milliseconds)
+
+  // The offset where the next message could be appended
+  @volatile private var nextOffsetMetadata

[GitHub] [kafka] kowshik commented on a change in pull request #10280: KIP-405: Log layer refactor

2021-03-25 Thread GitBox


kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r602041263



##
File path: core/src/main/scala/kafka/log/LocalLog.scala
##
@@ -0,0 +1,1561 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log
+
+import java.io.{File, IOException}
+import java.lang.{Long => JLong}
+import java.nio.file.{Files, NoSuchFileException}
+import java.text.NumberFormat
+import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
+import java.util.concurrent.atomic.AtomicLong
+import java.util.Map.{Entry => JEntry}
+import java.util.regex.Pattern
+
+import kafka.common.LogSegmentOffsetOverflowException
+import kafka.metrics.KafkaMetricsGroup
+import kafka.server.epoch.LeaderEpochFileCache
+import kafka.server.{FetchDataInfo, LogDirFailureChannel, LogOffsetMetadata}
+import kafka.utils.{CoreUtils, Logging, Scheduler, threadsafe}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.errors.{InvalidOffsetException, KafkaStorageException, OffsetOutOfRangeException}
+import org.apache.kafka.common.message.FetchResponseData
+import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, RecordVersion, Records}
+import org.apache.kafka.common.utils.{Time, Utils}
+
+import scala.jdk.CollectionConverters._
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import scala.collection.{Seq, Set, mutable}
+
+// Used to define pre/post roll actions to be performed.
+case class RollAction(preRollAction: Long => Unit, postRollAction: (LogSegment, Option[LogSegment]) => Unit)
+
+// Used to hold the result of splitting a segment into one or more segments, see LocalLog#splitOverflowedSegment
+case class SplitSegmentResult(deletedSegments: Seq[LogSegment], newSegments: Seq[LogSegment])
+
+/**
+ * An append-only log for storing messages locally.
+ * The log is a sequence of LogSegments, each with a base offset denoting the first message in the segment.
+ * New log segments are created according to a configurable policy that controls the size in bytes or time interval
+ * for a given segment.
+ *
+ * NOTE: this class is not thread-safe, and it relies on the thread safety provided by the Log class.
+ *
+ * @param _dir The directory in which log segments are created.
+ * @param config The log configuration settings
+ * @param recoveryPoint The offset at which to begin recovery i.e. the first offset which has not been flushed to disk
+ * @param scheduler The thread pool scheduler used for background actions
+ * @param time The time instance used for checking the clock
+ * @param topicPartition The topic partition associated with this log
+ * @param logDirFailureChannel The LogDirFailureChannel instance to asynchronously handle Log dir failure
+ * @param hadCleanShutdown boolean flag to indicate if the Log had a clean/graceful shutdown last time. true means
+ * clean shutdown whereas false means a crash.
+ */
+class LocalLog(@volatile private var _dir: File,
+   @volatile var config: LogConfig,
+   @volatile var recoveryPoint: Long,
+   scheduler: Scheduler,
+   val time: Time,
+   val topicPartition: TopicPartition,
+   logDirFailureChannel: LogDirFailureChannel,
+   private val hadCleanShutdown: Boolean = true) extends Logging with KafkaMetricsGroup {
+
+  import kafka.log.LocalLog._
+
+  this.logIdent = s"[Log partition=$topicPartition, dir=${dir.getParent}] "
+
+  // The memory mapped buffer for index files of this log will be closed with either delete() or closeHandlers()
+  // After memory mapped buffer is closed, no disk IO operation should be performed for this log
+  @volatile private var isMemoryMappedBufferClosed = false
+
+  // Cache value of parent directory to avoid allocations in hot paths like ReplicaManager.checkpointHighWatermarks
+  @volatile private var _parentDir: String = dir.getParent
+
+  /* last time the log was flushed */
+  private val lastFlushedTime = new AtomicLong(time.milliseconds)
+
+  // The offset where the next message could be appended
+  @volatile private var nextOffsetMetadata

[GitHub] [kafka] kowshik commented on a change in pull request #10280: KIP-405: Log layer refactor

2021-03-25 Thread GitBox


kowshik commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r602040784



##
File path: core/src/main/scala/kafka/log/LocalLog.scala
##
@@ -0,0 +1,1561 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log
+
+import java.io.{File, IOException}
+import java.lang.{Long => JLong}
+import java.nio.file.{Files, NoSuchFileException}
+import java.text.NumberFormat
+import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}
+import java.util.concurrent.atomic.AtomicLong
+import java.util.Map.{Entry => JEntry}
+import java.util.regex.Pattern
+
+import kafka.common.LogSegmentOffsetOverflowException
+import kafka.metrics.KafkaMetricsGroup
+import kafka.server.epoch.LeaderEpochFileCache
+import kafka.server.{FetchDataInfo, LogDirFailureChannel, LogOffsetMetadata}
+import kafka.utils.{CoreUtils, Logging, Scheduler, threadsafe}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.errors.{InvalidOffsetException, KafkaStorageException, OffsetOutOfRangeException}
+import org.apache.kafka.common.message.FetchResponseData
+import org.apache.kafka.common.record.{MemoryRecords, RecordBatch, RecordVersion, Records}
+import org.apache.kafka.common.utils.{Time, Utils}
+
+import scala.jdk.CollectionConverters._
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
+import scala.collection.{Seq, Set, mutable}
+
+// Used to define pre/post roll actions to be performed.
+case class RollAction(preRollAction: Long => Unit, postRollAction: (LogSegment, Option[LogSegment]) => Unit)
+
+// Used to hold the result of splitting a segment into one or more segments, see LocalLog#splitOverflowedSegment
+case class SplitSegmentResult(deletedSegments: Seq[LogSegment], newSegments: Seq[LogSegment])
+
+/**
+ * An append-only log for storing messages locally.
+ * The log is a sequence of LogSegments, each with a base offset denoting the first message in the segment.

Review comment:
   Done.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org