showuon commented on code in PR #15773:
URL: https://github.com/apache/kafka/pull/15773#discussion_r1577162256


##########
core/src/test/scala/unit/kafka/log/LogManagerTest.scala:
##########
@@ -1343,6 +1346,45 @@ class LogManagerTest {
       assertFalse(f.exists())
     }
   }
+
+  /**
+   * Test KafkaScheduler can be shutdown when file delete delay is set to 0.
+   */
+  @Test
+  def testShutdownWithZeroFileDeleteDelayMs(): Unit = {
+    val tmpLogDir = TestUtils.tempDir()
+    val tmpProperties = new Properties()
+    tmpProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "0")
+    val scheduler = new KafkaScheduler(1, true, "log-manager-test")
+    val tmpLogManager = new LogManager(logDirs = 
Seq(tmpLogDir).map(_.getAbsoluteFile),
+      initialOfflineDirs = Array.empty[File],
+      configRepository = new MockConfigRepository,
+      initialDefaultConfig = new LogConfig(tmpProperties),
+      cleanerConfig = new CleanerConfig(false),
+      recoveryThreadsPerDataDir = 1,
+      flushCheckMs = 1000L,
+      flushRecoveryOffsetCheckpointMs = 10000L,
+      flushStartOffsetCheckpointMs = 10000L,
+      retentionCheckMs = 1000L,
+      maxTransactionTimeoutMs = 5 * 60 * 1000,
+      producerStateManagerConfig = new 
ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
 false),
+      producerIdExpirationCheckIntervalMs = 
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+      scheduler = scheduler,
+      time = Time.SYSTEM,
+      brokerTopicStats = new BrokerTopicStats,
+      logDirFailureChannel = new LogDirFailureChannel(1),
+      keepPartitionMetadataFile = true,
+      interBrokerProtocolVersion = MetadataVersion.latestTesting,
+      remoteStorageSystemEnable = false,
+      initialTaskDelayMs = 0)
+
+    scheduler.startup()
+    tmpLogManager.startup(Set.empty)
+    val stopLogManager: Executable = () => tmpLogManager.shutdown()
+    val stopScheduler: Executable = () => scheduler.shutdown()
+    assertTimeoutPreemptively(Duration.ofMillis(5000), stopLogManager)
+    assertTimeoutPreemptively(Duration.ofMillis(5000), stopScheduler)

Review Comment:
   How could we make sure the `kafka-delete-logs` task is already running 
before we run shutdown? Even though we schedule it with 0 delay, it is still 
not guaranteed the task (thread) will be created immediately, right? I saw 
there's `scheduler#taskRunning` that can be used to verify that. Could we add 
that? Ex:
   
   ```
   scheduler.startup()
   tmpLogManager.startup(Set.empty)
   TestUtils.waitUntilTrue( () => scheduler.taskRunning(deletionTask), ...)
   ```
   
   WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to