dajac commented on code in PR #12029:
URL: https://github.com/apache/kafka/pull/12029#discussion_r861509139


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1554,9 +1570,11 @@ class UnifiedLog(@volatile var logStartOffset: Long,
   }
 
   /**
-   * Completely delete the local log directory and all contents from the file system with no delay
+   * Completely delete the local log directory and all contents from the file system with no delay.
+   *
+   * Visible for testing.
    */
-  private[log] def delete(): Unit = {
+  def delete(): Unit = {

Review Comment:
   nit: Could we keep it package private? It does not seem to be used by other 
packages.



##########
core/src/main/scala/kafka/log/LogManager.scala:
##########
@@ -103,6 +103,9 @@ class LogManager(logDirs: Seq[File],
 
   def currentDefaultConfig: LogConfig = _currentDefaultConfig
 
+  // Visible for testing
+  def logsToBeDeleted = _logsToBeDeleted

Review Comment:
   Is this one used? I can't find any usages. If it is not used, it may be 
better to not rename `logsToBeDeleted`.



##########
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##########
@@ -3410,6 +3410,53 @@ class UnifiedLogTest {
     assertThrows(classOf[OffsetOutOfRangeException], () => log.maybeIncrementLogStartOffset(26L, ClientRecordDeletion))
   }
 
+  def testBackgroundDeletionWithIOException(): Unit = {
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
+    val log = createLog(logDir, logConfig)
+    assertEquals(1, log.numberOfSegments, "The number of segments should be 1")
+
+    // Delete the underlying directory to trigger a KafkaStorageException
+    val dir = log.dir
+    Utils.delete(dir)
+    dir.createNewFile()
+
+    var kafkaStorageExceptionCaptured = false
+    try {
+      log.delete()
+    } catch {
+      case _: KafkaStorageException =>
+        kafkaStorageExceptionCaptured = true
+    }
+    assertTrue(kafkaStorageExceptionCaptured)
+    assertTrue(log.logDirFailureChannel.hasOfflineLogDir(tmpDir.toString))
+  }
+
+  /**
+   * test renaming a log's dir without reinitialization, which is the case during topic deletion
+   */
+  @Test
+  def testRenamingDirWithoutReinitialization(): Unit = {
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
+    val log = createLog(logDir, logConfig)
+    assertEquals(1, log.numberOfSegments, "The number of segments should be 1")
+
+    val newDir = TestUtils.randomPartitionLogDir(tmpDir)
+    assertTrue(newDir.exists())
+
+    log.renameDir(newDir.getName, false)

Review Comment:
   Is it worth asserting that `leaderEpochCache` and `partitionMetadataFile` 
are `None` after this line?



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -286,7 +286,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
    */
   @volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)
 
-  @volatile var partitionMetadataFile : PartitionMetadataFile = null
+  @volatile var partitionMetadataFile : Option[PartitionMetadataFile] = None

Review Comment:
   nit: Could we remove the extra space before `:`?



##########
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##########
@@ -3410,6 +3410,53 @@ class UnifiedLogTest {
     assertThrows(classOf[OffsetOutOfRangeException], () => log.maybeIncrementLogStartOffset(26L, ClientRecordDeletion))
   }
 
+  def testBackgroundDeletionWithIOException(): Unit = {
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
+    val log = createLog(logDir, logConfig)
+    assertEquals(1, log.numberOfSegments, "The number of segments should be 1")
+
+    // Delete the underlying directory to trigger a KafkaStorageException
+    val dir = log.dir
+    Utils.delete(dir)
+    dir.createNewFile()
+
+    var kafkaStorageExceptionCaptured = false
+    try {
+      log.delete()
+    } catch {
+      case _: KafkaStorageException =>
+        kafkaStorageExceptionCaptured = true
+    }
+    assertTrue(kafkaStorageExceptionCaptured)

Review Comment:
   Could we use `assertThrows` here instead of the manual try/catch with a flag?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to