[ 
https://issues.apache.org/jira/browse/KAFKA-8062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16795459#comment-16795459
 ] 

ASF GitHub Bot commented on KAFKA-8062:
---------------------------------------

guozhangwang commented on pull request #6468: KAFKA-8062: Do not remove 
StateListener when shutting down stream thread
URL: https://github.com/apache/kafka/pull/6468
 
 
   In a previous commit, https://github.com/apache/kafka/pull/6091, we fixed 
a couple of edge cases, so we no longer need to remove the state listener 
(before that, we removed the state listener intentionally to avoid some race 
conditions, which are now gone).
   
   Not removing the state listener automatically fixes the issue that, when 
threads shut down due to an error code, the instance-level state never 
transitions to ERROR and then eventually to NOT_RUNNING.
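
   The effect of keeping the listener can be illustrated with a small, 
self-contained model. This is only a sketch under simplifying assumptions, not 
Kafka's actual code: the class and method names (`StateListenerSketch`, 
`Instance`, `WorkerThread`, `simulateShutdown`) are hypothetical, there is a 
single thread, and the instance is assumed to move to ERROR as soon as that 
thread reports DEAD.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified, hypothetical model of the listener wiring; not Kafka's actual code. */
public class StateListenerSketch {

    enum ThreadState { PARTITIONS_REVOKED, PENDING_SHUTDOWN, DEAD }
    enum InstanceState { REBALANCING, ERROR }

    interface ThreadStateListener {
        void onChange(ThreadState newState);
    }

    /** Stand-in for the instance: it moves to ERROR once its only thread is DEAD. */
    static class Instance implements ThreadStateListener {
        InstanceState state = InstanceState.REBALANCING;
        final List<ThreadState> seen = new ArrayList<>();

        @Override
        public void onChange(ThreadState newState) {
            seen.add(newState);
            if (newState == ThreadState.DEAD) {
                state = InstanceState.ERROR;
            }
        }
    }

    /** Stand-in for a stream thread that reports state changes to an optional listener. */
    static class WorkerThread {
        ThreadState state = ThreadState.PARTITIONS_REVOKED;
        ThreadStateListener listener;

        void setState(ThreadState newState) {
            state = newState;
            if (listener != null) {
                listener.onChange(newState);
            }
        }
    }

    /** Replays the shutdown sequence; removeListener mimics the old behaviour. */
    static InstanceState simulateShutdown(boolean removeListener) {
        Instance instance = new Instance();
        WorkerThread thread = new WorkerThread();
        thread.listener = instance;

        thread.setState(ThreadState.PENDING_SHUTDOWN);
        if (removeListener) {
            thread.listener = null; // analogous to streamThread.setStateListener(null)
        }
        thread.setState(ThreadState.DEAD);
        return instance.state;
    }

    public static void main(String[] args) {
        System.out.println("listener removed: " + simulateShutdown(true));
        System.out.println("listener kept:    " + simulateShutdown(false));
    }
}
```

   In this model, removing the listener leaves the instance in REBALANCING 
because the DEAD transition is never delivered, while keeping it lets the 
instance reach ERROR.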
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> StateListener is not notified when StreamThread dies
> ----------------------------------------------------
>
>                 Key: KAFKA-8062
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8062
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.1.1
>         Environment: Kafka 2.1.1, kafka-streams-scala version 2.1.1
>            Reporter: Andrey Volkov
>            Assignee: Bill Bejeck
>            Priority: Minor
>
> I want my application to react when streams die. I am trying to use 
> KafkaStreams.setStateListener and am also checking KafkaStreams.state() from 
> time to time.
> The test scenario: Kafka is available, but there are no topics that my 
> Topology is supposed to use.
> I expect streams to crash and the state listener to be notified about that, 
> with the new state ERROR. KafkaStreams.state() should also return ERROR.
> In reality the streams crash, but the KafkaStreams.state() method always 
> returns REBALANCING and the last time the StateListener was called, the new 
> state was also REBALANCING. 
>  
> I believe the reason for this is in the methods:
> org.apache.kafka.streams.KafkaStreams.StreamStateListener.onChange() which 
> does not react to the state StreamThread.State.PENDING_SHUTDOWN
> and
> org.apache.kafka.streams.processor.internals.StreamThread.RebalanceListener.onPartitionsAssigned,
>  which calls shutdown() setting the state to PENDING_SHUTDOWN and then
> streamThread.setStateListener(null) effectively removing the state listener, 
> so that the DEAD state of the thread never reaches KafkaStreams object.
> Here is an extract from the logs:
> {{14:57:03.272 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] 
> ERROR o.a.k.s.p.i.StreamsPartitionAssignor - stream-thread 
> [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1-consumer] 
> test-input-topic is unknown yet during rebalance, please make sure they have 
> been pre-created before starting the Streams application.}}
> {{14:57:03.283 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] 
> INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer 
> clientId=Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1-consumer, 
> groupId=Test] Successfully joined group with generation 1}}
> {{14:57:03.284 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] 
> INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer 
> clientId=Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1-consumer, 
> groupId=Test] Setting newly assigned partitions []}}
> {{14:57:03.285 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] 
> INFO o.a.k.s.p.i.StreamThread - stream-thread 
> [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] Informed to shut 
> down}}
> {{14:57:03.285 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] 
> INFO o.a.k.s.p.i.StreamThread - stream-thread 
> [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] State transition 
> from PARTITIONS_REVOKED to PENDING_SHUTDOWN}}
> {{14:57:03.316 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] 
> INFO o.a.k.s.p.i.StreamThread - stream-thread 
> [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] Shutting down}}
> {{14:57:03.317 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] 
> INFO o.a.k.c.c.KafkaConsumer - [Consumer 
> clientId=Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1-restore-consumer,
>  groupId=] Unsubscribed all topics or patterns and assigned partitions}}
> {{14:57:03.317 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] 
> INFO o.a.k.c.p.KafkaProducer - [Producer 
> clientId=Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1-producer] 
> Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.}}
> {{14:57:03.332 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] 
> INFO o.a.k.s.p.i.StreamThread - stream-thread 
> [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] State transition 
> from PENDING_SHUTDOWN to DEAD}}
> {{14:57:03.332 [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] 
> INFO o.a.k.s.p.i.StreamThread - stream-thread 
> [Test-749d56c7-505d-46a4-8dbd-2c756b353adc-StreamThread-1] Shutdown complete}}
> After this, calls to KafkaStreams.state() still return REBALANCING.
> There is a workaround with requesting KafkaStreams.localThreadsMetadata() and 
> checking each thread's state manually, but that seems very wrong.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
