dajac commented on code in PR #12029: URL: https://github.com/apache/kafka/pull/12029#discussion_r861509139
########## core/src/main/scala/kafka/log/UnifiedLog.scala: ########## @@ -1554,9 +1570,11 @@ class UnifiedLog(@volatile var logStartOffset: Long, } /** - * Completely delete the local log directory and all contents from the file system with no delay + * Completely delete the local log directory and all contents from the file system with no delay. + * + * Visible for testing. */ - private[log] def delete(): Unit = { + def delete(): Unit = { Review Comment: nit: Could we keep it package private? It does not seem to be used by other packages. ########## core/src/main/scala/kafka/log/LogManager.scala: ########## @@ -103,6 +103,9 @@ class LogManager(logDirs: Seq[File], def currentDefaultConfig: LogConfig = _currentDefaultConfig + // Visible for testing + def logsToBeDeleted = _logsToBeDeleted Review Comment: Is this one used? I can't find any usages. If it is not used, it may be better to not rename `logsToBeDeleted`. ########## core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala: ########## @@ -3410,6 +3410,53 @@ class UnifiedLogTest { assertThrows(classOf[OffsetOutOfRangeException], () => log.maybeIncrementLogStartOffset(26L, ClientRecordDeletion)) } + def testBackgroundDeletionWithIOException(): Unit = { + val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024) + val log = createLog(logDir, logConfig) + assertEquals(1, log.numberOfSegments, "The number of segments should be 1") + + // Delete the underlying directory to trigger a KafkaStorageException + val dir = log.dir + Utils.delete(dir) + dir.createNewFile() + + var kafkaStorageExceptionCaptured = false + try { + log.delete() + } catch { + case _: KafkaStorageException => + kafkaStorageExceptionCaptured = true + } + assertTrue(kafkaStorageExceptionCaptured) + assertTrue(log.logDirFailureChannel.hasOfflineLogDir(tmpDir.toString)) + } + + /** + * test renaming a log's dir without reinitialization, which is the case during topic deletion + */ + @Test + def 
testRenamingDirWithoutReinitialization(): Unit = { + val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024) + val log = createLog(logDir, logConfig) + assertEquals(1, log.numberOfSegments, "The number of segments should be 1") + + val newDir = TestUtils.randomPartitionLogDir(tmpDir) + assertTrue(newDir.exists()) + + log.renameDir(newDir.getName, false) Review Comment: Is it worth asserting that `leaderEpochCache` and `partitionMetadataFile` are `None` after this line? ########## core/src/main/scala/kafka/log/UnifiedLog.scala: ########## @@ -286,7 +286,7 @@ class UnifiedLog(@volatile var logStartOffset: Long, */ @volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset) - @volatile var partitionMetadataFile : PartitionMetadataFile = null + @volatile var partitionMetadataFile : Option[PartitionMetadataFile] = None Review Comment: nit: Could we remove the extra space before `:`? ########## core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala: ########## @@ -3410,6 +3410,53 @@ class UnifiedLogTest { assertThrows(classOf[OffsetOutOfRangeException], () => log.maybeIncrementLogStartOffset(26L, ClientRecordDeletion)) } + def testBackgroundDeletionWithIOException(): Unit = { + val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024) + val log = createLog(logDir, logConfig) + assertEquals(1, log.numberOfSegments, "The number of segments should be 1") + + // Delete the underlying directory to trigger a KafkaStorageException + val dir = log.dir + Utils.delete(dir) + dir.createNewFile() + + var kafkaStorageExceptionCaptured = false + try { + log.delete() + } catch { + case _: KafkaStorageException => + kafkaStorageExceptionCaptured = true + } + assertTrue(kafkaStorageExceptionCaptured) Review Comment: Could we use `assertThrows` here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org