showuon commented on code in PR #15773: URL: https://github.com/apache/kafka/pull/15773#discussion_r1577162256
########## core/src/test/scala/unit/kafka/log/LogManagerTest.scala: ########## @@ -1343,6 +1346,45 @@ class LogManagerTest { assertFalse(f.exists()) } } + + /** + * Test KafkaScheduler can be shutdown when file delete delay is set to 0. + */ + @Test + def testShutdownWithZeroFileDeleteDelayMs(): Unit = { + val tmpLogDir = TestUtils.tempDir() + val tmpProperties = new Properties() + tmpProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "0") + val scheduler = new KafkaScheduler(1, true, "log-manager-test") + val tmpLogManager = new LogManager(logDirs = Seq(tmpLogDir).map(_.getAbsoluteFile), + initialOfflineDirs = Array.empty[File], + configRepository = new MockConfigRepository, + initialDefaultConfig = new LogConfig(tmpProperties), + cleanerConfig = new CleanerConfig(false), + recoveryThreadsPerDataDir = 1, + flushCheckMs = 1000L, + flushRecoveryOffsetCheckpointMs = 10000L, + flushStartOffsetCheckpointMs = 10000L, + retentionCheckMs = 1000L, + maxTransactionTimeoutMs = 5 * 60 * 1000, + producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false), + producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT, + scheduler = scheduler, + time = Time.SYSTEM, + brokerTopicStats = new BrokerTopicStats, + logDirFailureChannel = new LogDirFailureChannel(1), + keepPartitionMetadataFile = true, + interBrokerProtocolVersion = MetadataVersion.latestTesting, + remoteStorageSystemEnable = false, + initialTaskDelayMs = 0) + + scheduler.startup() + tmpLogManager.startup(Set.empty) + val stopLogManager: Executable = () => tmpLogManager.shutdown() + val stopScheduler: Executable = () => scheduler.shutdown() + assertTimeoutPreemptively(Duration.ofMillis(5000), stopLogManager) + assertTimeoutPreemptively(Duration.ofMillis(5000), stopScheduler) Review Comment: How could we make sure the `kafka-delete-logs` task is already running before we run shutdown? Even though we schedule it with 0 delay, it is still not guaranteed the task (thread) will be created immediately, right? I saw there's `scheduler#taskRunning` that can be used to verify that. Could we add that? Ex: ``` scheduler.startup() tmpLogManager.startup(Set.empty) TestUtils.waitUntilTrue( () => scheduler.taskRunning(deletionTask), ...) ``` WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org