FrankYang0529 commented on code in PR #15773:
URL: https://github.com/apache/kafka/pull/15773#discussion_r1577187165


##########
core/src/test/scala/unit/kafka/log/LogManagerTest.scala:
##########
@@ -1343,6 +1346,45 @@ class LogManagerTest {
       assertFalse(f.exists())
     }
   }
+
+  /**
+   * Test KafkaScheduler can be shutdown when file delete delay is set to 0.
+   */
+  @Test
+  def testShutdownWithZeroFileDeleteDelayMs(): Unit = {
+    val tmpLogDir = TestUtils.tempDir()
+    val tmpProperties = new Properties()
+    tmpProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "0")
+    val scheduler = new KafkaScheduler(1, true, "log-manager-test")
+    val tmpLogManager = new LogManager(logDirs = Seq(tmpLogDir).map(_.getAbsoluteFile),
+      initialOfflineDirs = Array.empty[File],
+      configRepository = new MockConfigRepository,
+      initialDefaultConfig = new LogConfig(tmpProperties),
+      cleanerConfig = new CleanerConfig(false),
+      recoveryThreadsPerDataDir = 1,
+      flushCheckMs = 1000L,
+      flushRecoveryOffsetCheckpointMs = 10000L,
+      flushStartOffsetCheckpointMs = 10000L,
+      retentionCheckMs = 1000L,
+      maxTransactionTimeoutMs = 5 * 60 * 1000,
+      producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false),
+      producerIdExpirationCheckIntervalMs = TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+      scheduler = scheduler,
+      time = Time.SYSTEM,
+      brokerTopicStats = new BrokerTopicStats,
+      logDirFailureChannel = new LogDirFailureChannel(1),
+      keepPartitionMetadataFile = true,
+      interBrokerProtocolVersion = MetadataVersion.latestTesting,
+      remoteStorageSystemEnable = false,
+      initialTaskDelayMs = 0)
+
+    scheduler.startup()
+    tmpLogManager.startup(Set.empty)
+    val stopLogManager: Executable = () => tmpLogManager.shutdown()
+    val stopScheduler: Executable = () => scheduler.shutdown()
+    assertTimeoutPreemptively(Duration.ofMillis(5000), stopLogManager)
+    assertTimeoutPreemptively(Duration.ofMillis(5000), stopScheduler)

Review Comment:
   I think `deletionTask` must be executed. The path from `LogManager#startup`
to `LogManager#startupWithConfigOverrides` is synchronous, so `deletionTask`
has already been put into the scheduler queue by the time `startup` returns.
During scheduler shutdown, new tasks cannot be added, but tasks that were
queued earlier will still be executed.
   
   If we want to use `taskRunning` to check whether `deletionTask` has been
executed, we may need a new field in the `LogManager` class to hold the
scheduler's future object. WDYT?
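
   To make the two points above concrete, here is a rough sketch. It uses a
plain JDK `ScheduledThreadPoolExecutor` with its default shutdown policies,
not `KafkaScheduler`, which may configure those policies differently, so the
same behavior would still need to be confirmed there. `DeletionTaskHolder` is
a hypothetical stand-in for the extra field on `LogManager`; none of these
names exist in the PR.

```scala
import java.util.concurrent.{CountDownLatch, RejectedExecutionException, ScheduledFuture, ScheduledThreadPoolExecutor, TimeUnit}

object SchedulerShutdownSketch {

  // Hypothetical stand-in for a LogManager field that keeps the future
  // returned when the deletion task is scheduled.
  final class DeletionTaskHolder(executor: ScheduledThreadPoolExecutor) {
    @volatile private var deletionFuture: Option[ScheduledFuture[_]] = None

    def schedule(task: Runnable, delayMs: Long): Unit =
      deletionFuture = Some(executor.schedule(task, delayMs, TimeUnit.MILLISECONDS))

    // True only if the task was scheduled, finished, and was not cancelled.
    def deletionTaskCompleted: Boolean =
      deletionFuture.exists(f => f.isDone && !f.isCancelled)
  }

  // Returns (task queued before shutdown completed, task submitted after shutdown was rejected).
  def run(): (Boolean, Boolean) = {
    // Assumption: default JDK policies, under which already-queued delayed
    // tasks still run after shutdown().
    val executor = new ScheduledThreadPoolExecutor(1)
    val holder = new DeletionTaskHolder(executor)
    val ran = new CountDownLatch(1)

    // Queue the task synchronously, before shutdown is requested.
    holder.schedule(() => ran.countDown(), 0L)

    executor.shutdown()

    // Any submission after shutdown() is rejected by the default handler.
    val noop: Runnable = () => ()
    val rejected =
      try {
        executor.schedule(noop, 0L, TimeUnit.MILLISECONDS)
        false
      } catch {
        case _: RejectedExecutionException => true
      }

    // Wait for the previously queued task to drain.
    val terminated = executor.awaitTermination(5, TimeUnit.SECONDS)
    (terminated && holder.deletionTaskCompleted, rejected)
  }

  def main(args: Array[String]): Unit = {
    val (completed, rejected) = run()
    println(s"queued task completed: $completed, late task rejected: $rejected")
  }
}
```

   With a stored future like this, the test could assert on completion
directly instead of inferring it from `taskRunning`.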



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
