[ https://issues.apache.org/jira/browse/KAFKA-6710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rajini Sivaram resolved KAFKA-6710.
-----------------------------------
       Resolution: Fixed
         Reviewer: Jun Rao
    Fix Version/s: 1.2.0

> Streams integration tests hang during shutdown
> ----------------------------------------------
>
>                 Key: KAFKA-6710
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6710
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, streams
>    Affects Versions: 1.1.0
>            Reporter: Rajini Sivaram
>            Assignee: Rajini Sivaram
>            Priority: Major
>             Fix For: 1.2.0
>
>
> Builds have been timing out a lot recently and many of the logs show streams
> integration tests being run, but not completed. While running tests locally,
> I saw a failure during shutdown of {{TableTableJoinIntegrationTest}}. The
> test was stuck waiting for a broker to shut down while a {{KafkaScheduler}}
> was attempting to delete logs. KAFKA-6624 (Commit
> #1ea07b993d75ed68f4c04282eb177bf84156e0b2) added a _Thread.sleep_ to wait for
> the time to delete each log segment inside the scheduled delete task. The
> failing streams test had 62 logs to delete and, since MockTime doesn't get
> updated during the test, it would have waited for 62 minutes to complete.
> This blocks shutdown of the broker for 62 minutes. It affects any streams
> integration test that runs for more than 30 seconds, which is when the first
> delayed delete task is scheduled to run.
> Changing _Thread.sleep_ to _time.sleep_ fixes this test issue. But it would
> be good to know why we have a _sleep_ on a _Scheduler_ at all. With the
> default _log.segment.delete.delay.ms_ of one minute, this potentially blocks
> a scheduler thread for up to a minute when there are logs to be deleted.
> Couldn't we just break out of the loop if it is not yet time to delete the
> first log segment in the list? The log would then get deleted when the broker
> checks next time. [~junrao] [~lindong] ?
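
The two ideas in the description (sleeping on an injectable clock instead of _Thread.sleep_, and breaking out of the loop when the first queued log is not yet due) can be illustrated with a small sketch. This is not the actual {{LogManager}} code or the committed fix: {{Clock}}, {{MockClock}}, {{PendingDelete}} and {{deleteDueLogs}} are hypothetical names invented for this example, and the sketch assumes the queue is ordered oldest-first, so that if the head is not due then nothing behind it is either.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class DelayedDeleteSketch {
    /** Hypothetical clock abstraction (a stand-in for an injectable time source). */
    interface Clock {
        long milliseconds();
        void sleep(long ms);
    }

    /** Mock clock: sleep() only advances virtual time and never blocks the thread. */
    static final class MockClock implements Clock {
        private long nowMs = 0L;
        public long milliseconds() { return nowMs; }
        public void sleep(long ms) { nowMs += ms; }
    }

    /** A log queued for deletion, stamped with the time it was scheduled. */
    static final class PendingDelete {
        final String name;
        final long scheduledMs;
        PendingDelete(String name, long scheduledMs) {
            this.name = name;
            this.scheduledMs = scheduledMs;
        }
    }

    /**
     * Removes every queued log whose delay has elapsed and stops at the first
     * one that is not yet due, instead of sleeping on the scheduler thread.
     * Assumes the queue is ordered oldest-first. Returns the number removed.
     */
    static int deleteDueLogs(Deque<PendingDelete> queue, Clock clock, long delayMs) {
        int deleted = 0;
        while (!queue.isEmpty()) {
            PendingDelete head = queue.peekFirst();
            if (clock.milliseconds() - head.scheduledMs < delayMs) {
                break; // not yet time: leave the rest for the next scheduled run
            }
            queue.pollFirst(); // a real implementation would delete files here
            deleted++;
        }
        return deleted;
    }

    public static void main(String[] args) {
        MockClock clock = new MockClock();
        long delayMs = 60_000L; // one minute, matching the default delay cited above
        Deque<PendingDelete> queue = new ArrayDeque<>();
        for (int i = 0; i < 62; i++) {
            queue.addLast(new PendingDelete("log-" + i, clock.milliseconds()));
        }
        System.out.println(deleteDueLogs(queue, clock, delayMs)); // prints 0
        clock.sleep(delayMs); // advances virtual time only, returns immediately
        System.out.println(deleteDueLogs(queue, clock, delayMs)); // prints 62
    }
}
```

With this shape the task returns immediately when nothing is due, so a scheduler shutdown never has to wait on a sleeping task, and a test that uses a mock clock controls the delay explicitly.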
>
> *Stack trace from failing test*:
> {{"kafka-scheduler-8" daemon prio=5 tid=0x00007fe58dc16800 nid=0x9603 waiting on condition [0x0000700003f25000]}}
> {{ java.lang.Thread.State: TIMED_WAITING (sleeping)}}
> {{ at java.lang.Thread.sleep(Native Method)}}
> {{ at kafka.log.LogManager.kafka$log$LogManager$$deleteLogs(LogManager.scala:717)}}
> {{ at kafka.log.LogManager$$anonfun$3.apply$mcV$sp(LogManager.scala:406)}}
> {{ at kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)}}
> {{ at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:62)}}
> {{ at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)}}
> {{ at java.util.concurrent.FutureTask.run(FutureTask.java:262)}}
> {{ at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)}}
> {{ at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)}}
> {{ at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)}}
> {{ at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)}}
> {{ at java.lang.Thread.run(Thread.java:745)}}
>
> {{"Test worker" prio=5 tid=0x00007fe58db72000 nid=0x5203 waiting on condition [0x0000700001cbd000]}}
> {{ java.lang.Thread.State: TIMED_WAITING (parking)}}
> {{ at sun.misc.Unsafe.park(Native Method)}}
> {{ - parking to wait for <0x0000000780fb8918> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)}}
> {{ at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)}}
> {{ at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)}}
> {{ at java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1468)}}
> {{ at kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:98)}}
> {{ at kafka.server.KafkaServer$$anonfun$shutdown$5.apply$mcV$sp(KafkaServer.scala:569)}}
> {{ at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:85)}}
> {{ at kafka.server.KafkaServer.shutdown(KafkaServer.scala:569)}}
> {{ at org.apache.kafka.streams.integration.utils.KafkaEmbedded.stop(KafkaEmbedded.java:129)}}
> {{ at org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.stop(EmbeddedKafkaCluster.java:126)}}
> {{ at org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.after(EmbeddedKafkaCluster.java:158)}}
> {{ at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:50)}}
> {{ at org.junit.rules.RunRules.evaluate(RunRules.java:20)}}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)