[ 
https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17112676#comment-17112676
 ] 

Matthias J. Sax commented on KAFKA-6520:
----------------------------------------

The consumer only stores/caches the last known coordinator metadata. There is 
no coordinator liveness check; the consumer only talks to the coordinator 
during a rebalance. Thus, if a network issue occurs during regular processing 
and the consumer cannot fetch data from the brokers, the cached coordinator 
metadata stays the same (i.e., the coordinator is still "known"). Does this 
make sense?

In the end, I believe we cannot really resolve this issue without a consumer 
change. Maybe we could introduce a new "disconnected timeout" to the consumer: 
internally, the consumer sends fetch requests on a regular basis, and those 
fetch requests time out if there is a network issue. Currently, the consumer 
"swallows" those timeout exceptions and just keeps retrying; `poll()` simply 
returns zero records and never rethrows a TimeoutException. The new 
"disconnected timeout" could set a limit on how long fetch requests are 
retried: if all fetch requests time out for a period longer than the 
"disconnected timeout", the next `poll()` call could throw a 
"DisconnectedException" (reusing TimeoutException could be misleading?).

By default, the new "disconnected timeout" would be set to MAX_VALUE, so the 
default behavior would not change. Within Kafka Streams, we can set this new 
timeout to a smaller value, catch the exception, and change the Kafka Streams 
state to DISCONNECTED. On a subsequent `poll()` that does not throw, we set 
the state back to RUNNING.
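The Streams-side handling could look roughly like the following sketch. The enum values and `DisconnectedException` are assumptions for illustration, not the real Streams API:

```java
// Sketch of the proposed Streams-side state toggling; all names hypothetical.
enum StreamsState { RUNNING, DISCONNECTED }

class StateTracker {
    private StreamsState state = StreamsState.RUNNING;

    StreamsState state() { return state; }

    // Invoked after each poll() attempt: flip to DISCONNECTED when the
    // hypothetical DisconnectedException was caught, and back to RUNNING
    // on the next poll() that does not throw.
    void onPollResult(boolean pollThrewDisconnected) {
        state = pollThrewDisconnected ? StreamsState.DISCONNECTED
                                      : StreamsState.RUNNING;
    }
}
```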

Thoughts?

> When a Kafka Stream can't communicate with the server, it's Status stays 
> RUNNING
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-6520
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6520
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Michael Kohout
>            Priority: Major
>              Labels: newbie, user-experience
>
> KIP WIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-457%3A+Add+DISCONNECTED+status+to+Kafka+Streams]
> When you execute the following scenario, the application stays in the 
> RUNNING state:
>   
>  1) start Kafka
>  2) start the app; the app connects to Kafka and starts processing
>  3) kill Kafka (stop the Docker container)
>  4) the application doesn't give any indication that it's no longer 
> connected (the Stream State is still RUNNING, and the uncaught exception 
> handler isn't invoked)
>   
>  It would be useful if the Stream State had a DISCONNECTED status.
>   
>  See 
> [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] 
> for a discussion from the google user forum.  This is a link to a related 
> issue.
> -------------------------
> Update: there are some discussions on the PR itself which lead me to think 
> that a more general solution should live at the ClusterConnectionStates 
> level rather than at the Streams or even Consumer level. One proposal would 
> be:
>  * Add a new metric named `failedConnection` in SelectorMetrics, recorded in 
> the `connect()` and `pollSelectionKeys()` functions upon catching the 
> IOException / RuntimeException that indicates the connection was dropped.
>  * Users of Consumer / Streams can then monitor this metric, which normally 
> will stay close to zero since disconnects are transient; if it spikes, it 
> means the brokers are consistently unavailable.
> [~Yohan123] WDYT?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
