[ 
https://issues.apache.org/jira/browse/KAFKA-6710?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-6710.
-----------------------------------
       Resolution: Fixed
         Reviewer: Jun Rao
    Fix Version/s: 1.2.0

> Streams integration tests hang during shutdown
> ----------------------------------------------
>
>                 Key: KAFKA-6710
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6710
>             Project: Kafka
>          Issue Type: Bug
>          Components: core, streams
>    Affects Versions: 1.1.0
>            Reporter: Rajini Sivaram
>            Assignee: Rajini Sivaram
>            Priority: Major
>             Fix For: 1.2.0
>
>
> Builds have been timing out a lot recently and many of the logs show streams 
> integration tests being run, but not completed. While running tests locally, 
> I saw a failure during shutdown of {{TableTableJoinIntegrationTest}}. The 
> test was stuck waiting for a broker to shut down while a {{KafkaScheduler}} was 
> attempting to delete logs. KAFKA-6624 (Commit 
> #1ea07b993d75ed68f4c04282eb177bf84156e0b2) added a _Thread.sleep_ to wait for 
> the time to delete each log segment inside the scheduled delete task. The 
> failing streams test had 62 logs to delete and, since MockTime doesn't get 
> updated during the test, it would have waited for 62 minutes to complete. 
> This blocks shutdown of the broker for 62 minutes. This is an issue whenever a 
> streams integration test runs for more than 30 seconds, which is when the 
> first delayed delete task is scheduled to run.
> Changing _Thread.sleep_ to _time.sleep_ fixes this test issue. But it would be 
> good to know why we have a _sleep_ on a _Scheduler_ at all. With the default 
> _log.segment.delete.delay.ms_ of one minute, this potentially blocks a 
> scheduler thread for up to a minute when there are logs to be deleted. 
> Couldn't we just break out of the loop if it is not yet time to delete the 
> first log segment in the list? The log would then get deleted the next time 
> the broker checks. [~junrao] [~lindong] ?
>  
> *Stack trace from failing test*:
> {{"kafka-scheduler-8" daemon prio=5 tid=0x00007fe58dc16800 nid=0x9603 waiting 
> on condition [0x0000700003f25000]}}
> {{   java.lang.Thread.State: TIMED_WAITING (sleeping)}}
> {{        at java.lang.Thread.sleep(Native Method)}}
> {{        at 
> kafka.log.LogManager.kafka$log$LogManager$$deleteLogs(LogManager.scala:717)}}
> {{        at 
> kafka.log.LogManager$$anonfun$3.apply$mcV$sp(LogManager.scala:406)}}
> {{        at 
> kafka.utils.KafkaScheduler$$anonfun$1.apply$mcV$sp(KafkaScheduler.scala:110)}}
> {{        at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:62)}}
> {{        at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)}}
> {{        at java.util.concurrent.FutureTask.run(FutureTask.java:262)}}
> {{        at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)}}
> {{        at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)}}
> {{        at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)}}
> {{        at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)}}
> {{        at java.lang.Thread.run(Thread.java:745)}}
> {{"Test worker" prio=5 tid=0x00007fe58db72000 nid=0x5203 waiting on 
> condition [0x0000700001cbd000]}}
> {{   java.lang.Thread.State: TIMED_WAITING (parking)}}
> {{        at sun.misc.Unsafe.park(Native Method)}}
> {{        - parking to wait for  <0x0000000780fb8918> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)}}
> {{        at 
> java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)}}
> {{        at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)}}
> {{        at 
> java.util.concurrent.ThreadPoolExecutor.awaitTermination(ThreadPoolExecutor.java:1468)}}
> {{        at kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:98)}}
> {{        at 
> kafka.server.KafkaServer$$anonfun$shutdown$5.apply$mcV$sp(KafkaServer.scala:569)}}
> {{        at kafka.utils.CoreUtils$.swallow(CoreUtils.scala:85)}}
> {{        at kafka.server.KafkaServer.shutdown(KafkaServer.scala:569)}}
> {{        at 
> org.apache.kafka.streams.integration.utils.KafkaEmbedded.stop(KafkaEmbedded.java:129)}}
> {{        at 
> org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.stop(EmbeddedKafkaCluster.java:126)}}
> {{        at 
> org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.after(EmbeddedKafkaCluster.java:158)}}
> {{        at 
> org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:50)}}
> {{        at org.junit.rules.RunRules.evaluate(RunRules.java:20)}}
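
The two options raised in the description (sleeping on the injected clock instead of _Thread.sleep_, and breaking out of the loop when the first log is not yet due) can be sketched roughly as follows. This is a minimal, self-contained Java illustration under stated assumptions, not Kafka's actual code: {{Clock}}, {{SystemClock}}, {{ManualClock}}, {{PendingLog}} and both delete methods are hypothetical names invented here, and the real {{LogManager}} is written in Scala and does considerably more.

```java
import java.util.Deque;

/** Hypothetical clock abstraction (an assumption for this sketch, not Kafka's Time API). */
interface Clock {
    long milliseconds();
    void sleep(long ms);
}

/** Wall-clock implementation: sleep really blocks the calling thread. */
final class SystemClock implements Clock {
    public long milliseconds() { return System.currentTimeMillis(); }
    public void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
        }
    }
}

/** Test clock: sleep advances a counter and returns immediately. */
final class ManualClock implements Clock {
    private long nowMs;
    ManualClock(long startMs) { this.nowMs = startMs; }
    public long milliseconds() { return nowMs; }
    public void sleep(long ms) { nowMs += ms; }
}

final class DeleteTaskSketch {
    /** A log queued for deletion, with the time at which deletion was requested. */
    static final class PendingLog {
        final String name;
        final long scheduledMs;
        PendingLog(String name, long scheduledMs) {
            this.name = name;
            this.scheduledMs = scheduledMs;
        }
    }

    /**
     * Option 1: wait until each log is due, but sleep on the injected clock.
     * With a manual clock the wait costs no real time; with SystemClock it
     * still blocks the scheduler thread.
     */
    static int deleteWithClockSleep(Deque<PendingLog> queue, Clock clock, long delayMs) {
        int deleted = 0;
        while (!queue.isEmpty()) {
            PendingLog head = queue.peekFirst();
            long waitMs = head.scheduledMs + delayMs - clock.milliseconds();
            if (waitMs > 0) {
                clock.sleep(waitMs);
            }
            queue.pollFirst(); // stand-in for the real file deletion
            deleted++;
        }
        return deleted;
    }

    /**
     * Option 2: never sleep. Stop at the first log that is not yet due and
     * leave the rest for the next scheduled run of the task.
     */
    static int deleteDueOnly(Deque<PendingLog> queue, Clock clock, long delayMs) {
        int deleted = 0;
        while (!queue.isEmpty()) {
            PendingLog head = queue.peekFirst();
            if (head.scheduledMs + delayMs > clock.milliseconds()) {
                break; // not due yet; retry on the next run
            }
            queue.pollFirst(); // stand-in for the real file deletion
            deleted++;
        }
        return deleted;
    }
}
```

With 62 logs all scheduled at time 0, a {{ManualClock}} starting at 0 and a delay of 60000 ms, {{deleteDueOnly}} returns 0 and leaves the queue untouched, while {{deleteWithClockSleep}} returns 62 and leaves the manual clock at 60000 without blocking. The second option trades prompt deletion for a scheduler thread that is never parked, which is the behaviour the description asks about.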



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
