Repository: kafka
Updated Branches:
refs/heads/trunk bdf4cba04 -> 3364f12bc
MINOR: Fix deadlock between StreamThread and KafkaStreams
This may be one reason why Jenkins jobs occasionally time out.
I can reproduce it locally.
On current trunk it is possible to run into the following:
```sh
"kafka-streams-close-thread" #585 daemon prio=5 os_prio=0
tid=0x00007f66d052d800 nid=0x7e02 waiting for monitor entry [0x00007f66ae2e5000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at org.apache.kafka.streams.processor.internals.StreamThread.close(StreamThread.java:345)
    - waiting to lock <0x000000077d33c538> (a org.apache.kafka.streams.processor.internals.StreamThread)
    at org.apache.kafka.streams.KafkaStreams$1.run(KafkaStreams.java:474)
    at java.lang.Thread.run(Thread.java:745)

"appId-bd262a91-5155-4a35-bc46-c6432552c2c5-StreamThread-97" #583 prio=5 os_prio=0
tid=0x00007f66d052f000 nid=0x7e01 waiting for monitor entry [0x00007f66ae4e6000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at org.apache.kafka.streams.KafkaStreams.setState(KafkaStreams.java:219)
    - waiting to lock <0x000000077d335760> (a org.apache.kafka.streams.KafkaStreams)
    at org.apache.kafka.streams.KafkaStreams.access$100(KafkaStreams.java:117)
    at org.apache.kafka.streams.KafkaStreams$StreamStateListener.onChange(KafkaStreams.java:259)
    - locked <0x000000077d42f138> (a org.apache.kafka.streams.KafkaStreams$StreamStateListener)
    at org.apache.kafka.streams.processor.internals.StreamThread.setState(StreamThread.java:168)
    - locked <0x000000077d33c538> (a org.apache.kafka.streams.processor.internals.StreamThread)
    at org.apache.kafka.streams.processor.internals.StreamThread.setStateWhenNotInPendingShutdown(StreamThread.java:176)
    - locked <0x000000077d33c538> (a org.apache.kafka.streams.processor.internals.StreamThread)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$1600(StreamThread.java:70)
    at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsRevoked(StreamThread.java:1321)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:406)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:349)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:296)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1037)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1002)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:531)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:669)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:326)
```
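In a nutshell: the `StreamThread` holds its own lock and waits for
the lock on `KafkaStreams`, while the close thread waits for the
lock on the `StreamThread`. The cycle closes because a concurrent
`close` (e.g. from a test) holds the lock on `KafkaStreams` while it
waits in `Thread.join`: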
```sh
"main" #1 prio=5 os_prio=0 tid=0x00007f66d000c800 nid=0x78bb in Object.wait()
[0x00007f66d7a15000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.lang.Thread.join(Thread.java:1249)
    - locked <0x000000077d45a590> (a java.lang.Thread)
    at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:503)
    - locked <0x000000077d335760> (a org.apache.kafka.streams.KafkaStreams)
    at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:447)
    at org.apache.kafka.streams.KafkaStreamsTest.testCannotStartOnceClosed(KafkaStreamsTest.java:115)
```
=> causing a deadlock.
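The blocking pattern in these dumps can be reproduced in miniature. The following is a minimal sketch, not the Kafka code: `MonitorStallSketch` and all of its members are hypothetical names, and it reduces the three threads in the dumps to two. A `synchronized` close method waits in `Thread.join` while the worker it waits for needs the same monitor to change state. The join is timed here so that the sketch terminates instead of hanging.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for the pre-fix locking shape; not the real KafkaStreams class.
class MonitorStallSketch {
    private final CountDownLatch closing = new CountDownLatch(1);
    private volatile String state = "RUNNING";

    // Like the old setState: needs the monitor of the whole object.
    private synchronized void setState(final String newState) {
        state = newState;
    }

    // Holds the object's monitor while joining the worker. Returns true if the
    // worker was still alive when the timed join gave up.
    private synchronized boolean closeAndCheckStall(final Thread worker) throws InterruptedException {
        closing.countDown(); // the worker now tries to call setState(...)
        worker.join(200);    // join waits on the worker's monitor; ours stays held
        return worker.isAlive();
    }

    static boolean demo() {
        final MonitorStallSketch app = new MonitorStallSketch();
        final Thread worker = new Thread(() -> {
            try {
                app.closing.await();
                app.setState("NOT_RUNNING"); // blocks while close holds the monitor
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        try {
            final boolean stalled = app.closeAndCheckStall(worker);
            worker.join(); // the monitor is free again, so the worker can finish
            return stalled && "NOT_RUNNING".equals(app.state);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(final String[] args) {
        System.out.println("worker stalled during close: " + demo());
    }
}
```

With an untimed `join`, as in the dump of the `main` thread, the worker would never get the monitor and neither thread would make progress.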
Fixed this by using finer-grained locking for the state change: a
dedicated lock guarantees atomic changes to the state without
locking the whole `KafkaStreams` object (I at least could not find
any method other than `setState` that requires more than
atomically-locked access).
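As an illustration of the locking change, here is a minimal sketch under stated assumptions: `StateLockSketch`, `closeAndCheckDone` and `demo` are hypothetical names, and only the `stateLock` field mirrors the name used in the diff. State changes synchronize on a private lock object, so a thread that holds the monitor of the outer object while joining a worker no longer blocks that worker's state change.

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for the post-fix locking shape; not the real KafkaStreams class.
class StateLockSketch {
    private final Object stateLock = new Object();
    private final CountDownLatch closing = new CountDownLatch(1);
    private volatile String state = "RUNNING";

    // State changes are atomic with respect to each other, but do not
    // require the monitor of the whole object.
    private void setState(final String newState) {
        synchronized (stateLock) {
            state = newState;
        }
    }

    // Still holds the object's monitor while joining the worker.
    private synchronized boolean closeAndCheckDone(final Thread worker) throws InterruptedException {
        closing.countDown(); // the worker now calls setState(...)
        worker.join(5000);   // generous timeout; the worker should finish quickly
        return !worker.isAlive();
    }

    static boolean demo() {
        final StateLockSketch app = new StateLockSketch();
        final Thread worker = new Thread(() -> {
            try {
                app.closing.await();
                app.setState("NOT_RUNNING"); // only needs stateLock
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        try {
            final boolean done = app.closeAndCheckDone(worker);
            return done && "NOT_RUNNING".equals(app.state);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(final String[] args) {
        System.out.println("close completed: " + demo());
    }
}
```

The trade-off in this shape is that callers which previously relied on the object monitor for mutual exclusion with state changes must now use the dedicated lock.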
Also qualified the state listeners with their outer class to make
the whole code flow around this more readable (having two
interfaces that share both the interface name and the method name,
and then using them between their two outer classes, is crazy hard
to read imo :)).
Easy to reproduce yourself by running
`org.apache.kafka.streams.KafkaStreamsTest` in a loop for a bit
(save yourself some time by running 2-4 in parallel :)). Eventually
it will hang on one of the tests (for me this takes less than 1 min
with 4 parallel runs).
Author: Armin Braun <[email protected]>
Author: Armin <[email protected]>
Reviewers: Eno Thereska <[email protected]>, Damian Guy <[email protected]>,
Ismael Juma <[email protected]>
Closes #2791 from original-brownbear/fix-streams-deadlock
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3364f12b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3364f12b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3364f12b
Branch: refs/heads/trunk
Commit: 3364f12bc240e3fefa6a467519ef608fa768917c
Parents: bdf4cba
Author: Armin Braun <[email protected]>
Authored: Mon Apr 3 12:50:51 2017 +0100
Committer: Ismael Juma <[email protected]>
Committed: Mon Apr 3 12:50:57 2017 +0100
----------------------------------------------------------------------
.../kafka/clients/producer/KafkaProducer.java | 2 +-
.../org/apache/kafka/streams/KafkaStreams.java | 36 +++++++++++---------
.../processor/internals/StreamThread.java | 8 ++---
.../apache/kafka/streams/KafkaStreamsTest.java | 2 +-
4 files changed, 26 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/3364f12b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 9342791..8fe99dd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -769,7 +769,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
log.info("Closing the Kafka producer with timeoutMillis = {} ms.",
timeUnit.toMillis(timeout));
// this will keep track of the first encountered exception
- AtomicReference<Throwable> firstException = new AtomicReference<Throwable>();
+ AtomicReference<Throwable> firstException = new AtomicReference<>();
boolean invokedFromCallback = Thread.currentThread() == this.ioThread;
if (timeout > 0) {
if (invokedFromCallback) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/3364f12b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 2c116d9..6ddf2a1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -189,8 +189,12 @@ public class KafkaStreams {
return validTransitions.contains(newState.ordinal());
}
}
+
+ private final Object stateLock = new Object();
+
private volatile State state = State.CREATED;
- private StateListener stateListener = null;
+
+ private KafkaStreams.StateListener stateListener = null;
/**
@@ -208,25 +212,25 @@ public class KafkaStreams {
}
/**
- * An app can set a single {@link StateListener} so that the app is notified when state changes.
+ * An app can set a single {@link KafkaStreams.StateListener} so that the app is notified when state changes.
* @param listener a new state listener
*/
- public void setStateListener(final StateListener listener) {
+ public void setStateListener(final KafkaStreams.StateListener listener) {
stateListener = listener;
}
- private synchronized void setState(final State newState) {
- final State oldState = state;
- if (!state.isValidTransition(newState)) {
- log.warn("{} Unexpected state transition from {} to {}.", logPrefix, oldState, newState);
- } else {
- log.info("{} State transition from {} to {}.", logPrefix, oldState, newState);
- }
-
- state = newState;
-
- if (stateListener != null) {
- stateListener.onChange(state, oldState);
+ private void setState(final State newState) {
+ synchronized (stateLock) {
+ final State oldState = state;
+ if (!state.isValidTransition(newState)) {
+ log.warn("{} Unexpected state transition from {} to {}.", logPrefix, oldState, newState);
+ } else {
+ log.info("{} State transition from {} to {}.", logPrefix, oldState, newState);
+ }
+ state = newState;
+ if (stateListener != null) {
+ stateListener.onChange(state, oldState);
+ }
}
}
@@ -248,7 +252,7 @@ public class KafkaStreams {
return Collections.unmodifiableMap(metrics.metrics());
}
- private class StreamStateListener implements StreamThread.StateListener {
+ private final class StreamStateListener implements StreamThread.StateListener {
@Override
public synchronized void onChange(final StreamThread thread,
final StreamThread.State newState,
http://git-wip-us.apache.org/repos/asf/kafka/blob/3364f12b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 46704b9..9791a0a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -124,7 +124,7 @@ public class StreamThread extends Thread {
}
private volatile State state = State.NOT_RUNNING;
- private StateListener stateListener = null;
+ private StreamThread.StateListener stateListener = null;
/**
* Listen to state change events
@@ -141,10 +141,10 @@ public class StreamThread extends Thread {
}
/**
- * Set the {@link StateListener} to be notified when state changes. Note this API is internal to
+ * Set the {@link StreamThread.StateListener} to be notified when state changes. Note this API is internal to
* Kafka Streams and is not intended to be used by an external application.
*/
- public void setStateListener(final StateListener listener) {
+ public void setStateListener(final StreamThread.StateListener listener) {
this.stateListener = listener;
}
@@ -463,7 +463,7 @@ public class StreamThread extends Thread {
action.apply(task);
} catch (RuntimeException t) {
log.error("{} Failed while executing {} {} due to {}: ",
- StreamThread.this.logPrefix,
+ logPrefix,
task.getClass().getSimpleName(),
task.id(),
exceptionMessage,
http://git-wip-us.apache.org/repos/asf/kafka/blob/3364f12b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index eebbde9..efa484e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -282,7 +282,7 @@ public class KafkaStreamsTest {
try {
streams.cleanUp();
} catch (final IllegalStateException e) {
- Assert.assertEquals("Cannot clean up while running.", e.getMessage());
+ assertEquals("Cannot clean up while running.", e.getMessage());
throw e;
} finally {
streams.close();