junrao commented on a change in pull request #10478:
URL: https://github.com/apache/kafka/pull/10478#discussion_r610968190



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -255,19 +261,21 @@ case object SnapshotGenerated extends LogStartOffsetIncrementReason {
 @threadsafe
 class Log(@volatile private var _dir: File,
           @volatile var config: LogConfig,
+          val segments: LogSegments,
           @volatile var logStartOffset: Long,
           @volatile var recoveryPoint: Long,
+          @volatile var nextOffsetMetadata: LogOffsetMetadata,
           scheduler: Scheduler,
           brokerTopicStats: BrokerTopicStats,
           val time: Time,
           val maxProducerIdExpirationMs: Int,
           val producerIdExpirationCheckIntervalMs: Int,
           val topicPartition: TopicPartition,
+          @volatile var leaderEpochCache: Option[LeaderEpochFileCache],
           val producerStateManager: ProducerStateManager,
           logDirFailureChannel: LogDirFailureChannel,
-          private val hadCleanShutdown: Boolean = true,
           @volatile var topicId: Option[Uuid],
-          val keepPartitionMetadataFile: Boolean) extends Logging with KafkaMetricsGroup {
+          val keepPartitionMetadataFile: Boolean = true) extends Logging with KafkaMetricsGroup {

Review comment:
       Is the default value needed? 

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -2586,11 +1999,15 @@ object Log {
             logDirFailureChannel: LogDirFailureChannel,
             lastShutdownClean: Boolean = true,
             topicId: Option[Uuid],
-            keepPartitionMetadataFile: Boolean): Log = {
+            keepPartitionMetadataFile: Boolean = true): Log = {
+    // create the log directory if it doesn't exist
+    Files.createDirectories(dir.toPath)
     val topicPartition = Log.parseTopicPartitionName(dir)
-    val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
-    new Log(dir, config, logStartOffset, recoveryPoint, scheduler, brokerTopicStats, time, maxProducerIdExpirationMs,
-      producerIdExpirationCheckIntervalMs, topicPartition, producerStateManager, logDirFailureChannel, lastShutdownClean, topicId, keepPartitionMetadataFile)
+    val logLoader = new LogLoader(dir, topicPartition, config, scheduler, time, logDirFailureChannel)

Review comment:
       Do we need LogLoader to be a class? Another option is to have LogLoader 
as an Object with a single public method `load()`.
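
       To make the suggestion concrete, a minimal sketch of the object-based alternative; the parameter list, return type, and the `LoadedLogState` name are assumptions for illustration, not the PR's actual API:

       ```scala
       import java.io.File

       import kafka.log.LogConfig
       import kafka.server.LogDirFailureChannel
       import kafka.utils.Scheduler
       import org.apache.kafka.common.TopicPartition
       import org.apache.kafka.common.utils.Time

       // Illustrative return type only; the real result of loading would carry whatever
       // recovered state Log needs (segments, offsets, leader epoch cache, etc.).
       case class LoadedLogState(nextOffset: Long, recoveryPoint: Long)

       object LogLoader {
         // Single public entry point: everything the load needs is passed in and the
         // recovered state is returned, so no loader instance has to be kept around.
         def load(dir: File,
                  topicPartition: TopicPartition,
                  config: LogConfig,
                  scheduler: Scheduler,
                  time: Time,
                  logDirFailureChannel: LogDirFailureChannel): LoadedLogState = {
           // ... recover segments under dir, rebuild indexes, and compute the offsets
           ???
         }
       }
       ```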

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -2586,11 +1999,15 @@ object Log {
             logDirFailureChannel: LogDirFailureChannel,
             lastShutdownClean: Boolean = true,
             topicId: Option[Uuid],
-            keepPartitionMetadataFile: Boolean): Log = {
+            keepPartitionMetadataFile: Boolean = true): Log = {

Review comment:
       Is the default value needed? If so, it seems many tests are not taking 
advantage of this.

##########
File path: core/src/test/scala/unit/kafka/log/LogTestUtils.scala
##########
@@ -37,4 +47,246 @@ object LogTestUtils {
 
    new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, time)
   }
+
+  def createLogConfig(segmentMs: Long = Defaults.SegmentMs,
+                      segmentBytes: Int = Defaults.SegmentSize,
+                      retentionMs: Long = Defaults.RetentionMs,
+                      retentionBytes: Long = Defaults.RetentionSize,
+                      segmentJitterMs: Long = Defaults.SegmentJitterMs,
+                      cleanupPolicy: String = Defaults.CleanupPolicy,
+                      maxMessageBytes: Int = Defaults.MaxMessageSize,
+                      indexIntervalBytes: Int = Defaults.IndexInterval,
+                      segmentIndexBytes: Int = Defaults.MaxIndexSize,
+                      messageFormatVersion: String = Defaults.MessageFormatVersion,
+                      fileDeleteDelayMs: Long = Defaults.FileDeleteDelayMs): LogConfig = {
+    val logProps = new Properties()
+
+    logProps.put(LogConfig.SegmentMsProp, segmentMs: java.lang.Long)
+    logProps.put(LogConfig.SegmentBytesProp, segmentBytes: Integer)
+    logProps.put(LogConfig.RetentionMsProp, retentionMs: java.lang.Long)
+    logProps.put(LogConfig.RetentionBytesProp, retentionBytes: java.lang.Long)
+    logProps.put(LogConfig.SegmentJitterMsProp, segmentJitterMs: java.lang.Long)
+    logProps.put(LogConfig.CleanupPolicyProp, cleanupPolicy)
+    logProps.put(LogConfig.MaxMessageBytesProp, maxMessageBytes: Integer)
+    logProps.put(LogConfig.IndexIntervalBytesProp, indexIntervalBytes: Integer)
+    logProps.put(LogConfig.SegmentIndexBytesProp, segmentIndexBytes: Integer)
+    logProps.put(LogConfig.MessageFormatVersionProp, messageFormatVersion)
+    logProps.put(LogConfig.FileDeleteDelayMsProp, fileDeleteDelayMs: java.lang.Long)
+    LogConfig(logProps)
+  }
+
+  def createLog(dir: File,
+                config: LogConfig,
+                brokerTopicStats: BrokerTopicStats,
+                scheduler: Scheduler,
+                time: Time,
+                logStartOffset: Long = 0L,
+                recoveryPoint: Long = 0L,
+                maxProducerIdExpirationMs: Int = 60 * 60 * 1000,
+                producerIdExpirationCheckIntervalMs: Int = LogManager.ProducerIdExpirationCheckIntervalMs,
+                lastShutdownClean: Boolean = true,
+                topicId: Option[Uuid] = None): Log = {
+    Log(dir = dir,
+      config = config,
+      logStartOffset = logStartOffset,
+      recoveryPoint = recoveryPoint,
+      scheduler = scheduler,
+      brokerTopicStats = brokerTopicStats,
+      time = time,
+      maxProducerIdExpirationMs = maxProducerIdExpirationMs,
+      producerIdExpirationCheckIntervalMs = producerIdExpirationCheckIntervalMs,
+      logDirFailureChannel = new LogDirFailureChannel(10),
+      lastShutdownClean = lastShutdownClean,
+      topicId = topicId)
+  }
+
+  /**
+   * Check if the given log contains any segment with records that cause offset overflow.
+   * @param log Log to check
+   * @return true if log contains at least one segment with offset overflow; false otherwise
+   */
+  def hasOffsetOverflow(log: Log): Boolean = firstOverflowSegment(log).isDefined
+
+  def firstOverflowSegment(log: Log): Option[LogSegment] = {
+    def hasOverflow(baseOffset: Long, batch: RecordBatch): Boolean =
+      batch.lastOffset > baseOffset + Int.MaxValue || batch.baseOffset < baseOffset
+
+    for (segment <- log.logSegments) {
+      val overflowBatch = segment.log.batches.asScala.find(batch => hasOverflow(segment.baseOffset, batch))
+      if (overflowBatch.isDefined)
+        return Some(segment)
+    }
+    None
+  }
+
+  private def rawSegment(logDir: File, baseOffset: Long): FileRecords =
+    FileRecords.open(Log.logFile(logDir, baseOffset))
+
+  /**
+   * Initialize the given log directory with a set of segments, one of which will have an
+   * offset which overflows the segment
+   */
+  def initializeLogDirWithOverflowedSegment(logDir: File): Unit = {
+    def writeSampleBatches(baseOffset: Long, segment: FileRecords): Long = {
+      def record(offset: Long) = {
+        val data = offset.toString.getBytes
+        new SimpleRecord(data, data)
+      }
+
+      segment.append(MemoryRecords.withRecords(baseOffset, CompressionType.NONE, 0,
+        record(baseOffset)))
+      segment.append(MemoryRecords.withRecords(baseOffset + 1, CompressionType.NONE, 0,
+        record(baseOffset + 1),
+        record(baseOffset + 2)))
+      segment.append(MemoryRecords.withRecords(baseOffset + Int.MaxValue - 1, CompressionType.NONE, 0,
+        record(baseOffset + Int.MaxValue - 1)))
+      // Need to create the offset files explicitly to avoid triggering segment recovery to truncate segment.
+      Log.offsetIndexFile(logDir, baseOffset).createNewFile()
+      Log.timeIndexFile(logDir, baseOffset).createNewFile()
+      baseOffset + Int.MaxValue
+    }
+
+    def writeNormalSegment(baseOffset: Long): Long = {
+      val segment = rawSegment(logDir, baseOffset)
+      try writeSampleBatches(baseOffset, segment)
+      finally segment.close()
+    }
+
+    def writeOverflowSegment(baseOffset: Long): Long = {
+      val segment = rawSegment(logDir, baseOffset)
+      try {
+        val nextOffset = writeSampleBatches(baseOffset, segment)
+        writeSampleBatches(nextOffset, segment)
+      } finally segment.close()
+    }
+
+    // We create three segments, the second of which contains offsets which overflow
+    var nextOffset = 0L
+    nextOffset = writeNormalSegment(nextOffset)
+    nextOffset = writeOverflowSegment(nextOffset)
+    writeNormalSegment(nextOffset)
+  }
+
+  def allRecords(log: Log): List[Record] = {

Review comment:
       Could this be private?

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -2797,63 +2590,65 @@ object LogMetricNames {
   }
 }
 
-sealed trait SegmentDeletionReason {
-  def logReason(log: Log, toDelete: List[LogSegment]): Unit
+abstract class SegmentDeletionReason {
+  def logReason(logger: Logging, toDelete: List[LogSegment]): Unit = {}

Review comment:
       Hmm, this method is being used, but no subclass implements it. Are we 
losing logging because of this?
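
       To make the concern concrete, a sketch of the override each reason would now need in order to keep its per-reason log line; `HypotheticalRetentionBreach` is an invented name used only for illustration, not one of the PR's reasons:

       ```scala
       import kafka.log.LogSegment
       import kafka.utils.Logging

       // With an empty default body on logReason, a reason that does not override it
       // logs nothing when its segments are deleted.
       case object HypotheticalRetentionBreach extends SegmentDeletionReason {
         override def logReason(logger: Logging, toDelete: List[LogSegment]): Unit = {
           toDelete.foreach { segment =>
             logger.info(s"Deleting segment ${segment.baseOffset} due to a hypothetical retention breach")
           }
         }
       }
       ```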

##########
File path: core/src/test/scala/unit/kafka/log/LogTestUtils.scala
##########
@@ -37,4 +47,246 @@ object LogTestUtils {
 
    new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, time)
   }
+
+  def createLogConfig(segmentMs: Long = Defaults.SegmentMs,
+                      segmentBytes: Int = Defaults.SegmentSize,
+                      retentionMs: Long = Defaults.RetentionMs,
+                      retentionBytes: Long = Defaults.RetentionSize,
+                      segmentJitterMs: Long = Defaults.SegmentJitterMs,
+                      cleanupPolicy: String = Defaults.CleanupPolicy,
+                      maxMessageBytes: Int = Defaults.MaxMessageSize,
+                      indexIntervalBytes: Int = Defaults.IndexInterval,
+                      segmentIndexBytes: Int = Defaults.MaxIndexSize,
+                      messageFormatVersion: String = Defaults.MessageFormatVersion,
+                      fileDeleteDelayMs: Long = Defaults.FileDeleteDelayMs): LogConfig = {
+    val logProps = new Properties()
+
+    logProps.put(LogConfig.SegmentMsProp, segmentMs: java.lang.Long)
+    logProps.put(LogConfig.SegmentBytesProp, segmentBytes: Integer)
+    logProps.put(LogConfig.RetentionMsProp, retentionMs: java.lang.Long)
+    logProps.put(LogConfig.RetentionBytesProp, retentionBytes: java.lang.Long)
+    logProps.put(LogConfig.SegmentJitterMsProp, segmentJitterMs: java.lang.Long)
+    logProps.put(LogConfig.CleanupPolicyProp, cleanupPolicy)
+    logProps.put(LogConfig.MaxMessageBytesProp, maxMessageBytes: Integer)
+    logProps.put(LogConfig.IndexIntervalBytesProp, indexIntervalBytes: Integer)
+    logProps.put(LogConfig.SegmentIndexBytesProp, segmentIndexBytes: Integer)
+    logProps.put(LogConfig.MessageFormatVersionProp, messageFormatVersion)
+    logProps.put(LogConfig.FileDeleteDelayMsProp, fileDeleteDelayMs: java.lang.Long)
+    LogConfig(logProps)
+  }
+
+  def createLog(dir: File,
+                config: LogConfig,
+                brokerTopicStats: BrokerTopicStats,
+                scheduler: Scheduler,
+                time: Time,
+                logStartOffset: Long = 0L,
+                recoveryPoint: Long = 0L,
+                maxProducerIdExpirationMs: Int = 60 * 60 * 1000,
+                producerIdExpirationCheckIntervalMs: Int = LogManager.ProducerIdExpirationCheckIntervalMs,
+                lastShutdownClean: Boolean = true,
+                topicId: Option[Uuid] = None): Log = {
+    Log(dir = dir,
+      config = config,
+      logStartOffset = logStartOffset,
+      recoveryPoint = recoveryPoint,
+      scheduler = scheduler,
+      brokerTopicStats = brokerTopicStats,
+      time = time,
+      maxProducerIdExpirationMs = maxProducerIdExpirationMs,
+      producerIdExpirationCheckIntervalMs = producerIdExpirationCheckIntervalMs,
+      logDirFailureChannel = new LogDirFailureChannel(10),
+      lastShutdownClean = lastShutdownClean,
+      topicId = topicId)
+  }
+
+  /**
+   * Check if the given log contains any segment with records that cause offset overflow.
+   * @param log Log to check
+   * @return true if log contains at least one segment with offset overflow; false otherwise
+   */
+  def hasOffsetOverflow(log: Log): Boolean = firstOverflowSegment(log).isDefined
+
+  def firstOverflowSegment(log: Log): Option[LogSegment] = {
+    def hasOverflow(baseOffset: Long, batch: RecordBatch): Boolean =
+      batch.lastOffset > baseOffset + Int.MaxValue || batch.baseOffset < baseOffset
+
+    for (segment <- log.logSegments) {
+      val overflowBatch = segment.log.batches.asScala.find(batch => hasOverflow(segment.baseOffset, batch))
+      if (overflowBatch.isDefined)
+        return Some(segment)
+    }
+    None
+  }
+
+  private def rawSegment(logDir: File, baseOffset: Long): FileRecords =
+    FileRecords.open(Log.logFile(logDir, baseOffset))
+
+  /**
+   * Initialize the given log directory with a set of segments, one of which will have an
+   * offset which overflows the segment
+   */
+  def initializeLogDirWithOverflowedSegment(logDir: File): Unit = {
+    def writeSampleBatches(baseOffset: Long, segment: FileRecords): Long = {
+      def record(offset: Long) = {
+        val data = offset.toString.getBytes
+        new SimpleRecord(data, data)
+      }
+
+      segment.append(MemoryRecords.withRecords(baseOffset, CompressionType.NONE, 0,
+        record(baseOffset)))
+      segment.append(MemoryRecords.withRecords(baseOffset + 1, CompressionType.NONE, 0,
+        record(baseOffset + 1),
+        record(baseOffset + 2)))
+      segment.append(MemoryRecords.withRecords(baseOffset + Int.MaxValue - 1, CompressionType.NONE, 0,
+        record(baseOffset + Int.MaxValue - 1)))
+      // Need to create the offset files explicitly to avoid triggering segment recovery to truncate segment.
+      Log.offsetIndexFile(logDir, baseOffset).createNewFile()
+      Log.timeIndexFile(logDir, baseOffset).createNewFile()
+      baseOffset + Int.MaxValue
+    }
+
+    def writeNormalSegment(baseOffset: Long): Long = {
+      val segment = rawSegment(logDir, baseOffset)
+      try writeSampleBatches(baseOffset, segment)
+      finally segment.close()
+    }
+
+    def writeOverflowSegment(baseOffset: Long): Long = {
+      val segment = rawSegment(logDir, baseOffset)
+      try {
+        val nextOffset = writeSampleBatches(baseOffset, segment)
+        writeSampleBatches(nextOffset, segment)
+      } finally segment.close()
+    }
+
+    // We create three segments, the second of which contains offsets which overflow
+    var nextOffset = 0L
+    nextOffset = writeNormalSegment(nextOffset)
+    nextOffset = writeOverflowSegment(nextOffset)
+    writeNormalSegment(nextOffset)
+  }
+
+  def allRecords(log: Log): List[Record] = {
+    val recordsFound = ListBuffer[Record]()
+    for (logSegment <- log.logSegments) {
+      for (batch <- logSegment.log.batches.asScala) {
+        recordsFound ++= batch.iterator().asScala
+      }
+    }
+    recordsFound.toList
+  }
+
+  def verifyRecordsInLog(log: Log, expectedRecords: List[Record]): Unit = {

Review comment:
       Is this method used?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

