[ https://issues.apache.org/jira/browse/KAFKA-7657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16711839#comment-16711839 ]

Guozhang Wang commented on KAFKA-7657:
--------------------------------------

[~pkleindl] I think I've spotted the root cause of your issue. Here's a brief 
summary:

1) Around {{2018-11-30 09:08:14}} there was a broker-side leader migration, 
which caused Streams' embedded producers to get 
{{NotLeaderForPartitionException}}. This error is not fatal but retriable: the 
producer will retry based on the configured number of retries and the retry 
backoff. In 2.0.1, which you and [~tscrowley] are running, the defaults are 10 
retries with a 100ms backoff (in trunk and in the upcoming 2.1.0 we've changed 
the default retries to Integer.MAX_VALUE as part of KIP-91). So if you did not 
change those values, and the broker-side metadata was not refreshed within 
10 * 100ms ~ 1 second, the retries get exhausted, the error is treated as 
fatal, and the producer throws.
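
For example, if you want to give the producer more headroom until 2.1.0 is 
out, you can override the embedded producer's configs via the producer prefix. 
An untested sketch (the application id and bootstrap servers are placeholders):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.streams.StreamsConfig;

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");  // placeholder
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");  // placeholder
// Override the 2.0.1 defaults (retries=10, retry.backoff.ms=100) for the
// embedded producer only, so a transient NotLeaderForPartitionException
// during a leader migration does not exhaust retries and become fatal:
props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRIES_CONFIG), Integer.MAX_VALUE);
props.put(StreamsConfig.producerPrefix(ProducerConfig.RETRY_BACKOFF_MS_CONFIG), 100);
{code}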

2) When the producer throws (which is the case in your logs), the 
corresponding stream thread (thread 15 in your case) is shut down gracefully 
and its tasks are migrated to the other threads. I think the broker side then 
resumed normal operation, so the other threads could proceed; i.e., from then 
on the streams app instance has been running with one thread fewer, though 
from the outside this still looks normal.
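
Note that you can at least get a callback when a thread dies this way by 
registering an uncaught exception handler before starting the instance. An 
untested sketch ({{topology}} and {{props}} are assumed to be built elsewhere):

{code:java}
import org.apache.kafka.streams.KafkaStreams;

// topology and props are assumed to be constructed elsewhere
final KafkaStreams streams = new KafkaStreams(topology, props);
// invoked when a stream thread dies from an unexpected exception,
// e.g. the fatal producer exception described above
streams.setUncaughtExceptionHandler((thread, throwable) ->
    System.err.println("stream thread " + thread.getName() + " died: " + throwable));
streams.start();
{code}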

3) The bug behind this scenario is that when 2) happens we do not remove the 
corresponding thread state from the map, and because this dangling state never 
comes back to RUNNING, the application-instance-level state can never 
transition. I can provide a PR for this issue.

4) At the moment, if you want to be notified when something similar happens 
and you lose threads within the application instance, you can either a) watch 
the thread-level metrics and alert when they go dark, or b) watch the 
thread-level states as well, which can be read from 
KafkaStreams#localThreadsMetadata. A sketch of b) follows below.
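
An untested sketch of b) ({{streams}} is your running KafkaStreams instance, 
and the expected thread count is a placeholder for your num.stream.threads):

{code:java}
import java.util.Set;
import org.apache.kafka.streams.processor.ThreadMetadata;

final int expectedThreads = 2;  // placeholder: whatever num.stream.threads you configured
final Set<ThreadMetadata> threads = streams.localThreadsMetadata();
// print each thread's state; a DEAD or missing thread means the instance has
// lost capacity even though the instance-level state may still look normal
for (final ThreadMetadata meta : threads) {
    System.out.println(meta.threadName() + " -> " + meta.threadState());
}
if (threads.size() < expectedThreads) {
    System.err.println("lost " + (expectedThreads - threads.size()) + " stream thread(s)");
}
{code}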

> Invalid reporting of stream state in Kafka streams application
> --------------------------------------------------------------
>
>                 Key: KAFKA-7657
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7657
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.0.1
>            Reporter: Thomas Crowley
>            Priority: Major
>              Labels: bug
>
> We have a streams application with 3 instances running, two of which are 
> reporting the state of REBALANCING even after they have been running for 
> days. Restarting the application has no effect on the stream state.
> This seems suspect because each instance appears to be processing messages, 
> and the kafka-consumer-groups CLI tool reports hardly any offset lag in any 
> of the partitions assigned to the REBALANCING consumers. Each partition seems 
> to be processing an equal amount of records too.
> Inspecting the state.dir on disk, it looks like the RocksDB state has been 
> built and hovers at the expected size on disk.
> This problem has persisted for us after we rebuilt our Kafka cluster and 
> reset topics + consumer groups in our dev environment.
> There is nothing in the logs (with level set to DEBUG) in both the broker or 
> the application that suggests something exceptional has happened causing the 
> application to be stuck REBALANCING.
> We are also running multiple streaming applications where this problem does 
> not exist.
> Two differences between this application and our other streaming applications 
> are:
>  * We have processing.guarantee set to exactly_once
>  * We are using a ValueTransformer which fetches from and puts data on a 
> windowed state store
> The REBALANCING state is returned from both polling the state method of our 
> KafkaStreams instance, and our custom metric which is derived from some logic 
> in a KafkaStreams.StateListener class attached via the setStateListener 
> method.
>  
> While I have provided a bit of context, before I reply with some reproducible 
> code - is there a simple way in which I can determine that my streams 
> application is in a RUNNING state without relying on the same mechanisms as 
> used above?
> Further, given that it seems like my application is actually running - could 
> this perhaps be a bug to do with how the stream state is being reported (in 
> the context of a transactional stream using the processor API)?