[ https://issues.apache.org/jira/browse/KAFKA-8062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16786882#comment-16786882 ]
Bill Bejeck commented on KAFKA-8062:
------------------------------------

Hi [~andrey.v.volkov]

Thanks for reporting this. Can you share your topology so we can re-create the error locally?

Thanks,
Bill

> StateListener is not notified when StreamThread dies
> ----------------------------------------------------
>
>                 Key: KAFKA-8062
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8062
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.1.1
>         Environment: Kafka 2.1.1, kafka-streams-scala version 2.1.1
>            Reporter: Andrey Volkov
>            Priority: Minor
>
> I want my application to react when streams die. I am trying to use
> KafkaStreams.setStateListener and am also checking KafkaStreams.state()
> from time to time.
>
> The test scenario: Kafka is available, but the topics that my Topology is
> supposed to use do not exist.
>
> I expect the streams to crash and the state listener to be notified about
> that with the new state ERROR. KafkaStreams.state() should also return ERROR.
>
> In reality the streams crash, but the KafkaStreams.state() method always
> returns REBALANCING, and the last time the StateListener was called, the new
> state was also REBALANCING.
>
> I believe the reason for this lies in two methods:
>
> org.apache.kafka.streams.KafkaStreams.StreamStateListener.onChange(), which
> does not react to the state StreamThread.State.PENDING_SHUTDOWN
>
> and
>
> org.apache.kafka.streams.processor.internals.StreamThread.RebalanceListener.onPartitionsAssigned,
> which calls shutdown(), setting the state to PENDING_SHUTDOWN, and then
> calls streamThread.setStateListener(null), effectively removing the state
> listener, so that the DEAD state of the thread never reaches the
> KafkaStreams object.
> Here is an extract from the logs:
>
> {{14:57:03.272 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] ERROR o.a.k.s.p.i.StreamsPartitionAssignor - stream-thread [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1-consumer] test-input-topic is unknown yet during rebalance, please make sure they have been pre-created before starting the Streams application.}}
> {{14:57:03.283 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1-consumer, groupId=Test] Successfully joined group with generation 1}}
> {{14:57:03.284 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1-consumer, groupId=Test] Setting newly assigned partitions []}}
> {{14:57:03.285 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] INFO o.a.k.s.p.i.StreamThread - stream-thread [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] Informed to shut down}}
> {{14:57:03.285 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] INFO o.a.k.s.p.i.StreamThread - stream-thread [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] State transition from PARTITIONS_REVOKED to PENDING_SHUTDOWN}}
> {{14:57:03.316 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] INFO o.a.k.s.p.i.StreamThread - stream-thread [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] Shutting down}}
> {{14:57:03.317 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] INFO o.a.k.c.c.KafkaConsumer - [Consumer clientId=Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1-restore-consumer, groupId=] Unsubscribed all topics or patterns and assigned partitions}}
> {{14:57:03.317 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] INFO o.a.k.c.p.KafkaProducer - [Producer clientId=Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.}}
> {{14:57:03.332 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] INFO o.a.k.s.p.i.StreamThread - stream-thread [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD}}
> {{14:57:03.332 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] INFO o.a.k.s.p.i.StreamThread - stream-thread [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] Shutdown complete}}
>
> After this, calls to KafkaStreams.state() still return REBALANCING.
>
> There is a workaround: requesting KafkaStreams.localThreadsMetadata() and
> checking each thread's state manually, but that seems very wrong.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
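
For reference, the workaround mentioned at the end of the report (checking each thread's state manually) might look roughly like the sketch below. This is only an illustration, not code from the reporter or from Kafka: the class ThreadStateCheck and the method allThreadsDead are hypothetical names, and the sketch assumes the per-thread states are available as strings (for example via ThreadMetadata.threadState() on the entries of KafkaStreams.localThreadsMetadata()), with "DEAD" being the terminal state shown in the logs. The helper itself depends only on the JDK so it can be tried without a broker.

```java
import java.util.Collection;

// Hypothetical helper, not part of Kafka Streams.
final class ThreadStateCheck {
    private ThreadStateCheck() {}

    // Returns true when at least one state was supplied and every supplied
    // state equals "DEAD" (the terminal StreamThread state seen in the logs).
    static boolean allThreadsDead(Collection<String> threadStates) {
        if (threadStates.isEmpty()) {
            return false; // no threads reported: do not claim they are dead
        }
        for (String state : threadStates) {
            if (!"DEAD".equals(state)) {
                return false;
            }
        }
        return true;
    }

    // Assumed wiring with a real client (not compiled here, needs kafka-streams):
    //   List<String> states = streams.localThreadsMetadata().stream()
    //           .map(ThreadMetadata::threadState)
    //           .collect(Collectors.toList());
    //   if (ThreadStateCheck.allThreadsDead(states)) { /* react, e.g. restart */ }
}
```

An application could call such a check periodically alongside KafkaStreams.state(). As the report says, this only works around the missing notification and does not fix it.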