showuon commented on code in PR #15773:
URL: https://github.com/apache/kafka/pull/15773#discussion_r1577162256


##########
core/src/test/scala/unit/kafka/log/LogManagerTest.scala:
##########
@@ -1343,6 +1346,45 @@ class LogManagerTest {
       assertFalse(f.exists())
     }
   }
+
+  /**
+   * Test KafkaScheduler can be shutdown when file delete delay is set to 0.
+   */
+  @Test
+  def testShutdownWithZeroFileDeleteDelayMs(): Unit = {
+    val tmpLogDir = TestUtils.tempDir()
+    val tmpProperties = new Properties()
+    tmpProperties.put(TopicConfig.FILE_DELETE_DELAY_MS_CONFIG, "0")
+    val scheduler = new KafkaScheduler(1, true, "log-manager-test")
+    val tmpLogManager = new LogManager(logDirs = 
Seq(tmpLogDir).map(_.getAbsoluteFile),
+      initialOfflineDirs = Array.empty[File],
+      configRepository = new MockConfigRepository,
+      initialDefaultConfig = new LogConfig(tmpProperties),
+      cleanerConfig = new CleanerConfig(false),
+      recoveryThreadsPerDataDir = 1,
+      flushCheckMs = 1000L,
+      flushRecoveryOffsetCheckpointMs = 10000L,
+      flushStartOffsetCheckpointMs = 10000L,
+      retentionCheckMs = 1000L,
+      maxTransactionTimeoutMs = 5 * 60 * 1000,
+      producerStateManagerConfig = new 
ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT,
 false),
+      producerIdExpirationCheckIntervalMs = 
TransactionLogConfigs.PRODUCER_ID_EXPIRATION_CHECK_INTERVAL_MS_DEFAULT,
+      scheduler = scheduler,
+      time = Time.SYSTEM,
+      brokerTopicStats = new BrokerTopicStats,
+      logDirFailureChannel = new LogDirFailureChannel(1),
+      keepPartitionMetadataFile = true,
+      interBrokerProtocolVersion = MetadataVersion.latestTesting,
+      remoteStorageSystemEnable = false,
+      initialTaskDelayMs = 0)
+
+    scheduler.startup()
+    tmpLogManager.startup(Set.empty)
+    val stopLogManager: Executable = () => tmpLogManager.shutdown()
+    val stopScheduler: Executable = () => scheduler.shutdown()
+    assertTimeoutPreemptively(Duration.ofMillis(5000), stopLogManager)
+    assertTimeoutPreemptively(Duration.ofMillis(5000), stopScheduler)

Review Comment:
   How could we make sure the `kafka-delete-logs` task is already running 
before we run shutdown? I saw there's `scheduler#taskRunning` that can be used 
to verify that. Could we add that? Ex:
   
   ```
   scheduler.startup()
   tmpLogManager.startup(Set.empty)
   TestUtils.waitUntilTrue( () => scheduler.taskRunning(deletionTask), ...)
   ```
   
   WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to