GitHub user original-brownbear opened a pull request:

    https://github.com/apache/kafka/pull/2791

    MINOR: Fix Deadlock in StreamThread

    I think this may be the reason (or one of the reasons) we see Jenkins jobs
    time out at times. At least I can reproduce it causing tests to time out at
    a certain rate.
    
    With current trunk it is possible to run into this:
    
    ```sh
    "kafka-streams-close-thread" #585 daemon prio=5 os_prio=0 tid=0x00007f66d052d800 nid=0x7e02 waiting for monitor entry [0x00007f66ae2e5000]
       java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.kafka.streams.processor.internals.StreamThread.close(StreamThread.java:345)
        - waiting to lock <0x000000077d33c538> (a org.apache.kafka.streams.processor.internals.StreamThread)
        at org.apache.kafka.streams.KafkaStreams$1.run(KafkaStreams.java:474)
        at java.lang.Thread.run(Thread.java:745)
    
    "appId-bd262a91-5155-4a35-bc46-c6432552c2c5-StreamThread-97" #583 prio=5 os_prio=0 tid=0x00007f66d052f000 nid=0x7e01 waiting for monitor entry [0x00007f66ae4e6000]
       java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.kafka.streams.KafkaStreams.setState(KafkaStreams.java:219)
        - waiting to lock <0x000000077d335760> (a org.apache.kafka.streams.KafkaStreams)
        at org.apache.kafka.streams.KafkaStreams.access$100(KafkaStreams.java:117)
        at org.apache.kafka.streams.KafkaStreams$StreamStateListener.onChange(KafkaStreams.java:259)
        - locked <0x000000077d42f138> (a org.apache.kafka.streams.KafkaStreams$StreamStateListener)
        at org.apache.kafka.streams.processor.internals.StreamThread.setState(StreamThread.java:168)
        - locked <0x000000077d33c538> (a org.apache.kafka.streams.processor.internals.StreamThread)
        at org.apache.kafka.streams.processor.internals.StreamThread.setStateWhenNotInPendingShutdown(StreamThread.java:176)
        - locked <0x000000077d33c538> (a org.apache.kafka.streams.processor.internals.StreamThread)
        at org.apache.kafka.streams.processor.internals.StreamThread.access$1600(StreamThread.java:70)
        at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsRevoked(StreamThread.java:1321)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:406)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:349)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:296)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1037)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1002)
        at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:531)
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:669)
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:326)
    ```
    
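    In a nutshell: the `StreamThread` holds its own monitor and waits for the
    `KafkaStreams` monitor, while the close thread waits for the `StreamThread`
    monitor. Neither can make progress, because an intermittent `close` (e.g.
    from a test) comes along and holds the lock on `KafkaStreams`: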
    
    ```sh
    "main" #1 prio=5 os_prio=0 tid=0x00007f66d000c800 nid=0x78bb in Object.wait() [0x00007f66d7a15000]
       java.lang.Thread.State: WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at java.lang.Thread.join(Thread.java:1249)
        - locked <0x000000077d45a590> (a java.lang.Thread)
        at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:503)
        - locked <0x000000077d335760> (a org.apache.kafka.streams.KafkaStreams)
        at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:447)
        at org.apache.kafka.streams.KafkaStreamsTest.testCannotStartOnceClosed(KafkaStreamsTest.java:115)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
        at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
        at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
        at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
        at org.junit.internal.runners.statements.ExpectException.evaluate(ExpectException.java:19)
        at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
        at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
        at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
        at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
        at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
        at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
        at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
        at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
        at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
        at org.junit.rules.RunRules.evaluate(RunRules.java:20)
        at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
        at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
        at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
        at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:71)
        at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
        at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
    ```
    
    => causing a deadlock.
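    
    The cycle in the dumps above can be modelled in a few lines. This is only a
    minimal sketch, not Kafka code: `LockCycleSketch`, `streamsLock` and
    `threadLock` are made-up names, and the model is simplified to two parties
    (in the dumps, the second leg goes through the close thread, which "main"
    presumably joins on). It uses the non-blocking `ReentrantLock.tryLock()` so
    that it reports the conflict instead of hanging, as `synchronized` would.
    
    ```java
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.locks.ReentrantLock;
    
    class LockCycleSketch {
        /** Returns true if each side found the lock it needs held by the other. */
        static boolean bothSidesBlocked() {
            // Hypothetical stand-ins for the two object monitors in the dumps.
            ReentrantLock streamsLock = new ReentrantLock(); // KafkaStreams monitor
            ReentrantLock threadLock = new ReentrantLock();  // StreamThread monitor
            CountDownLatch threadLockHeld = new CountDownLatch(1);
            CountDownLatch streamsLockHeld = new CountDownLatch(1);
            CountDownLatch closerChecked = new CountDownLatch(1);
            AtomicBoolean workerBlocked = new AtomicBoolean(false);
    
            Thread worker = new Thread(() -> {
                threadLock.lock(); // like StreamThread.setState holding its own monitor
                try {
                    threadLockHeld.countDown();
                    streamsLockHeld.await();
                    // like KafkaStreams.setState: needs the lock the closer holds
                    if (streamsLock.tryLock()) {
                        streamsLock.unlock();
                    } else {
                        workerBlocked.set(true);
                    }
                    closerChecked.await(); // keep threadLock until the closer has checked
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    threadLock.unlock();
                }
            });
    
            boolean closerBlocked = false;
            streamsLock.lock(); // like KafkaStreams.close holding its monitor
            try {
                worker.start();
                threadLockHeld.await();
                streamsLockHeld.countDown();
                // like the close path needing the StreamThread monitor
                if (threadLock.tryLock()) {
                    threadLock.unlock();
                } else {
                    closerBlocked = true;
                }
                closerChecked.countDown();
                worker.join(); // still holding streamsLock, as close() does
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                streamsLock.unlock();
            }
            return closerBlocked && workerBlocked.get();
        }
    
        public static void main(String[] args) {
            System.out.println("both sides blocked: " + bothSidesBlocked());
        }
    }
    ```
    
    With blocking monitor acquisition in place of `tryLock()`, neither side
    would ever get past its acquisition attempt.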
    
    Fixed this by using softer locking on the state change, which guarantees
    atomic changes to the state but does not lock on the whole object (at least
    I could not find any method other than `setState` that would require more
    than atomically locked access).
    Also qualified the state listeners with their outer class to make the whole
    code flow around this more readable (having two interfaces that share the
    same interface name and method name, and then using them between their two
    outer classes, is crazy hard to follow imo :)).
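    
    As an illustration of what such softer locking can look like, here is a
    minimal sketch that guards the state with a dedicated lock object instead of
    the object monitor. It is not the code from this patch: the class
    `StateLockSketch`, the field `stateLock` and the simplified `State` enum are
    assumptions made for the example, and the pending-shutdown check only
    loosely mirrors the idea behind `setStateWhenNotInPendingShutdown`.
    
    ```java
    class StateLockSketch {
        // Hypothetical, simplified state set; not Kafka's actual enum.
        enum State { CREATED, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING }
    
        // Dedicated lock: guards only the state field, never the whole object.
        private final Object stateLock = new Object();
        private State state = State.CREATED;
    
        /** Atomically moves to newState unless a shutdown is already pending. */
        boolean setState(State newState) {
            synchronized (stateLock) {
                if (state == State.PENDING_SHUTDOWN && newState != State.NOT_RUNNING) {
                    return false;
                }
                state = newState;
                return true;
            }
        }
    
        State state() {
            synchronized (stateLock) {
                return state;
            }
        }
    
        /**
         * Holds the object monitor (as a synchronized close() would) and checks
         * that a state change from another thread still completes.
         */
        static boolean stateChangeCompletesWhileMonitorHeld() {
            StateLockSketch holder = new StateLockSketch();
            synchronized (holder) {
                Thread worker = new Thread(() -> holder.setState(State.RUNNING));
                worker.start();
                try {
                    // Would time out if setState needed the object monitor.
                    worker.join(5000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false;
                }
                return !worker.isAlive() && holder.state() == State.RUNNING;
            }
        }
    
        public static void main(String[] args) {
            System.out.println(stateChangeCompletesWhileMonitorHeld());
        }
    }
    ```
    
    Because `setState` never takes the object monitor, a thread that holds that
    monitor while joining on another thread can no longer block a state change.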

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/original-brownbear/kafka fix-streams-deadlock

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/2791.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2791
    
----
commit 8374c13c94cd50ca1c3cbb3c123b0b841d5ff7bc
Author: Armin Braun <m...@obrown.io>
Date:   2017-04-02T10:29:26Z

    MINOR: Fix Deadlock in StreamThread

----


