[ https://issues.apache.org/jira/browse/KAFKA-7610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16689866#comment-16689866 ]

Jason Gustafson commented on KAFKA-7610:
----------------------------------------

For some background, the case we observed in practice was caused by a consumer 
that was slow to rejoin the group after a rebalance had begun. At the same 
time, there were new members trying to join the group for the first time. The 
request timeout was significantly lower than the rebalance timeout, so the 
JoinGroup requests of the new members kept timing out. Each timeout caused a 
retry, and the group size eventually became quite large because we could not 
detect that the new members were no longer there.
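
To make the mismatch concrete, here is a rough illustration of a consumer 
configuration of the kind that can trigger this; the broker address, group 
name, and timeout values are made up for the example, not taken from the 
incident:

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class SlowRejoinExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            // The rebalance timeout comes from max.poll.interval.ms, so a slow member
            // can hold the rebalance open for up to ten minutes here.
            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 600000);

            // request.timeout.ms is much lower, so a new member's initial JoinGroup can
            // time out and be retried many times while the rebalance is still pending.
            props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // subscribe and poll as usual
            }
        }
    }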

As Stanislav mentions, the main advantage of detecting disconnected consumers 
is that it would work for older clients. Probably we should just do this if the 
complexity is not too high. It is not a complete solution though because 
detection of a connection which did not close cleanly will still take a 
significant amount of time. I think we should consider the protocol improvement 
in addition to this so that we have direct control over failure detection going 
forward.

Furthermore, I think our approach of holding JoinGroup requests in purgatory 
for long periods of time has been problematic. It forces users to trade off 
the time to detect failed connections using `request.timeout.ms` against the 
time needed to complete a rebalance in the worst case. We kind of hacked around 
this in the consumer by allowing the JoinGroup to override the 
`request.timeout.ms` provided by the user. So users can provide a reasonable 
timeout value and detect failures in a reasonable time for every other API, but 
JoinGroup is still an issue. 
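
Roughly, that special case amounts to something like the sketch below; the 
method name and the five-second cushion are illustrative, not the exact client 
code:

    // Sketch of the JoinGroup-specific timeout override in the consumer.
    // The names and the 5s cushion are illustrative, not the exact client code.
    final class JoinGroupTimeoutSketch {
        static int joinGroupRequestTimeoutMs(int requestTimeoutMs, int rebalanceTimeoutMs) {
            // Let the JoinGroup sit on the coordinator for the full rebalance timeout
            // plus some slack, regardless of the user's request.timeout.ms.
            int cushionMs = 5000;
            return Math.max(requestTimeoutMs, rebalanceTimeoutMs + cushionMs);
        }
    }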

Really I'm not sure it's ever a good idea to have a request sitting on the 
broker for minutes. For example, this has bad interplay with the idle 
connection timeout: the broker will proactively close connections that aren't 
seeing any activity even if we've still got a JoinGroup request sitting in 
purgatory. It would be better to move to a model which incorporates client 
polling so that request times can be reasonably controlled. Of course we also 
don't want the JoinGroup to introduce a lot of new request traffic, so holding 
the request for shorter durations may be reasonable. In any case, if we solve 
this problem, then we probably also solve the issue here.
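
For illustration, a client-side loop under such a polling model might look 
roughly like this. It is purely a sketch of the idea: the types, the 
REBALANCE_IN_PROGRESS handling, and the backoff are hypothetical, not an 
existing API:

    // Hypothetical sketch of a polling-style JoinGroup: the broker answers quickly
    // with an assigned member id and REBALANCE_IN_PROGRESS instead of parking the
    // request, and the client re-polls until the rebalance completes. None of
    // these types exist in the client today.
    import java.time.Duration;

    final class PollingJoinSketch {
        enum JoinError { NONE, REBALANCE_IN_PROGRESS }

        record JoinResult(JoinError error, String memberId, int generationId) {}

        interface Coordinator {
            // Returns promptly rather than being held in purgatory for minutes.
            JoinResult joinGroup(String groupId, String memberId, Duration timeout);
        }

        static JoinResult joinUntilComplete(Coordinator coordinator, String groupId)
                throws InterruptedException {
            String memberId = "";  // empty on the very first join
            while (true) {
                JoinResult result = coordinator.joinGroup(groupId, memberId, Duration.ofSeconds(30));
                if (result.error() == JoinError.NONE) {
                    return result;  // rebalance finished; we have a generation
                }
                // Keep the member id assigned on the first round so the broker can track
                // this member and expire it via the session timeout if we disappear.
                memberId = result.memberId();
                Thread.sleep(1000);  // back off before polling again
            }
        }
    }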

As for `group.max.size`, I agree with Stanislav and Boyang that it could be 
helpful, but I think that is a separate discussion. Even if we had this, we 
would still want a way to detect failures for the initial join. Perhaps one of 
you can open a JIRA?

TLDR:
1. We probably should do the disconnect option as an initial step since it 
addresses the problem for all client versions (a rough sketch of the idea is 
below).
2. We should also consider protocol improvements to avoid holding JoinGroup 
requests in purgatory for indefinite amounts of time.
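
For point 1, the broker-side handling could look roughly like the sketch 
below. All names are made up for illustration; the real change would presumably 
route the disconnect through the request queue to the group coordinator:

    // Hypothetical sketch of the disconnect option: when the network layer reports
    // that a connection closed, drop any member on that connection that has not yet
    // completed its first JoinGroup, instead of waiting for the session timeout.
    // These names are illustrative, not the actual broker code.
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    final class DisconnectCleanupSketch {
        static final class MemberMetadata {
            final String memberId;
            final String connectionId;
            volatile boolean awaitingFirstJoinGroup = true;

            MemberMetadata(String memberId, String connectionId) {
                this.memberId = memberId;
                this.connectionId = connectionId;
            }
        }

        // memberId -> metadata for a single group
        private final Map<String, MemberMetadata> members = new ConcurrentHashMap<>();

        // Invoked when the network layer propagates a client disconnect.
        void onDisconnect(String connectionId) {
            members.values().removeIf(m -> m.awaitingFirstJoinGroup && m.connectionId.equals(connectionId));
        }
    }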

> Detect consumer failures in initial JoinGroup
> ---------------------------------------------
>
>                 Key: KAFKA-7610
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7610
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Jason Gustafson
>            Priority: Major
>
> The session timeout and heartbeating logic in the consumer allow us to detect 
> failures after a consumer joins the group. However, we have no mechanism to 
> detect failures during a consumer's initial JoinGroup when its memberId is 
> empty. When a client fails (e.g. due to a disconnect), the newly created 
> MemberMetadata will be left in the group metadata cache. Typically when this 
> happens, the client simply retries the JoinGroup. Every retry results in a 
> new dangling member being created and left in the group. These members are doomed 
> to a session timeout when the group finally finishes the rebalance, but 
> before that time, they are occupying memory. In extreme cases, when a 
> rebalance is delayed (possibly due to a buggy application), this cycle can 
> repeat and the cache can grow quite large.
> There are a couple options that come to mind to fix the problem:
> 1. During the initial JoinGroup, we can detect failed members when the TCP 
> connection fails. This is difficult at the moment because we do not have a 
> mechanism to propagate disconnects from the network layer. A potential option 
> is to treat the disconnect as just another type of request and pass it to the 
> handlers through the request queue.
> 2. Rather than holding the JoinGroup in purgatory for an indefinite amount of 
> time, we can return earlier with the generated memberId and an error code 
> (say REBALANCE_IN_PROGRESS) to indicate that retry is needed to complete the 
> rebalance. The consumer can then poll for the rebalance using its assigned 
> memberId. And we can detect failures through the session timeout. Obviously 
> this option requires a KIP (and some more thought).


