Steven Schlansker created KAFKA-4787:
----------------------------------------
Summary: KafkaStreams close() is not reentrant
Key: KAFKA-4787
URL: https://issues.apache.org/jira/browse/KAFKA-4787
Project: Kafka
Issue Type: Improvement
Components: streams
Affects Versions: 0.10.2.0
Reporter: Steven Schlansker
While building a simple application, I tried to implement a failure policy
where any uncaught exception terminates the application until an administrator
can evaluate and intervene:
{code}
/** Handle any uncaught exception by shutting down the program. */
private void handleStreamException(Thread thread, Throwable t) {
    LOG.error("stream exception in thread {}", thread, t);
    streams.close();
}

streams.setUncaughtExceptionHandler(this::handleStreamException);
streams.start();
{code}
Unfortunately, because the KafkaStreams#close() method takes a lock, this is
prone to what looks like a deadlock:
{code}
"StreamThread-1" #80 prio=5 os_prio=0 tid=0x00007f56096f4000 nid=0x40c8 waiting
for monitor entry [0x00007f54f03ee000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java)
    - waiting to lock <0x00000000f171cda8> (a org.apache.kafka.streams.KafkaStreams)
    at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:438)
    at com.opentable.chat.service.ChatStorage$$Lambda$161/1940967023.close(Unknown Source)
    at com.opentable.chat.service.ChatStorage.closeLog(ChatStorage.java:212)
    at com.opentable.chat.service.ChatStorage.close(ChatStorage.java:207)
    at com.opentable.chat.service.ChatStorage.handleStreamException(ChatStorage.java:541)
    at com.opentable.chat.service.ChatStorage$$Lambda$123/149062221.uncaughtException(Unknown Source)
    at java.lang.Thread.dispatchUncaughtException(Thread.java:1956)
"main" #1 prio=5 os_prio=0 tid=0x00007f5608011000 nid=0x3f76 in Object.wait()
[0x00007f5610f04000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.lang.Thread.join(Thread.java:1249)
    - locked <0x00000000fd302bf0> (a java.lang.Thread)
    at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:494)
    - locked <0x00000000f171cda8> (a org.apache.kafka.streams.KafkaStreams)
    at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:438)
    at com.opentable.chat.service.ChatStorage$$Lambda$161/1940967023.close(Unknown Source)
    at com.opentable.chat.service.ChatStorage.closeLog(ChatStorage.java:212)
    at com.opentable.chat.service.ChatStorage.close(ChatStorage.java:207)
{code}
Note how the main thread calls close() and, while that call is in progress,
StreamThread-1 hits an exception. The uncaught exception handler runs on
StreamThread-1 and calls close() again. Once it tries to take the KafkaStreams
monitor, we are stuck: main holds the monitor and is joined on StreamThread-1,
while StreamThread-1 is waiting for main to release that monitor.
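To make the lock ordering concrete, here is a minimal sketch that reproduces the same pattern without Kafka. Everything in it is hypothetical: DeadlockSketch stands in for KafkaStreams, its synchronized close() models only the monitor plus the join, and the 500 ms join timeout exists purely so the demo terminates instead of hanging as the real application does.

```java
import java.util.concurrent.CountDownLatch;

/** Hypothetical stand-in for KafkaStreams; only the locking pattern is modeled. */
class DeadlockSketch {
    private final CountDownLatch closing = new CountDownLatch(1);
    private final Thread worker;
    private boolean closed = false;          // guarded by this object's monitor
    private volatile Thread.State observed;  // worker state seen while close() holds the monitor

    DeadlockSketch() {
        worker = new Thread(() -> {
            try {
                closing.await();             // fail only once close() is in progress
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            throw new IllegalStateException("simulated failure during shutdown");
        }, "StreamThread-1");
        // The handler runs on the failing thread and re-enters close(), as in the report.
        worker.setUncaughtExceptionHandler((thread, error) -> close());
        worker.start();
    }

    synchronized void close() {
        if (closed) {
            return;                          // lets the demo finish once the monitor is released
        }
        closed = true;
        closing.countDown();
        try {
            // An untimed join() here would never return: the worker cannot die
            // until it acquires the monitor that this thread is holding.
            worker.join(500);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        observed = worker.getState();
    }

    /** Runs the scenario and returns the worker's state while close() held the monitor. */
    static Thread.State demo() {
        DeadlockSketch sketch = new DeadlockSketch();
        sketch.close();
        return sketch.observed;
    }
}
```

Calling DeadlockSketch.demo() should report the worker as BLOCKED, which corresponds to the "waiting for monitor entry" state of StreamThread-1 in the dump above.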
Arguably it's a bit abusive to call close() in this way (it certainly wasn't
intentional) -- but for Kafka Streams to be robust, it should gracefully handle
any sequence of close() invocations, including this one.
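One way a close() could tolerate this sequence is sketched below. It is an illustration of the idea only, not the KafkaStreams implementation or a proposed patch: TolerantCloseSketch and its fields are hypothetical names, an AtomicBoolean replaces the monitor so that a second caller returns immediately, and the closing thread never joins itself.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical component showing a close() that tolerates re-entry from its own worker. */
class TolerantCloseSketch {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private final CountDownLatch closing = new CountDownLatch(1);
    private final Thread worker;

    TolerantCloseSketch() {
        worker = new Thread(() -> {
            try {
                closing.await();             // fail only once close() is in progress
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            throw new IllegalStateException("simulated failure during shutdown");
        }, "StreamThread-1");
        // Same failure policy as in the report: the handler calls close().
        worker.setUncaughtExceptionHandler((thread, error) -> close());
        worker.start();
    }

    void close() {
        // Only the first caller performs the shutdown; later callers return at once
        // instead of queueing up behind a lock held by the first caller.
        if (!closed.compareAndSet(false, true)) {
            return;
        }
        closing.countDown();
        if (Thread.currentThread() == worker) {
            return;                          // a thread must not wait for its own termination
        }
        try {
            worker.join(5000);               // bounded wait, so a stuck worker cannot hang close()
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Runs the same scenario as the report and returns the worker's final state. */
    static Thread.State demo() {
        TolerantCloseSketch sketch = new TolerantCloseSketch();
        sketch.close();
        return sketch.worker.getState();
    }
}
```

With this shape, TolerantCloseSketch.demo() should return TERMINATED rather than hanging. The trade-off is that a second caller may return before the first caller has finished shutting down, so callers that need to know shutdown is complete would need a separate signal.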