FrankYang0529 commented on code in PR #15773: URL: https://github.com/apache/kafka/pull/15773#discussion_r1577187165
##########
core/src/test/scala/unit/kafka/log/LogManagerTest.scala:
##########
@@ -1343,6 +1346,45 @@ class LogManagerTest {
       assertFalse(f.exists())
     }
   }
+
+  /**
+   * Test KafkaScheduler can be shutdown when file delete delay is set to 0.
+   */
+  @Test
+  def testShutdownWithZeroFileDeleteDelayMs(): Unit = {
+    val tmpLogDir = TestUtils.tempDir()
+    val tmpProperties = new Properties()
+    tmpProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "0")
+    val scheduler = new KafkaScheduler(1, true, "log-manager-test")
+    val tmpLogManager = new LogManager(logDirs = Seq(tmpLogDir).map(_.getAbsoluteFile),
+      initialOfflineDirs = Array.empty[File],
+      configRepository = new MockConfigRepository,
+      initialDefaultConfig = new LogConfig(tmpProperties),
+      cleanerConfig = new CleanerConfig(false),
+      recoveryThreadsPerDataDir = 1,
+      flushCheckMs = 1000L,
+      flushRecoveryOffsetCheckpointMs = 10000L,
+      flushStartOffsetCheckpointMs = 10000L,
+      retentionCheckMs = 1000L,
+      maxTransactionTimeoutMs = 5 * 60 * 1000,
+      producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false),
+      producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+      scheduler = scheduler,
+      time = Time.SYSTEM,
+      brokerTopicStats = new BrokerTopicStats,
+      logDirFailureChannel = new LogDirFailureChannel(1),
+      keepPartitionMetadataFile = true,
+      interBrokerProtocolVersion = MetadataVersion.latestTesting,
+      remoteStorageSystemEnable = false,
+      initialTaskDelayMs = 0)
+
+    scheduler.startup()
+    tmpLogManager.startup(Set.empty)
+    val stopLogManager: Executable = () => tmpLogManager.shutdown()
+    val stopScheduler: Executable = () => scheduler.shutdown()
+    assertTimeoutPreemptively(Duration.ofMillis(5000), stopLogManager)
+    assertTimeoutPreemptively(Duration.ofMillis(5000), stopScheduler)

Review Comment:
   I think `deletionTask` must be executed.
   The call path from `LogManager#startup` to `LogManager#startupWithConfigOverrides` is synchronous, so `deletionTask` must already be in the scheduler queue by the time `startup` returns. During scheduler shutdown, new tasks cannot be added, but previously queued tasks will still be executed. If we want to use `taskRunning` to check whether `deletionTask` has executed, we may need a new field in the `LogManager` class to hold the scheduler's future object. WDYT?
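
To make the last suggestion concrete, here is a rough sketch of keeping the future returned by the scheduler so a test can check whether the task completed. It is only an illustration under assumptions: `DeletionOwner`, `deletionRan`, `deletionTaskDone`, and `awaitDeletionTask` are hypothetical names, not existing `LogManager` members, and it uses a plain JDK `ScheduledExecutorService` with a one-shot task rather than `KafkaScheduler`, whose shutdown policies may differ.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledFuture, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for LogManager: it keeps the future returned by the scheduler.
class DeletionOwner(scheduler: ScheduledExecutorService) {
  // Set by the task body so a test can observe that it actually ran.
  val deletionRan = new AtomicBoolean(false)

  // The "new value" from the comment: the future for the scheduled deletion task.
  @volatile private var deletionFuture: Option[ScheduledFuture[_]] = None

  // Synchronous: once this returns, the task has been handed to the scheduler.
  def startup(): Unit = {
    val task: Runnable = () => deletionRan.set(true)
    deletionFuture = Some(scheduler.schedule(task, 0L, TimeUnit.MILLISECONDS))
  }

  // True once the scheduled task has completed (or was cancelled).
  def deletionTaskDone: Boolean = deletionFuture.exists(_.isDone)

  // Blocks until the task finishes; throws TimeoutException if it takes too long.
  def awaitDeletionTask(timeoutMs: Long): Unit =
    deletionFuture.foreach(_.get(timeoutMs, TimeUnit.MILLISECONDS))
}

object DeletionOwnerDemo {
  // Returns (task body ran, future reports done).
  def run(): (Boolean, Boolean) = {
    val scheduler = Executors.newScheduledThreadPool(1)
    val owner = new DeletionOwner(scheduler)
    owner.startup()
    owner.awaitDeletionTask(5000L) // wait on the stored future, not on a sleep
    scheduler.shutdown()
    scheduler.awaitTermination(5, TimeUnit.SECONDS)
    (owner.deletionRan.get, owner.deletionTaskDone)
  }
}
```

With something like this, the test could wait on the stored future before asserting that shutdown finishes in time, instead of inferring execution from the scheduler's internal state.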