This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e2554333744 KAFKA-18162 Move LocalLogTest to storage module (#18057)
e2554333744 is described below

commit e255433374454e07eb1b99632f73093b6033abc7
Author: Mickael Maison <[email protected]>
AuthorDate: Sat Dec 7 03:19:56 2024 +0100

    KAFKA-18162 Move LocalLogTest to storage module (#18057)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../test/java/org/apache/kafka/test/TestUtils.java |  17 +
 .../test/scala/unit/kafka/log/LocalLogTest.scala   | 714 -------------------
 .../test/scala/unit/kafka/utils/TestUtils.scala    |   9 +-
 .../kafka/storage/internals/log/LocalLogTest.java  | 756 +++++++++++++++++++++
 4 files changed, 774 insertions(+), 722 deletions(-)

diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index f1c529394be..c2ee9856c6f 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -279,6 +279,23 @@ public class TestUtils {
         return file;
     }
 
+    /**
+     * Create a random log directory in the format <string>-<int> used for Kafka partition logs.
+     * It is the responsibility of the caller to set up a shutdown hook for deletion of the directory.
+     */
+    public static File randomPartitionLogDir(File parentDir) {
+        int attempts = 1000;
+        while (attempts > 0) {
+            File f = new File(parentDir, "kafka-" + RANDOM.nextInt(1000000));
+            if (f.mkdir()) {
+                f.deleteOnExit();
+                return f;
+            }
+            attempts--;
+        }
+        throw new RuntimeException("Failed to create directory after 1000 attempts");
+    }
+
     public static Properties producerConfig(final String bootstrapServers,
                                             final Class<?> keySerializer,
                                             final Class<?> valueSerializer,
diff --git a/core/src/test/scala/unit/kafka/log/LocalLogTest.scala b/core/src/test/scala/unit/kafka/log/LocalLogTest.scala
deleted file mode 100644
index 0b840ddde62..00000000000
--- a/core/src/test/scala/unit/kafka/log/LocalLogTest.scala
+++ /dev/null
@@ -1,714 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.log
-
-import java.io.File
-import java.nio.channels.ClosedChannelException
-import java.nio.charset.StandardCharsets
-import java.util
-import java.util.regex.Pattern
-import java.util.Collections
-import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
-import org.apache.kafka.common.compress.Compression
-import org.apache.kafka.common.{KafkaException, TopicPartition}
-import org.apache.kafka.common.errors.KafkaStorageException
-import org.apache.kafka.common.record.{MemoryRecords, Record, SimpleRecord}
-import org.apache.kafka.common.utils.{Time, Utils}
-import org.apache.kafka.server.util.{MockTime, Scheduler}
-import org.apache.kafka.storage.internals.log.{FetchDataInfo, LocalLog, LogConfig, LogDirFailureChannel, LogFileUtils, LogOffsetMetadata, LogSegment, LogSegments, LogTruncation, SegmentDeletionReason}
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.function.Executable
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
-import org.mockito.Mockito.{doReturn, spy}
-
-import scala.jdk.CollectionConverters._
-
-class LocalLogTest {
-
-  var config: KafkaConfig = _
-  val tmpDir: File = TestUtils.tempDir()
-  val logDir: File = TestUtils.randomPartitionLogDir(tmpDir)
-  val topicPartition = new TopicPartition("test_topic", 1)
-  val logDirFailureChannel = new LogDirFailureChannel(10)
-  val mockTime = new MockTime()
-  val log: LocalLog = createLocalLogWithActiveSegment(config = LogTestUtils.createLogConfig())
-
-  @BeforeEach
-  def setUp(): Unit = {
-    val props = TestUtils.createBrokerConfig(0, "127.0.0.1:1", port = -1)
-    config = KafkaConfig.fromProps(props)
-  }
-
-  @AfterEach
-  def tearDown(): Unit = {
-    try {
-      log.close()
-    } catch {
-      case _: KafkaStorageException => // ignore
-    }
-    Utils.delete(tmpDir)
-  }
-
-  case class KeyValue(key: String, value: String) {
-    def toRecord(timestamp: => Long = mockTime.milliseconds): SimpleRecord = {
-      new SimpleRecord(timestamp, key.getBytes, value.getBytes)
-    }
-  }
-
-  object KeyValue {
-    def fromRecord(record: Record): KeyValue = {
-      val key =
-        if (record.hasKey)
-          StandardCharsets.UTF_8.decode(record.key()).toString
-        else
-          ""
-      val value =
-        if (record.hasValue)
-          StandardCharsets.UTF_8.decode(record.value()).toString
-        else
-          ""
-      KeyValue(key, value)
-    }
-  }
-
-  private def kvsToRecords(keyValues: Iterable[KeyValue]): Iterable[SimpleRecord] = {
-    keyValues.map(kv => kv.toRecord())
-  }
-
-  private def recordsToKvs(records: Iterable[Record]): Iterable[KeyValue] = {
-    records.map(r => KeyValue.fromRecord(r))
-  }
-
-  private def appendRecords(records: Iterable[SimpleRecord],
-                            log: LocalLog = log,
-                            initialOffset: Long = 0L): Unit = {
-    log.append(initialOffset + records.size - 1,
-      records.head.timestamp,
-      initialOffset,
-      MemoryRecords.withRecords(initialOffset, Compression.NONE, 0, records.toList : _*))
-  }
-
-  private def readRecords(log: LocalLog = log,
-                          startOffset: Long = 0L,
-                          maxLength: => Int = log.segments.activeSegment.size,
-                          minOneMessage: Boolean = false,
-                          maxOffsetMetadata: => LogOffsetMetadata = log.logEndOffsetMetadata,
-                          includeAbortedTxns: Boolean = false): FetchDataInfo = {
-    log.read(startOffset,
-             maxLength,
-             minOneMessage,
-             maxOffsetMetadata,
-             includeAbortedTxns)
-  }
-
-  @Test
-  def testLogDeleteSegmentsSuccess(): Unit = {
-    val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
-    appendRecords(List(record))
-    log.roll(0L)
-    assertEquals(2, log.segments.numberOfSegments)
-    assertFalse(logDir.listFiles.isEmpty)
-    val segmentsBeforeDelete = new util.ArrayList(log.segments.values)
-    val deletedSegments = log.deleteAllSegments()
-    assertTrue(log.segments.isEmpty)
-    assertEquals(segmentsBeforeDelete, deletedSegments)
-    assertThrows(classOf[KafkaStorageException], () => log.checkIfMemoryMappedBufferClosed())
-    assertTrue(logDir.exists)
-  }
-
-  @Test
-  def testRollEmptyActiveSegment(): Unit = {
-    val oldActiveSegment = log.segments.activeSegment
-    log.roll(0L)
-    assertEquals(1, log.segments.numberOfSegments)
-    assertNotEquals(oldActiveSegment, log.segments.activeSegment)
-    assertFalse(logDir.listFiles.isEmpty)
-    assertTrue(oldActiveSegment.hasSuffix(LogFileUtils.DELETED_FILE_SUFFIX))
-  }
-
-  @Test
-  def testLogDeleteDirSuccessWhenEmptyAndFailureWhenNonEmpty(): Unit ={
-    val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
-    appendRecords(List(record))
-    log.roll(0L)
-    assertEquals(2, log.segments.numberOfSegments)
-    assertFalse(logDir.listFiles.isEmpty)
-
-    assertThrows(classOf[IllegalStateException], () => log.deleteEmptyDir())
-    assertTrue(logDir.exists)
-
-    log.deleteAllSegments()
-    log.deleteEmptyDir()
-    assertFalse(logDir.exists)
-  }
-
-  @Test
-  def testUpdateConfig(): Unit = {
-    val oldConfig = log.config
-    assertEquals(oldConfig, log.config)
-
-    val newConfig = LogTestUtils.createLogConfig(segmentBytes = oldConfig.segmentSize + 1)
-    log.updateConfig(newConfig)
-    assertEquals(newConfig, log.config)
-  }
-
-  @Test
-  def testLogDirRenameToNewDir(): Unit = {
-    val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
-    appendRecords(List(record))
-    log.roll(0L)
-    assertEquals(2, log.segments.numberOfSegments)
-    val newLogDir = TestUtils.randomPartitionLogDir(tmpDir)
-    assertTrue(log.renameDir(newLogDir.getName))
-    assertFalse(logDir.exists())
-    assertTrue(newLogDir.exists())
-    assertEquals(newLogDir, log.dir)
-    assertEquals(newLogDir.getParent, log.parentDir)
-    assertEquals(newLogDir.getParent, log.dir.getParent)
-    log.segments.values.forEach(segment => assertEquals(newLogDir.getPath, segment.log.file().getParentFile.getPath))
-    assertEquals(2, log.segments.numberOfSegments)
-  }
-
-  @Test
-  def testLogDirRenameToExistingDir(): Unit = {
-    assertFalse(log.renameDir(log.dir.getName))
-  }
-
-  @Test
-  def testLogFlush(): Unit = {
-    assertEquals(0L, log.recoveryPoint)
-    assertEquals(mockTime.milliseconds, log.lastFlushTime)
-
-    val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
-    appendRecords(List(record))
-    mockTime.sleep(1)
-    val newSegment = log.roll(0L)
-    log.flush(newSegment.baseOffset)
-    log.markFlushed(newSegment.baseOffset)
-    assertEquals(1L, log.recoveryPoint)
-    assertEquals(mockTime.milliseconds, log.lastFlushTime)
-  }
-
-  @Test
-  def testLogAppend(): Unit = {
-    val fetchDataInfoBeforeAppend = readRecords(maxLength = 1)
-    assertTrue(fetchDataInfoBeforeAppend.records.records.asScala.isEmpty)
-
-    mockTime.sleep(1)
-    val keyValues = Seq(KeyValue("abc", "ABC"), KeyValue("de", "DE"))
-    appendRecords(kvsToRecords(keyValues))
-    assertEquals(2L, log.logEndOffset)
-    assertEquals(0L, log.recoveryPoint)
-    val fetchDataInfo = readRecords()
-    assertEquals(2L, fetchDataInfo.records.records.asScala.size)
-    assertEquals(keyValues, recordsToKvs(fetchDataInfo.records.records.asScala))
-  }
-
-  @Test
-  def testLogCloseSuccess(): Unit = {
-    val keyValues = Seq(KeyValue("abc", "ABC"), KeyValue("de", "DE"))
-    appendRecords(kvsToRecords(keyValues))
-    log.close()
-    assertThrows(classOf[ClosedChannelException], () => appendRecords(kvsToRecords(keyValues), initialOffset = 2L))
-  }
-
-  @Test
-  def testLogCloseIdempotent(): Unit = {
-    log.close()
-    // Check that LocalLog.close() is idempotent
-    log.close()
-  }
-
-  @Test
-  def testLogCloseFailureWhenInMemoryBufferClosed(): Unit = {
-    val keyValues = Seq(KeyValue("abc", "ABC"), KeyValue("de", "DE"))
-    appendRecords(kvsToRecords(keyValues))
-    log.closeHandlers()
-    assertThrows(classOf[KafkaStorageException], () => log.close())
-  }
-
-  @Test
-  def testLogCloseHandlers(): Unit = {
-    val keyValues = Seq(KeyValue("abc", "ABC"), KeyValue("de", "DE"))
-    appendRecords(kvsToRecords(keyValues))
-    log.closeHandlers()
-    assertThrows(classOf[ClosedChannelException],
-                 () => appendRecords(kvsToRecords(keyValues), initialOffset = 2L))
-  }
-
-  @Test
-  def testLogCloseHandlersIdempotent(): Unit = {
-    log.closeHandlers()
-    // Check that LocalLog.closeHandlers() is idempotent
-    log.closeHandlers()
-  }
-
-  private def testRemoveAndDeleteSegments(asyncDelete: Boolean): Unit = {
-    for (offset <- 0 to 8) {
-      val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
-      appendRecords(List(record), initialOffset = offset)
-      log.roll(0L)
-    }
-
-    assertEquals(10L, log.segments.numberOfSegments)
-
-    class TestDeletionReason extends SegmentDeletionReason {
-      private var _deletedSegments: util.Collection[LogSegment] = new util.ArrayList()
-
-      override def logReason(toDelete: util.List[LogSegment]): Unit = {
-        _deletedSegments = new util.ArrayList(toDelete)
-      }
-
-      def deletedSegments: util.Collection[LogSegment] = _deletedSegments
-    }
-    val reason = new TestDeletionReason()
-    val toDelete = new util.ArrayList(log.segments.values)
-    log.removeAndDeleteSegments(toDelete, asyncDelete, reason)
-    if (asyncDelete) {
-      mockTime.sleep(log.config.fileDeleteDelayMs + 1)
-    }
-    assertTrue(log.segments.isEmpty)
-    assertEquals(toDelete, reason.deletedSegments)
-    toDelete.forEach(segment => assertTrue(segment.deleted()))
-  }
-
-  @Test
-  def testRemoveAndDeleteSegmentsSync(): Unit = {
-    testRemoveAndDeleteSegments(asyncDelete = false)
-  }
-
-  @Test
-  def testRemoveAndDeleteSegmentsAsync(): Unit = {
-    testRemoveAndDeleteSegments(asyncDelete = true)
-  }
-
-  private def testDeleteSegmentFiles(asyncDelete: Boolean): Unit = {
-    for (offset <- 0 to 8) {
-      val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
-      appendRecords(List(record), initialOffset = offset)
-      log.roll(0L)
-    }
-
-    assertEquals(10L, log.segments.numberOfSegments)
-
-    val toDelete = log.segments.values
-    LocalLog.deleteSegmentFiles(toDelete, asyncDelete, log.dir, log.topicPartition, log.config, log.scheduler, log.logDirFailureChannel, "")
-    if (asyncDelete) {
-      toDelete.forEach {
-        segment =>
-          assertFalse(segment.deleted())
-          assertTrue(segment.hasSuffix(LogFileUtils.DELETED_FILE_SUFFIX))
-      }
-      mockTime.sleep(log.config.fileDeleteDelayMs + 1)
-    }
-    toDelete.forEach(segment => assertTrue(segment.deleted()))
-  }
-
-  @Test
-  def testDeleteSegmentFilesSync(): Unit = {
-    testDeleteSegmentFiles(asyncDelete = false)
-  }
-
-  @Test
-  def testDeleteSegmentFilesAsync(): Unit = {
-    testDeleteSegmentFiles(asyncDelete = true)
-  }
-
-  @Test
-  def testCreateAndDeleteSegment(): Unit = {
-    val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
-    appendRecords(List(record))
-    val newOffset = log.segments.activeSegment.baseOffset + 1
-    val oldActiveSegment = log.segments.activeSegment
-    val newActiveSegment = log.createAndDeleteSegment(newOffset, log.segments.activeSegment, true, new LogTruncation(log.logger))
-    assertEquals(1, log.segments.numberOfSegments)
-    assertEquals(newActiveSegment, log.segments.activeSegment)
-    assertNotEquals(oldActiveSegment, log.segments.activeSegment)
-    assertTrue(oldActiveSegment.hasSuffix(LogFileUtils.DELETED_FILE_SUFFIX))
-    assertEquals(newOffset, log.segments.activeSegment.baseOffset)
-    assertEquals(0L, log.recoveryPoint)
-    assertEquals(newOffset, log.logEndOffset)
-    val fetchDataInfo = readRecords(startOffset = newOffset)
-    assertTrue(fetchDataInfo.records.records.asScala.isEmpty)
-  }
-
-  @Test
-  def testTruncateFullyAndStartAt(): Unit = {
-    val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
-    for (offset <- 0 to 7) {
-      appendRecords(List(record), initialOffset = offset)
-      if (offset % 2 != 0)
-        log.roll(0L)
-    }
-    for (offset <- 8 to 12) {
-      val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
-      appendRecords(List(record), initialOffset = offset)
-    }
-    assertEquals(5, log.segments.numberOfSegments)
-    assertNotEquals(10L, log.segments.activeSegment.baseOffset)
-    val expected = new util.ArrayList(log.segments.values)
-    val deleted = log.truncateFullyAndStartAt(10L)
-    assertEquals(expected, deleted)
-    assertEquals(1, log.segments.numberOfSegments)
-    assertEquals(10L, log.segments.activeSegment.baseOffset)
-    assertEquals(0L, log.recoveryPoint)
-    assertEquals(10L, log.logEndOffset)
-    val fetchDataInfo = readRecords(startOffset = 10L)
-    assertTrue(fetchDataInfo.records.records.asScala.isEmpty)
-  }
-
-  @Test
-  def testWhenFetchOffsetHigherThanMaxOffset(): Unit = {
-    val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
-    for (offset <- 0 to 4) {
-      appendRecords(List(record), initialOffset = offset)
-      if (offset % 2 != 0)
-        log.roll(0L)
-    }
-    assertEquals(3, log.segments.numberOfSegments)
-
-    // case-0: valid case, `startOffset` < `maxOffsetMetadata.offset`
-    var fetchDataInfo = readRecords(startOffset = 3L, maxOffsetMetadata = new LogOffsetMetadata(4L, 4L, 0))
-    assertEquals(1, fetchDataInfo.records.records.asScala.size)
-    assertEquals(new LogOffsetMetadata(3, 2L, 69), fetchDataInfo.fetchOffsetMetadata)
-
-    // case-1: `startOffset` == `maxOffsetMetadata.offset`
-    fetchDataInfo = readRecords(startOffset = 4L, maxOffsetMetadata = new LogOffsetMetadata(4L, 4L, 0))
-    assertTrue(fetchDataInfo.records.records.asScala.isEmpty)
-    assertEquals(new LogOffsetMetadata(4L, 4L, 0), fetchDataInfo.fetchOffsetMetadata)
-
-    // case-2: `startOffset` > `maxOffsetMetadata.offset`
-    fetchDataInfo = readRecords(startOffset = 5L, maxOffsetMetadata = new LogOffsetMetadata(4L, 4L, 0))
-    assertTrue(fetchDataInfo.records.records.asScala.isEmpty)
-    assertEquals(new LogOffsetMetadata(5L, 4L, 69), fetchDataInfo.fetchOffsetMetadata)
-
-    // case-3: `startOffset` < `maxMessageOffset.offset` but `maxMessageOffset.messageOnlyOffset` is true
-    fetchDataInfo = readRecords(startOffset = 3L, maxOffsetMetadata = new LogOffsetMetadata(4L, -1L, -1))
-    assertTrue(fetchDataInfo.records.records.asScala.isEmpty)
-    assertEquals(new LogOffsetMetadata(3L, 2L, 69), fetchDataInfo.fetchOffsetMetadata)
-
-    // case-4: `startOffset` < `maxMessageOffset.offset`, `maxMessageOffset.messageOnlyOffset` is false, but
-    // `maxOffsetMetadata.segmentBaseOffset` < `startOffset.segmentBaseOffset`
-    fetchDataInfo = readRecords(startOffset = 3L, maxOffsetMetadata = new LogOffsetMetadata(4L, 0L, 40))
-    assertTrue(fetchDataInfo.records.records.asScala.isEmpty)
-    assertEquals(new LogOffsetMetadata(3L, 2L, 69), fetchDataInfo.fetchOffsetMetadata)
-  }
-
-  @Test
-  def testTruncateTo(): Unit = {
-    for (offset <- 0 to 11) {
-      val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
-      appendRecords(List(record), initialOffset = offset)
-      if (offset % 3 == 2)
-        log.roll(0L)
-    }
-    assertEquals(5, log.segments.numberOfSegments)
-    assertEquals(12L, log.logEndOffset)
-
-    val expected = new util.ArrayList(log.segments.values(9L, log.logEndOffset + 1))
-    // Truncate to an offset before the base offset of the active segment
-    val deleted = log.truncateTo(7L)
-    assertEquals(expected, deleted)
-    assertEquals(3, log.segments.numberOfSegments)
-    assertEquals(6L, log.segments.activeSegment.baseOffset)
-    assertEquals(0L, log.recoveryPoint)
-    assertEquals(7L, log.logEndOffset)
-    val fetchDataInfo = readRecords(startOffset = 6L)
-    assertEquals(1, fetchDataInfo.records.records.asScala.size)
-    assertEquals(Seq(KeyValue("", "a")), recordsToKvs(fetchDataInfo.records.records.asScala))
-
-    // Verify that we can still append to the active segment
-    val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
-    appendRecords(List(record), initialOffset = 7L)
-    assertEquals(8L, log.logEndOffset)
-  }
-
-  @Test
-  def testNonActiveSegmentsFrom(): Unit = {
-    for (i <- 0 until 5) {
-      val keyValues = Seq(KeyValue(i.toString, i.toString))
-      appendRecords(kvsToRecords(keyValues), initialOffset = i)
-      log.roll(0L)
-    }
-
-    def nonActiveBaseOffsetsFrom(startOffset: Long): Seq[Long] = {
-      log.segments.nonActiveLogSegmentsFrom(startOffset).asScala.map(_.baseOffset).toSeq
-    }
-
-    assertEquals(5L, log.segments.activeSegment.baseOffset)
-    assertEquals(0 until 5, nonActiveBaseOffsetsFrom(0L))
-    assertEquals(Seq.empty, nonActiveBaseOffsetsFrom(5L))
-    assertEquals(2 until 5, nonActiveBaseOffsetsFrom(2L))
-    assertEquals(Seq.empty, nonActiveBaseOffsetsFrom(6L))
-  }
-
-  private def topicPartitionName(topic: String, partition: String): String = topic + "-" + partition
-
-  @Test
-  def testParseTopicPartitionName(): Unit = {
-    val topic = "test_topic"
-    val partition = "143"
-    val dir = new File(logDir, topicPartitionName(topic, partition))
-    val topicPartition = LocalLog.parseTopicPartitionName(dir)
-    assertEquals(topic, topicPartition.topic)
-    assertEquals(partition.toInt, topicPartition.partition)
-  }
-
-  /**
-   * Tests that log directories with a period in their name that have been marked for deletion
-   * are parsed correctly by `Log.parseTopicPartitionName` (see KAFKA-5232 for details).
-   */
-  @Test
-  def testParseTopicPartitionNameWithPeriodForDeletedTopic(): Unit = {
-    val topic = "foo.bar-testtopic"
-    val partition = "42"
-    val dir = new File(logDir, LocalLog.logDeleteDirName(new TopicPartition(topic, partition.toInt)))
-    val topicPartition = LocalLog.parseTopicPartitionName(dir)
-    assertEquals(topic, topicPartition.topic, "Unexpected topic name parsed")
-    assertEquals(partition.toInt, topicPartition.partition, "Unexpected partition number parsed")
-  }
-
-  @Test
-  def testParseTopicPartitionNameForEmptyName(): Unit = {
-    val dir = new File("")
-    assertThrows(classOf[KafkaException], () => LocalLog.parseTopicPartitionName(dir),
-      () => "KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
-  }
-
-  @Test
-  def testParseTopicPartitionNameForNull(): Unit = {
-    val dir: File = null
-    assertThrows(classOf[KafkaException], () => LocalLog.parseTopicPartitionName(dir),
-      () => "KafkaException should have been thrown for dir: " + dir)
-  }
-
-  @Test
-  def testParseTopicPartitionNameForMissingSeparator(): Unit = {
-    val topic = "test_topic"
-    val partition = "1999"
-    val dir = new File(logDir, topic + partition)
-    assertThrows(classOf[KafkaException], () => LocalLog.parseTopicPartitionName(dir),
-      () => "KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
-    // also test the "-delete" marker case
-    val deleteMarkerDir = new File(logDir, topic + partition + "." + LogFileUtils.DELETE_DIR_SUFFIX)
-    assertThrows(classOf[KafkaException], () => LocalLog.parseTopicPartitionName(deleteMarkerDir),
-      () => "KafkaException should have been thrown for dir: " + deleteMarkerDir.getCanonicalPath)
-  }
-
-  @Test
-  def testParseTopicPartitionNameForMissingTopic(): Unit = {
-    val topic = ""
-    val partition = "1999"
-    val dir = new File(logDir, topicPartitionName(topic, partition))
-    assertThrows(classOf[KafkaException], () => LocalLog.parseTopicPartitionName(dir),
-      () => "KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
-
-    // also test the "-delete" marker case
-    val deleteMarkerDir = new File(logDir, LocalLog.logDeleteDirName(new TopicPartition(topic, partition.toInt)))
-
-    assertThrows(classOf[KafkaException], () => LocalLog.parseTopicPartitionName(deleteMarkerDir),
-      () => "KafkaException should have been thrown for dir: " + deleteMarkerDir.getCanonicalPath)
-  }
-
-  @Test
-  def testParseTopicPartitionNameForMissingPartition(): Unit = {
-    val topic = "test_topic"
-    val partition = ""
-    val dir = new File(logDir.getPath + topicPartitionName(topic, partition))
-    assertThrows(classOf[KafkaException], () => LocalLog.parseTopicPartitionName(dir),
-      () => "KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
-
-    // also test the "-delete" marker case
-    val deleteMarkerDir = new File(logDir, topicPartitionName(topic, partition) + "." + LogFileUtils.DELETE_DIR_SUFFIX)
-    assertThrows(classOf[KafkaException], () => LocalLog.parseTopicPartitionName(deleteMarkerDir),
-      () => "KafkaException should have been thrown for dir: " + deleteMarkerDir.getCanonicalPath)
-  }
-
-  @Test
-  def testParseTopicPartitionNameForInvalidPartition(): Unit = {
-    val topic = "test_topic"
-    val partition = "1999a"
-    val dir = new File(logDir, topicPartitionName(topic, partition))
-    assertThrows(classOf[KafkaException], () => LocalLog.parseTopicPartitionName(dir),
-      () => "KafkaException should have been thrown for dir: " + dir.getCanonicalPath)
-
-    // also test the "-delete" marker case
-    val deleteMarkerDir = new File(logDir, topic + partition + "." + LogFileUtils.DELETE_DIR_SUFFIX)
-    assertThrows(classOf[KafkaException], () => LocalLog.parseTopicPartitionName(deleteMarkerDir),
-      () => "KafkaException should have been thrown for dir: " + deleteMarkerDir.getCanonicalPath)
-  }
-
-  @Test
-  def testParseTopicPartitionNameForExistingInvalidDir(): Unit = {
-    val dir1 = new File(logDir.getPath + "/non_kafka_dir")
-    assertThrows(classOf[KafkaException], () => LocalLog.parseTopicPartitionName(dir1),
-      () => "KafkaException should have been thrown for dir: " + dir1.getCanonicalPath)
-    val dir2 = new File(logDir.getPath + "/non_kafka_dir-delete")
-    assertThrows(classOf[KafkaException], () => LocalLog.parseTopicPartitionName(dir2),
-      () => "KafkaException should have been thrown for dir: " + dir2.getCanonicalPath)
-  }
-
-  @Test
-  def testLogDeleteDirName(): Unit = {
-    val name1 = LocalLog.logDeleteDirName(new TopicPartition("foo", 3))
-    assertTrue(name1.length <= 255)
-    assertTrue(Pattern.compile("foo-3\\.[0-9a-z]{32}-delete").matcher(name1).matches())
-    assertTrue(LocalLog.DELETE_DIR_PATTERN.matcher(name1).matches())
-    assertFalse(LocalLog.FUTURE_DIR_PATTERN.matcher(name1).matches())
-    val name2 = LocalLog.logDeleteDirName(
-      new TopicPartition("n" + String.join("", Collections.nCopies(248, "o")), 5))
-    assertEquals(255, name2.length)
-    assertTrue(Pattern.compile("n[o]{212}-5\\.[0-9a-z]{32}-delete").matcher(name2).matches())
-    assertTrue(LocalLog.DELETE_DIR_PATTERN.matcher(name2).matches())
-    assertFalse(LocalLog.FUTURE_DIR_PATTERN.matcher(name2).matches())
-  }
-
-  @Test
-  def testOffsetFromFile(): Unit = {
-    val offset = 23423423L
-
-    val logFile = LogFileUtils.logFile(tmpDir, offset)
-    assertEquals(offset, LogFileUtils.offsetFromFile(logFile))
-
-    val offsetIndexFile = LogFileUtils.offsetIndexFile(tmpDir, offset)
-    assertEquals(offset, LogFileUtils.offsetFromFile(offsetIndexFile))
-
-    val timeIndexFile = LogFileUtils.timeIndexFile(tmpDir, offset)
-    assertEquals(offset, LogFileUtils.offsetFromFile(timeIndexFile))
-  }
-
-  @Test
-  def testRollSegmentThatAlreadyExists(): Unit = {
-    assertEquals(1, log.segments.numberOfSegments, "Log begins with a single empty segment.")
-
-    // roll active segment with the same base offset of size zero should recreate the segment
-    log.roll(0L)
-    assertEquals(1, log.segments.numberOfSegments, "Expect 1 segment after roll() empty segment with base offset.")
-
-    // should be able to append records to active segment
-    val keyValues1 = List(KeyValue("k1", "v1"))
-    appendRecords(kvsToRecords(keyValues1))
-    assertEquals(0L, log.segments.activeSegment.baseOffset)
-    // make sure we can append more records
-    val keyValues2 = List(KeyValue("k2", "v2"))
-    appendRecords(keyValues2.map(_.toRecord(mockTime.milliseconds + 10)), initialOffset = 1L)
-    assertEquals(2, log.logEndOffset, "Expect two records in the log")
-    val readResult = readRecords()
-    assertEquals(2L, readResult.records.records.asScala.size)
-    assertEquals(keyValues1 ++ keyValues2, recordsToKvs(readResult.records.records.asScala))
-
-    // roll so that active segment is empty
-    log.roll(0L)
-    assertEquals(2L, log.segments.activeSegment.baseOffset, "Expect base offset of active segment to be LEO")
-    assertEquals(2, log.segments.numberOfSegments, "Expect two segments.")
-    assertEquals(2L, log.logEndOffset)
-  }
-
-  @Test
-  def testNewSegmentsAfterRoll(): Unit = {
-    assertEquals(1, log.segments.numberOfSegments, "Log begins with a single empty segment.")
-
-    // roll active segment with the same base offset of size zero should recreate the segment
-    {
-      val newSegment = log.roll(0L)
-      assertEquals(0L, newSegment.baseOffset)
-      assertEquals(1, log.segments.numberOfSegments)
-      assertEquals(0L, log.logEndOffset)
-    }
-
-    appendRecords(List(KeyValue("k1", "v1").toRecord()))
-
-    {
-      val newSegment = log.roll(0L)
-      assertEquals(1L, newSegment.baseOffset)
-      assertEquals(2, log.segments.numberOfSegments)
-      assertEquals(1L, log.logEndOffset)
-    }
-
-    appendRecords(List(KeyValue("k2", "v2").toRecord()), initialOffset = 1L)
-
-    {
-      val newSegment = log.roll(1L)
-      assertEquals(2L, newSegment.baseOffset)
-      assertEquals(3, log.segments.numberOfSegments)
-      assertEquals(2L, log.logEndOffset)
-    }
-  }
-
-  @Test
-  def testRollSegmentErrorWhenNextOffsetIsIllegal(): Unit = {
-    assertEquals(1, log.segments.numberOfSegments, "Log begins with a single empty segment.")
-
-    val keyValues = List(KeyValue("k1", "v1"), KeyValue("k2", "v2"), KeyValue("k3", "v3"))
-    appendRecords(kvsToRecords(keyValues))
-    assertEquals(0L, log.segments.activeSegment.baseOffset)
-    assertEquals(3, log.logEndOffset, "Expect two records in the log")
-
-    // roll to create an empty active segment
-    log.roll(0L)
-    assertEquals(3L, log.segments.activeSegment.baseOffset)
-
-    // intentionally setup the logEndOffset to introduce an error later
-    log.updateLogEndOffset(1L)
-
-    // expect an error because of attempt to roll to a new offset (1L) that's lower than the
-    // base offset (3L) of the active segment
-    assertThrows(classOf[KafkaException], () => log.roll(0L))
-  }
-
-  @Test
-  def testFlushingNonExistentDir(): Unit = {
-    val spyLog = spy(log)
-
-    val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
-    appendRecords(List(record))
-    mockTime.sleep(1)
-    val newSegment = log.roll(0L)
-
-    // simulate the directory is renamed concurrently
-    doReturn(new File("__NON_EXISTENT__"), Nil: _*).when(spyLog).dir
-    assertDoesNotThrow((() => spyLog.flush(newSegment.baseOffset)): Executable)
-  }
-
-  private def createLocalLogWithActiveSegment(dir: File = logDir,
-                                              config: LogConfig,
-                                              segments: LogSegments = new LogSegments(topicPartition),
-                                              recoveryPoint: Long = 0L,
-                                              nextOffsetMetadata: LogOffsetMetadata = new LogOffsetMetadata(0L, 0L, 0),
-                                              scheduler: Scheduler = mockTime.scheduler,
-                                              time: Time = mockTime,
-                                              topicPartition: TopicPartition = topicPartition,
-                                              logDirFailureChannel: LogDirFailureChannel = logDirFailureChannel): LocalLog = {
-    segments.add(LogSegment.open(dir,
-                                 0L,
-                                 config,
-                                 time,
-                                 config.initFileSize,
-                                 config.preallocate))
-    new LocalLog(dir,
-                 config,
-                 segments,
-                 recoveryPoint,
-                 nextOffsetMetadata,
-                 scheduler,
-                 time,
-                 topicPartition,
-                 logDirFailureChannel)
-  }
-}
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index bb25bc9379c..c81fa2b189d 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -135,14 +135,7 @@ object TestUtils extends Logging {
   * Create a random log directory in the format <string>-<int> used for Kafka partition logs.
   * It is the responsibility of the caller to set up a shutdown hook for deletion of the directory.
    */
-  def randomPartitionLogDir(parentDir: File): File = {
-    val attempts = 1000
-    val f = Iterator.continually(new File(parentDir, "kafka-" + random.nextInt(1000000)))
-                                  .take(attempts).find(_.mkdir())
-                                  .getOrElse(sys.error(s"Failed to create directory after $attempts attempts"))
-    f.deleteOnExit()
-    f
-  }
+  def randomPartitionLogDir(parentDir: File): File = JTestUtils.randomPartitionLogDir(parentDir)
 
   /**
    * Create a temporary file
diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/log/LocalLogTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/log/LocalLogTest.java
new file mode 100644
index 00000000000..00b53de34ef
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/log/LocalLogTest.java
@@ -0,0 +1,756 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+class LocalLogTest {
+
+    private static final MockTime MOCK_TIME = new MockTime();
+
+    private final File tmpDir = TestUtils.tempDirectory();
+    private final File logDir = TestUtils.randomPartitionLogDir(tmpDir);
+    private final TopicPartition topicPartition = new TopicPartition("test_topic", 1);
+    private final LogDirFailureChannel logDirFailureChannel = new LogDirFailureChannel(10);
+
+    private LocalLog log;
+
+    @BeforeEach
+    public void setUp() throws IOException {
+        log = createLocalLogWithActiveSegment(logDir, new LogConfig(new Properties()));
+    }
+
+    @AfterEach
+    public void tearDown() throws IOException {
+        try {
+            log.close();
+        } catch (KafkaStorageException kse) {
+            // ignore
+        }
+    }
+
+    record KeyValue(String key, String value) {
+
+        SimpleRecord toRecord(long timestamp) {
+            return new SimpleRecord(timestamp, key.getBytes(), value.getBytes());
+        }
+
+        SimpleRecord toRecord() {
+            return new SimpleRecord(MOCK_TIME.milliseconds(), key.getBytes(), value.getBytes());
+        }
+
+        static KeyValue fromRecord(Record record) {
+            String key = record.hasKey()
+                ? StandardCharsets.UTF_8.decode(record.key()).toString()
+                : "";
+            String value = record.hasValue()
+                ? StandardCharsets.UTF_8.decode(record.value()).toString()
+                : "";
+            return new KeyValue(key, value);
+        }
+    }
+
+    private List<SimpleRecord> kvsToRecords(List<KeyValue> keyValues) {
+        return keyValues.stream().map(KeyValue::toRecord).collect(Collectors.toList());
+    }
+
+    private List<KeyValue> recordsToKvs(Iterable<Record> records) {
+        List<KeyValue> keyValues = new ArrayList<>();
+        for (Record record : records) {
+            keyValues.add(KeyValue.fromRecord(record));
+        }
+        return keyValues;
+    }
+
+    private void appendRecords(List<SimpleRecord> records, long initialOffset) throws IOException {
+        log.append(initialOffset + records.size() - 1,
+                records.get(0).timestamp(),
+                initialOffset,
+                MemoryRecords.withRecords(initialOffset, Compression.NONE, 0, records.toArray(new SimpleRecord[0])));
+    }
+
+    private FetchDataInfo readRecords(long startOffset) throws IOException {
+        return readRecords(
+                startOffset,
+                log.segments().activeSegment().size(),
+                log.logEndOffsetMetadata()
+        );
+    }
+
+    private FetchDataInfo readRecords(int maxLength) throws IOException {
+        return readRecords(
+                0L,
+                maxLength,
+                log.logEndOffsetMetadata()
+        );
+    }
+
+    private FetchDataInfo readRecords(long startOffset, LogOffsetMetadata maxOffsetMetadata) throws IOException {
+        return readRecords(
+                startOffset,
+                log.segments().activeSegment().size(),
+                maxOffsetMetadata
+        );
+    }
+
+    private FetchDataInfo readRecords(
+                            long startOffset,
+                            int maxLength,
+                            LogOffsetMetadata maxOffsetMetadata) throws IOException {
+        return log.read(startOffset,
+                maxLength,
+                false,
+                maxOffsetMetadata,
+                false);
+    }
+
+    @Test
+    public void testLogDeleteSegmentsSuccess() throws IOException {
+        SimpleRecord record = new SimpleRecord(MOCK_TIME.milliseconds(), "a".getBytes());
+        appendRecords(List.of(record), 0L);
+        log.roll(0L);
+        assertEquals(2, log.segments().numberOfSegments());
+        assertNotEquals(0, logDir.listFiles().length);
+        List<LogSegment> segmentsBeforeDelete = new ArrayList<>(log.segments().values());
+        List<LogSegment> deletedSegments = log.deleteAllSegments();
+        assertTrue(log.segments().isEmpty());
+        assertEquals(segmentsBeforeDelete, deletedSegments);
+        assertThrows(KafkaStorageException.class, () -> log.checkIfMemoryMappedBufferClosed());
+        assertTrue(logDir.exists());
+    }
+
+    @Test
+    public void testRollEmptyActiveSegment() {
+        LogSegment oldActiveSegment = log.segments().activeSegment();
+        log.roll(0L);
+        assertEquals(1, log.segments().numberOfSegments());
+        assertNotEquals(oldActiveSegment, log.segments().activeSegment());
+        assertNotEquals(0, logDir.listFiles().length);
+        assertTrue(oldActiveSegment.hasSuffix(LogFileUtils.DELETED_FILE_SUFFIX));
+    }
+
+    @Test
+    public void testLogDeleteDirSuccessWhenEmptyAndFailureWhenNonEmpty() throws IOException {
+        SimpleRecord record = new SimpleRecord(MOCK_TIME.milliseconds(), "a".getBytes());
+        appendRecords(List.of(record), 0L);
+        log.roll(0L);
+        assertEquals(2, log.segments().numberOfSegments());
+        assertNotEquals(0, logDir.listFiles().length);
+
+        assertThrows(IllegalStateException.class, () -> log.deleteEmptyDir());
+        assertTrue(logDir.exists());
+
+        log.deleteAllSegments();
+        log.deleteEmptyDir();
+        assertFalse(logDir.exists());
+    }
+
+    @Test
+    public void testUpdateConfig() {
+        LogConfig oldConfig = log.config();
+        assertEquals(oldConfig, log.config());
+
+        Properties props = new Properties();
+        props.put(TopicConfig.SEGMENT_BYTES_CONFIG, oldConfig.segmentSize + 1);
+        LogConfig newConfig = new LogConfig(props);
+        log.updateConfig(newConfig);
+        assertEquals(newConfig, log.config());
+    }
+
+    @Test
+    public void testLogDirRenameToNewDir() throws IOException {
+        SimpleRecord record = new SimpleRecord(MOCK_TIME.milliseconds(), "a".getBytes());
+        appendRecords(List.of(record), 0L);
+        log.roll(0L);
+        assertEquals(2, log.segments().numberOfSegments());
+        File newLogDir = TestUtils.randomPartitionLogDir(tmpDir);
+        assertTrue(log.renameDir(newLogDir.getName()));
+        assertFalse(logDir.exists());
+        assertTrue(newLogDir.exists());
+        assertEquals(newLogDir, log.dir());
+        assertEquals(newLogDir.getParent(), log.parentDir());
+        assertEquals(newLogDir.getParent(), log.dir().getParent());
+        log.segments().values().forEach(segment -> assertEquals(newLogDir.getPath(), segment.log().file().getParentFile().getPath()));
+        assertEquals(2, log.segments().numberOfSegments());
+    }
+
+    @Test
+    public void testLogDirRenameToExistingDir() {
+        assertFalse(log.renameDir(log.dir().getName()));
+    }
+
+    @Test
+    public void testLogFlush() throws IOException {
+        assertEquals(0L, log.recoveryPoint());
+        assertEquals(MOCK_TIME.milliseconds(), log.lastFlushTime());
+
+        SimpleRecord record = new SimpleRecord(MOCK_TIME.milliseconds(), "a".getBytes());
+        appendRecords(List.of(record), 0L);
+        MOCK_TIME.sleep(1);
+        LogSegment newSegment = log.roll(0L);
+        log.flush(newSegment.baseOffset());
+        log.markFlushed(newSegment.baseOffset());
+        assertEquals(1L, log.recoveryPoint());
+        assertEquals(MOCK_TIME.milliseconds(), log.lastFlushTime());
+    }
+
+    @Test
+    public void testLogAppend() throws IOException {
+        FetchDataInfo fetchDataInfoBeforeAppend = readRecords(1);
+        assertFalse(fetchDataInfoBeforeAppend.records.records().iterator().hasNext());
+
+        MOCK_TIME.sleep(1);
+        List<KeyValue> keyValues = List.of(new KeyValue("abc", "ABC"), new KeyValue("de", "DE"));
+        appendRecords(kvsToRecords(keyValues), 0L);
+        assertEquals(2L, log.logEndOffset());
+        assertEquals(0L, log.recoveryPoint());
+        FetchDataInfo fetchDataInfo = readRecords(0L);
+        assertEquals(2L, Utils.toList(fetchDataInfo.records.records()).size());
+        assertEquals(keyValues, recordsToKvs(fetchDataInfo.records.records()));
+    }
+
+    @Test
+    public void testLogCloseSuccess() throws IOException {
+        List<KeyValue> keyValues = List.of(new KeyValue("abc", "ABC"), new KeyValue("de", "DE"));
+        appendRecords(kvsToRecords(keyValues), 0L);
+        log.close();
+        assertThrows(ClosedChannelException.class, () -> appendRecords(kvsToRecords(keyValues), 2L));
+    }
+
+    @Test
+    public void testLogCloseIdempotent() {
+        log.close();
+        // Check that LocalLog.close() is idempotent
+        log.close();
+    }
+
+    @Test
+    public void testLogCloseFailureWhenInMemoryBufferClosed() throws IOException {
+        List<KeyValue> keyValues = List.of(new KeyValue("abc", "ABC"), new KeyValue("de", "DE"));
+        appendRecords(kvsToRecords(keyValues), 0L);
+        log.closeHandlers();
+        assertThrows(KafkaStorageException.class, () -> log.close());
+    }
+
+    @Test
+    public void testLogCloseHandlers() throws IOException {
+        List<KeyValue> keyValues = List.of(new KeyValue("abc", "ABC"), new KeyValue("de", "DE"));
+        appendRecords(kvsToRecords(keyValues), 0L);
+        log.closeHandlers();
+        assertThrows(ClosedChannelException.class, () -> appendRecords(kvsToRecords(keyValues), 2L));
+    }
+
+    @Test
+    public void testLogCloseHandlersIdempotent() {
+        log.closeHandlers();
+        // Check that LocalLog.closeHandlers() is idempotent
+        log.closeHandlers();
+    }
+
+    static class TestDeletionReason implements SegmentDeletionReason {
+        private Collection<LogSegment> deletedSegments = new ArrayList<>();
+
+        @Override
+        public void logReason(List<LogSegment> toDelete) {
+            deletedSegments = new ArrayList<>(toDelete);
+        }
+
+        Collection<LogSegment> deletedSegments() {
+            return deletedSegments;
+        }
+    }
+
+    private void testRemoveAndDeleteSegments(boolean asyncDelete) throws IOException {
+        for (int offset = 0; offset < 9; offset++) {
+            SimpleRecord record = new SimpleRecord(MOCK_TIME.milliseconds(), "a".getBytes());
+            appendRecords(List.of(record), offset);
+            log.roll(0L);
+        }
+
+        assertEquals(10L, log.segments().numberOfSegments());
+
+
+        TestDeletionReason reason = new TestDeletionReason();
+        List<LogSegment> toDelete = new ArrayList<>(log.segments().values());
+        log.removeAndDeleteSegments(toDelete, asyncDelete, reason);
+        if (asyncDelete) {
+            MOCK_TIME.sleep(log.config().fileDeleteDelayMs + 1);
+        }
+        assertTrue(log.segments().isEmpty());
+        assertEquals(toDelete, reason.deletedSegments());
+        toDelete.forEach(segment -> assertTrue(segment.deleted()));
+    }
+
+    @Test
+    public void testRemoveAndDeleteSegmentsSync() throws IOException {
+        testRemoveAndDeleteSegments(false);
+    }
+
+    @Test
+    public void testRemoveAndDeleteSegmentsAsync() throws IOException {
+        testRemoveAndDeleteSegments(true);
+    }
+
+    private void testDeleteSegmentFiles(boolean asyncDelete) throws IOException {
+        for (int offset = 0; offset < 9; offset++) {
+            SimpleRecord record = new SimpleRecord(MOCK_TIME.milliseconds(), "a".getBytes());
+            appendRecords(List.of(record), offset);
+            log.roll(0L);
+        }
+
+        assertEquals(10L, log.segments().numberOfSegments());
+
+        Collection<LogSegment> toDelete = log.segments().values();
+        LocalLog.deleteSegmentFiles(toDelete, asyncDelete, log.dir(), log.topicPartition(), log.config(), log.scheduler(), log.logDirFailureChannel(), "");
+        if (asyncDelete) {
+            toDelete.forEach(segment -> {
+                assertFalse(segment.deleted());
+                assertTrue(segment.hasSuffix(LogFileUtils.DELETED_FILE_SUFFIX));
+            });
+            MOCK_TIME.sleep(log.config().fileDeleteDelayMs + 1);
+        }
+        toDelete.forEach(segment -> assertTrue(segment.deleted()));
+    }
+
+    @Test
+    public void testDeleteSegmentFilesSync() throws IOException {
+        testDeleteSegmentFiles(false);
+    }
+
+    @Test
+    public void testDeleteSegmentFilesAsync() throws IOException {
+        testDeleteSegmentFiles(true);
+    }
+
+    @Test
+    public void testCreateAndDeleteSegment() throws IOException {
+        SimpleRecord record = new SimpleRecord(MOCK_TIME.milliseconds(), "a".getBytes());
+        appendRecords(List.of(record), 0L);
+        long newOffset = log.segments().activeSegment().baseOffset() + 1;
+        LogSegment oldActiveSegment = log.segments().activeSegment();
+        LogSegment newActiveSegment = log.createAndDeleteSegment(newOffset, log.segments().activeSegment(), true, new LogTruncation(log.logger()));
+        assertEquals(1, log.segments().numberOfSegments());
+        assertEquals(newActiveSegment, log.segments().activeSegment());
+        assertNotEquals(oldActiveSegment, log.segments().activeSegment());
+        assertTrue(oldActiveSegment.hasSuffix(LogFileUtils.DELETED_FILE_SUFFIX));
+        assertEquals(newOffset, log.segments().activeSegment().baseOffset());
+        assertEquals(0L, log.recoveryPoint());
+        assertEquals(newOffset, log.logEndOffset());
+        FetchDataInfo fetchDataInfo = readRecords(newOffset);
+        assertFalse(fetchDataInfo.records.records().iterator().hasNext());
+    }
+
+    @Test
+    public void testTruncateFullyAndStartAt() throws IOException {
+        SimpleRecord record = new SimpleRecord(MOCK_TIME.milliseconds(), "a".getBytes());
+        for (int offset = 0; offset < 8; offset++) {
+            appendRecords(List.of(record), offset);
+            if (offset % 2 != 0)
+                log.roll(0L);
+        }
+        for (int offset = 8; offset < 13; offset++) {
+            SimpleRecord r = new SimpleRecord(MOCK_TIME.milliseconds(), "a".getBytes());
+            appendRecords(List.of(r), offset);
+        }
+        assertEquals(5, log.segments().numberOfSegments());
+        assertNotEquals(10L, log.segments().activeSegment().baseOffset());
+        List<LogSegment> expected = new ArrayList<>(log.segments().values());
+        List<LogSegment> deleted = log.truncateFullyAndStartAt(10L);
+        assertEquals(expected, deleted);
+        assertEquals(1, log.segments().numberOfSegments());
+        assertEquals(10L, log.segments().activeSegment().baseOffset());
+        assertEquals(0L, log.recoveryPoint());
+        assertEquals(10L, log.logEndOffset());
+        FetchDataInfo fetchDataInfo = readRecords(10L);
+        assertFalse(fetchDataInfo.records.records().iterator().hasNext());
+    }
+
+    @Test
+    public void testWhenFetchOffsetHigherThanMaxOffset() throws IOException {
+        SimpleRecord record = new SimpleRecord(MOCK_TIME.milliseconds(), "a".getBytes());
+        for (int offset = 0; offset < 5; offset++) {
+            appendRecords(List.of(record), offset);
+            if (offset % 2 != 0)
+                log.roll(0L);
+        }
+        assertEquals(3, log.segments().numberOfSegments());
+
+        // case-0: valid case, `startOffset` < `maxOffsetMetadata.offset`
+        var fetchDataInfo = readRecords(3L, new LogOffsetMetadata(4L, 4L, 0));
+        assertEquals(1, Utils.toList(fetchDataInfo.records.records()).size());
+        assertEquals(new LogOffsetMetadata(3, 2L, 69), fetchDataInfo.fetchOffsetMetadata);
+
+        // case-1: `startOffset` == `maxOffsetMetadata.offset`
+        fetchDataInfo = readRecords(4L, new LogOffsetMetadata(4L, 4L, 0));
+        assertFalse(fetchDataInfo.records.records().iterator().hasNext());
+        assertEquals(new LogOffsetMetadata(4L, 4L, 0), fetchDataInfo.fetchOffsetMetadata);
+
+        // case-2: `startOffset` > `maxOffsetMetadata.offset`
+        fetchDataInfo = readRecords(5L, new LogOffsetMetadata(4L, 4L, 0));
+        assertFalse(fetchDataInfo.records.records().iterator().hasNext());
+        assertEquals(new LogOffsetMetadata(5L, 4L, 69), fetchDataInfo.fetchOffsetMetadata);
+
+        // case-3: `startOffset` < `maxMessageOffset.offset` but `maxMessageOffset.messageOnlyOffset` is true
+        fetchDataInfo = readRecords(3L, new LogOffsetMetadata(4L, -1L, -1));
+        assertFalse(fetchDataInfo.records.records().iterator().hasNext());
+        assertEquals(new LogOffsetMetadata(3L, 2L, 69), fetchDataInfo.fetchOffsetMetadata);
+
+        // case-4: `startOffset` < `maxMessageOffset.offset`, `maxMessageOffset.messageOnlyOffset` is false, but
+        // `maxOffsetMetadata.segmentBaseOffset` < `startOffset.segmentBaseOffset`
+        fetchDataInfo = readRecords(3L, new LogOffsetMetadata(4L, 0L, 40));
+        assertFalse(fetchDataInfo.records.records().iterator().hasNext());
+        assertEquals(new LogOffsetMetadata(3L, 2L, 69), fetchDataInfo.fetchOffsetMetadata);
+    }
+
+    @Test
+    public void testTruncateTo() throws IOException {
+        for (int offset = 0; offset < 12; offset++) {
+            SimpleRecord record = new SimpleRecord(MOCK_TIME.milliseconds(), "a".getBytes());
+            appendRecords(List.of(record), offset);
+            if (offset % 3 == 2)
+                log.roll(0L);
+        }
+        assertEquals(5, log.segments().numberOfSegments());
+        assertEquals(12L, log.logEndOffset());
+
+        List<LogSegment> expected = new ArrayList<>(log.segments().values(9L, log.logEndOffset() + 1));
+        // Truncate to an offset before the base offset of the active segment
+        Collection<LogSegment> deleted = log.truncateTo(7L);
+        assertEquals(expected, deleted);
+        assertEquals(3, log.segments().numberOfSegments());
+        assertEquals(6L, log.segments().activeSegment().baseOffset());
+        assertEquals(0L, log.recoveryPoint());
+        assertEquals(7L, log.logEndOffset());
+        FetchDataInfo fetchDataInfo = readRecords(6L);
+        assertEquals(1, Utils.toList(fetchDataInfo.records.records()).size());
+        assertEquals(List.of(new KeyValue("", "a")), recordsToKvs(fetchDataInfo.records.records()));
+
+        // Verify that we can still append to the active segment
+        SimpleRecord record = new SimpleRecord(MOCK_TIME.milliseconds(), "a".getBytes());
+        appendRecords(List.of(record), 7L);
+        assertEquals(8L, log.logEndOffset());
+    }
+
+    @Test
+    public void testNonActiveSegmentsFrom() throws IOException {
+        for (int i = 0; i < 5; i++) {
+            List<KeyValue> keyValues = List.of(new KeyValue(String.valueOf(i), String.valueOf(i)));
+            appendRecords(kvsToRecords(keyValues), i);
+            log.roll(0L);
+        }
+
+        assertEquals(5L, log.segments().activeSegment().baseOffset());
+        assertEquals(List.of(0L, 1L, 2L, 3L, 4L), nonActiveBaseOffsetsFrom(0L));
+        assertEquals(List.of(), nonActiveBaseOffsetsFrom(5L));
+        assertEquals(List.of(2L, 3L, 4L), nonActiveBaseOffsetsFrom(2L));
+        assertEquals(List.of(), nonActiveBaseOffsetsFrom(6L));
+    }
+
+    private List<Long> nonActiveBaseOffsetsFrom(long startOffset) {
+        return log.segments().nonActiveLogSegmentsFrom(startOffset).stream()
+                .map(LogSegment::baseOffset)
+                .collect(Collectors.toList());
+    }
+
+    private String topicPartitionName(String topic, String partition) {
+        return topic + "-" + partition;
+    }
+
+    @Test
+    public void testParseTopicPartitionName() throws IOException {
+        String topic = "test_topic";
+        String partition = "143";
+        File dir = new File(logDir, topicPartitionName(topic, partition));
+        TopicPartition topicPartition = LocalLog.parseTopicPartitionName(dir);
+        assertEquals(topic, topicPartition.topic());
+        assertEquals(Integer.parseInt(partition), topicPartition.partition());
+    }
+
+    /**
+     * Tests that log directories with a period in their name that have been marked for deletion
+     * are parsed correctly by `Log.parseTopicPartitionName` (see KAFKA-5232 for details).
+     */
+    @Test
+    public void testParseTopicPartitionNameWithPeriodForDeletedTopic() throws IOException {
+        String topic = "foo.bar-testtopic";
+        String partition = "42";
+        File dir = new File(logDir, LocalLog.logDeleteDirName(new TopicPartition(topic, Integer.parseInt(partition))));
+        TopicPartition topicPartition = LocalLog.parseTopicPartitionName(dir);
+        assertEquals(topic, topicPartition.topic(), "Unexpected topic name parsed");
+        assertEquals(Integer.parseInt(partition), topicPartition.partition(), "Unexpected partition number parsed");
+    }
+
+    @Test
+    public void testParseTopicPartitionNameForEmptyName() throws IOException {
+        File dir = new File("");
+        String msg = "KafkaException should have been thrown for dir: " + 
dir.getCanonicalPath();
+        assertThrows(KafkaException.class, () -> 
LocalLog.parseTopicPartitionName(dir), msg);
+    }
+
+    @Test
+    public void testParseTopicPartitionNameForNull() {
+        File dir = null;
+        assertThrows(KafkaException.class, () -> LocalLog.parseTopicPartitionName(dir),
+                () -> "KafkaException should have been thrown for dir: " + dir);
+    }
+
+    @Test
+    public void testParseTopicPartitionNameForMissingSeparator() throws IOException {
+        String topic = "test_topic";
+        String partition = "1999";
+        File dir = new File(logDir, topic + partition);
+        String msg = "KafkaException should have been thrown for dir: " + 
dir.getCanonicalPath();
+        assertThrows(KafkaException.class, () -> 
LocalLog.parseTopicPartitionName(dir), msg);
+        // also test the "-delete" marker case
+        File deleteMarkerDir = new File(logDir, topic + partition + "." + LogFileUtils.DELETE_DIR_SUFFIX);
+        msg = "KafkaException should have been thrown for dir: " + deleteMarkerDir.getCanonicalPath();
+        assertThrows(KafkaException.class, () -> LocalLog.parseTopicPartitionName(deleteMarkerDir), msg);
+    }
+
+    @Test
+    public void testParseTopicPartitionNameForMissingTopic() throws IOException {
+        String topic = "";
+        String partition = "1999";
+        File dir = new File(logDir, topicPartitionName(topic, partition));
+        String msg = "KafkaException should have been thrown for dir: " + 
dir.getCanonicalPath();
+        assertThrows(KafkaException.class, () -> 
LocalLog.parseTopicPartitionName(dir), msg);
+
+        // also test the "-delete" marker case
+        File deleteMarkerDir = new File(logDir, LocalLog.logDeleteDirName(new TopicPartition(topic, Integer.parseInt(partition))));
+
+        msg = "KafkaException should have been thrown for dir: " + 
deleteMarkerDir.getCanonicalPath();
+        assertThrows(KafkaException.class, () -> 
LocalLog.parseTopicPartitionName(deleteMarkerDir), msg);
+    }
+
+    @Test
+    public void testParseTopicPartitionNameForMissingPartition() throws IOException {
+        String topic = "test_topic";
+        String partition = "";
+        File dir = new File(logDir.getPath() + topicPartitionName(topic, partition));
+        String msg = "KafkaException should have been thrown for dir: " + dir.getCanonicalPath();
+        assertThrows(KafkaException.class, () -> LocalLog.parseTopicPartitionName(dir), msg);
+
+        // also test the "-delete" marker case
+        File deleteMarkerDir = new File(logDir, topicPartitionName(topic, partition) + "." + LogFileUtils.DELETE_DIR_SUFFIX);
+        msg = "KafkaException should have been thrown for dir: " + deleteMarkerDir.getCanonicalPath();
+        assertThrows(KafkaException.class, () -> LocalLog.parseTopicPartitionName(deleteMarkerDir), msg);
+    }
+
+    @Test
+    public void testParseTopicPartitionNameForInvalidPartition() throws IOException {
+        String topic = "test_topic";
+        String partition = "1999a";
+        File dir = new File(logDir, topicPartitionName(topic, partition));
+        String msg = "KafkaException should have been thrown for dir: " + 
dir.getCanonicalPath();
+        assertThrows(KafkaException.class, () -> 
LocalLog.parseTopicPartitionName(dir), msg);
+
+        // also test the "-delete" marker case
+        File deleteMarkerDir = new File(logDir, topic + partition + "." + LogFileUtils.DELETE_DIR_SUFFIX);
+        msg = "KafkaException should have been thrown for dir: " + deleteMarkerDir.getCanonicalPath();
+        assertThrows(KafkaException.class, () -> LocalLog.parseTopicPartitionName(deleteMarkerDir), msg);
+    }
+
+    @Test
+    public void testParseTopicPartitionNameForExistingInvalidDir() throws IOException {
+        File dir1 = new File(logDir.getPath() + "/non_kafka_dir");
+        String msg = "KafkaException should have been thrown for dir: " + 
dir1.getCanonicalPath();
+        assertThrows(KafkaException.class, () -> 
LocalLog.parseTopicPartitionName(dir1), msg);
+        File dir2 = new File(logDir.getPath() + "/non_kafka_dir-delete");
+        msg = "KafkaException should have been thrown for dir: " + 
dir2.getCanonicalPath();
+        assertThrows(KafkaException.class, () -> 
LocalLog.parseTopicPartitionName(dir2), msg);
+    }
+
+    @Test
+    public void testLogDeleteDirName() {
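+        // deletion dir names have the form <topic>-<partition>.<32-char id>-delete and never exceed 255 characters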
+        String name1 = LocalLog.logDeleteDirName(new TopicPartition("foo", 3));
+        assertTrue(name1.length() <= 255);
+        assertTrue(Pattern.compile("foo-3\\.[0-9a-z]{32}-delete").matcher(name1).matches());
+        assertTrue(LocalLog.DELETE_DIR_PATTERN.matcher(name1).matches());
+        assertFalse(LocalLog.FUTURE_DIR_PATTERN.matcher(name1).matches());
+        String name2 = LocalLog.logDeleteDirName(
+                new TopicPartition("n" + String.join("", 
Collections.nCopies(248, "o")), 5));
+        assertEquals(255, name2.length());
+        assertTrue(Pattern.compile("n[o]{212}-5\\.[0-9a-z]{32}-delete").matcher(name2).matches());
+        assertTrue(LocalLog.DELETE_DIR_PATTERN.matcher(name2).matches());
+        assertFalse(LocalLog.FUTURE_DIR_PATTERN.matcher(name2).matches());
+    }
+
+    @Test
+    public void testOffsetFromFile() {
+        long offset = 23423423L;
+
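+        // the base offset is encoded in each file name, so it can be recovered from the log, offset-index and time-index files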
+        File logFile = LogFileUtils.logFile(tmpDir, offset);
+        assertEquals(offset, LogFileUtils.offsetFromFile(logFile));
+
+        File offsetIndexFile = LogFileUtils.offsetIndexFile(tmpDir, offset);
+        assertEquals(offset, LogFileUtils.offsetFromFile(offsetIndexFile));
+
+        File timeIndexFile = LogFileUtils.timeIndexFile(tmpDir, offset);
+        assertEquals(offset, LogFileUtils.offsetFromFile(timeIndexFile));
+    }
+
+    @Test
+    public void testRollSegmentThatAlreadyExists() throws IOException {
+        assertEquals(1, log.segments().numberOfSegments(), "Log begins with a 
single empty segment.");
+
+        // rolling the empty active segment at the same base offset should recreate the segment
+        log.roll(0L);
+        assertEquals(1, log.segments().numberOfSegments(), "Expect 1 segment 
after roll() empty segment with base offset.");
+
+        // should be able to append records to active segment
+        List<KeyValue> keyValues1 = List.of(new KeyValue("k1", "v1"));
+        appendRecords(kvsToRecords(keyValues1), 0);
+        assertEquals(0L, log.segments().activeSegment().baseOffset());
+        // make sure we can append more records
+        List<KeyValue> keyValues2 = List.of(new KeyValue("k2", "v2"));
+        appendRecords(keyValues2.stream()
+                .map(kv -> kv.toRecord(MOCK_TIME.milliseconds() + 10))
+                .collect(Collectors.toList()),
+                1L);
+        assertEquals(2, log.logEndOffset(), "Expect two records in the log");
+        FetchDataInfo readResult = readRecords(0L);
+        assertEquals(2L, Utils.toList(readResult.records.records()).size());
+        assertEquals(Stream.concat(keyValues1.stream(), keyValues2.stream()).collect(Collectors.toList()), recordsToKvs(readResult.records.records()));
+
+        // roll so that active segment is empty
+        log.roll(0L);
+        assertEquals(2L, log.segments().activeSegment().baseOffset(), "Expect base offset of active segment to be LEO");
+        assertEquals(2, log.segments().numberOfSegments(), "Expect two segments.");
+        assertEquals(2L, log.logEndOffset());
+    }
+
+    @Test
+    public void testNewSegmentsAfterRoll() throws IOException {
+        assertEquals(1, log.segments().numberOfSegments(), "Log begins with a 
single empty segment.");
+
+        // rolling the empty active segment at the same base offset should recreate the segment
+        {
+            LogSegment newSegment = log.roll(0L);
+            assertEquals(0L, newSegment.baseOffset());
+            assertEquals(1, log.segments().numberOfSegments());
+            assertEquals(0L, log.logEndOffset());
+        }
+
+        appendRecords(List.of(new KeyValue("k1", "v1").toRecord()), 0L);
+
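+        // rolling after an append creates a new segment whose base offset equals the new log end offset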
+        {
+            LogSegment newSegment = log.roll(0L);
+            assertEquals(1L, newSegment.baseOffset());
+            assertEquals(2, log.segments().numberOfSegments());
+            assertEquals(1L, log.logEndOffset());
+        }
+
+        appendRecords(List.of(new KeyValue("k2", "v2").toRecord()), 1L);
+
+        {
+            LogSegment newSegment = log.roll(1L);
+            assertEquals(2L, newSegment.baseOffset());
+            assertEquals(3, log.segments().numberOfSegments());
+            assertEquals(2L, log.logEndOffset());
+        }
+    }
+
+    @Test
+    public void testRollSegmentErrorWhenNextOffsetIsIllegal() throws IOException {
+        assertEquals(1, log.segments().numberOfSegments(), "Log begins with a 
single empty segment.");
+
+        List<KeyValue> keyValues = List.of(new KeyValue("k1", "v1"), new 
KeyValue("k2", "v2"), new KeyValue("k3", "v3"));
+        appendRecords(kvsToRecords(keyValues), 0L);
+        assertEquals(0L, log.segments().activeSegment().baseOffset());
+        assertEquals(3, log.logEndOffset(), "Expect two records in the log");
+
+        // roll to create an empty active segment
+        log.roll(0L);
+        assertEquals(3L, log.segments().activeSegment().baseOffset());
+
+        // intentionally setup the logEndOffset to introduce an error later
+        log.updateLogEndOffset(1L);
+
+        // expect an error because of attempt to roll to a new offset (1L) that's lower than the
+        // base offset (3L) of the active segment
+        assertThrows(KafkaException.class, () -> log.roll(0L));
+    }
+
+    @Test
+    public void testFlushingNonExistentDir() throws IOException {
+        LocalLog spyLog = spy(log);
+
+        SimpleRecord record = new SimpleRecord(MOCK_TIME.milliseconds(), "a".getBytes());
+        appendRecords(List.of(record), 0L);
+        MOCK_TIME.sleep(1);
+        LogSegment newSegment = log.roll(0L);
+
+        // simulate the directory is renamed concurrently
+        doReturn(new File("__NON_EXISTENT__")).when(spyLog).dir();
+        assertDoesNotThrow(() -> spyLog.flush(newSegment.baseOffset()));
+    }
+
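+    // builds a LocalLog backed by a single empty segment with base offset 0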
+    private LocalLog createLocalLogWithActiveSegment(File dir, LogConfig config) throws IOException {
+        LogSegments segments = new LogSegments(topicPartition);
+        segments.add(LogSegment.open(dir,
+                0L,
+                config,
+                MOCK_TIME,
+                config.initFileSize(),
+                config.preallocate));
+        return new LocalLog(dir,
+                config,
+                segments,
+                0L,
+                new LogOffsetMetadata(0L, 0L, 0),
+                MOCK_TIME.scheduler,
+                MOCK_TIME,
+                topicPartition,
+                logDirFailureChannel);
+    }
+}
