Jakub created KAFKA-8882:
----------------------------
Summary: It's not possible to restart Kafka Streams using
StateListener
Key: KAFKA-8882
URL: https://issues.apache.org/jira/browse/KAFKA-8882
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 2.2.1
Environment: Linux, Windows
Reporter: Jakub
Upon problems with connecting to a Kafka Cluster services using Kafka Streams
stop working with the following error message:
{code:java}
Encountered the following unexpected Kafka exception during processing, this
usually indicate Streams internal errors (...)
Caused by: org.apache.kafka.common.errors.TimeoutException: Aborted due to
timeout
(...)
State transition from PENDING_SHUTDOWN to DEAD
(...)
All stream threads have died. The instance will be in error state and should be
closed.
{code}
We tried to use a StateListener to automatically detect and work around this
problem.
However, this approach doesn't seem to work correctly:
# KafkaStreams correctly transitions from status Error to Pending Shutdown,
but then it stays in this status forever.
# Occasionally, after registering a listener the status doesn't even change to
Error.
{code:java}
kafkaStreams.setStateListener(new StateListener() {
public void onChange(State stateNew, State stateOld) {
if (stateNew == State.ERROR) {
kafkaStreams.cleanUp();
kafkaStreams.close();
} else if (stateNew == State.PENDING_SHUTDOWN) {
// this message is displayed, and then nothig else
happens
LOGGER.info("State is PENDING_SHUTDOWN");
} else if (stateNew == State.NOT_RUNNING) {
// it never gets here
kafkaStreams = createKafkaStreams();
kafkaStreams.start();
}
}
});
{code}
Surprisingly, restarting KafkaStreams outside of a listener works fine.
I'm happy to provide more details if required.
--
This message was sent by Atlassian Jira
(v8.3.2#803003)