[jira] [Commented] (KAFKA-10793) Race condition in FindCoordinatorFuture permanently severs connection to group coordinator

2021-01-28 Thread Guozhang Wang (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-10793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17274187#comment-17274187 ]

Guozhang Wang commented on KAFKA-10793:
---

cc [~hachikuji] [~ijuma] Hopefully we nailed it this time! :)

> Race condition in FindCoordinatorFuture permanently severs connection to 
> group coordinator
> --
>
> Key: KAFKA-10793
> URL: https://issues.apache.org/jira/browse/KAFKA-10793
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer, streams
>Affects Versions: 2.5.0
>Reporter: A. Sophie Blee-Goldman
>Assignee: A. Sophie Blee-Goldman
>Priority: Critical
> Fix For: 2.8.0, 2.7.1
>
>
> Pretty much as soon as we started actively monitoring the 
> _last-rebalance-seconds-ago_ metric in our Kafka Streams test environment, we 
> started seeing something weird. Every so often one of the StreamThreads (ie a 
> single Consumer instance) would appear to permanently fall out of the group, 
> as evidenced by a monotonically increasing _last-rebalance-seconds-ago._ We 
> inject artificial network failures every few hours at most, so the group 
> rebalances quite often. But the one consumer never rejoins, with no other 
> symptoms (besides a slight drop in throughput since the remaining threads had 
> to take over this member's work). We're confident that the problem exists in 
> the client layer, since the logs confirmed that the unhealthy consumer was 
> still calling poll. It was also calling Consumer#committed in its main poll 
> loop, which was consistently failing with a TimeoutException.
> When I attached a remote debugger to an instance experiencing this issue, the 
> network client's connection to the group coordinator (the one that uses 
> MAX_VALUE - node.id as the coordinator id) was in the DISCONNECTED state. But 
> for some reason it never tried to re-establish this connection, although it 
> did successfully connect to that same broker through the "normal" connection 
> (ie the one that just uses node.id).
> The tl;dr is that the AbstractCoordinator's FindCoordinatorRequest has failed 
> (presumably due to a disconnect), but the _findCoordinatorFuture_ is non-null 
> so a new request is never sent. This shouldn't be possible since the 
> FindCoordinatorResponseHandler is supposed to clear the 
> _findCoordinatorFuture_ when the future is completed. But somehow that didn't 
> happen, so the consumer continues to assume there's still a FindCoordinator 
> request in flight and never even notices that it's dropped out of the group.
> These are the only confirmed findings so far; however, we have some guesses 
> which I'll leave in the comments. Note that we only noticed this due to the 
> newly added _last-rebalance-seconds-ago_ metric, and there's no reason to 
> believe this bug hasn't been flying under the radar since the Consumer's 
> inception.





[jira] [Commented] (KAFKA-10793) Race condition in FindCoordinatorFuture permanently severs connection to group coordinator

2020-12-01 Thread A. Sophie Blee-Goldman (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-10793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241987#comment-17241987 ]

A. Sophie Blee-Goldman commented on KAFKA-10793:


So that's our best guess. We should discuss the solution on the PR, but I'll 
lay out the two possibilities I see here in case anyone has any better ideas.

1) synchronize joinGroupIfNeeded()
2) clear the _findCoordinatorFuture_ when handling the result, rather than in 
the listener callbacks. For example in the main loop of ensureCoordinatorReady()

Personally I think option 2 is better, since it makes a lot more sense to me to 
begin with. By clearing the future in the listener callbacks, we might clear it 
before we ever even get to check on the result, e.g. the exception if it failed. We 
actually seem to already anticipate this particular problem and recently 
implemented a workaround by adding an extra listener which saves the exception 
to a class _findCoordinatorException_ field. If we just wait to clear the 
future then presumably we could remove this workaround as well, and just save 
the exception when we check "if (future.failed())" inside of 
lookupCoordinator().
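
For illustration, here's a minimal, self-contained sketch of what option 2 boils down to, with CompletableFuture standing in for Kafka's RequestFuture (the class and helper names below are made up for the example; this is not the actual AbstractCoordinator code):
{code:java}
import java.util.concurrent.CompletableFuture;

// Toy model of option 2: clear the cached future at the point where its result
// is inspected, not in an onSuccess/onFailure listener that may fire "too early".
public class CoordinatorLookupSketch {
    private CompletableFuture<Void> findCoordinatorFuture; // guarded by 'this'

    synchronized CompletableFuture<Void> lookupCoordinator() {
        if (findCoordinatorFuture == null)
            findCoordinatorFuture = sendFindCoordinatorRequest();
        return findCoordinatorFuture;
    }

    synchronized void ensureCoordinatorReady() {
        CompletableFuture<Void> future = lookupCoordinator();
        // ... poll the network client until the future completes ...
        if (future.isDone()) {
            // Clearing here means a completion that raced with the assignment in
            // lookupCoordinator() can never leave a stale non-null reference behind.
            findCoordinatorFuture = null;
            if (future.isCompletedExceptionally()) {
                // inspect the exception, mark the coordinator unknown, retry, etc.
            }
        }
    }

    private CompletableFuture<Void> sendFindCoordinatorRequest() {
        return new CompletableFuture<>(); // stand-in for the real async request
    }
}
{code}
With that shape, the extra _findCoordinatorException_ listener mentioned above also becomes unnecessary, since the failure is observed at the same point the future is cleared.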

All that said, I'm not intimately familiar with the technical details of the 
ConsumerNetworkClient and how it handles its RequestFutures, so it's possible 
I'm missing something important.



[jira] [Commented] (KAFKA-10793) Race condition in FindCoordinatorFuture permanently severs connection to group coordinator

2020-12-01 Thread A. Sophie Blee-Goldman (Jira)


[ https://issues.apache.org/jira/browse/KAFKA-10793?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17241976#comment-17241976 ]

A. Sophie Blee-Goldman commented on KAFKA-10793:


At this point we can only guess, but all signs point to a race condition 
between the main consumer thread and the heartbeat thread. One possibility is 
that when the future failed it just didn't trigger the `onFailure` callback, 
but [~guozhang] & I have both looked through the source code and don't see any 
way for this to occur. Another possibility is that the `onFailure` callback was 
triggered, but it was invoked _too soon_. If the future was completed before we 
ever assigned it to the _findCoordinatorFuture_ field, then we would never 
actually clear the latest future (we would just set an already-null field to 
null again).

Is this possible? Here's how the AbstractCoordinator builds the request and 
assigns the future:
{code:java}
protected synchronized RequestFuture<Void> lookupCoordinator() {
    ...
    findCoordinatorFuture = sendFindCoordinatorRequest(node);
}
{code}
 
{code:java}
private RequestFuture<Void> sendFindCoordinatorRequest(Node node) {
    ...
    return client.send(node, requestBuilder).compose(new FindCoordinatorResponseHandler());
}
{code}
Inside #compose we call #addListener, which contains this snippet:

 
{code:java}
if (failed())
    fireFailure();
{code}
If the request has already failed by the time we reach this, then we'll trigger 
the `onFailure` callback before #compose ever returns – ie before we've 
assigned the future to _findCoordinatorFuture_.
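
To convince ourselves that ordering is even possible, here's a self-contained toy (CompletableFuture again standing in for RequestFuture; the class is made up) showing that a callback registered on an already-failed future runs synchronously, before the caller has assigned the returned future to the field the callback is trying to clear:
{code:java}
import java.util.concurrent.CompletableFuture;

public class EarlyCallbackSketch {
    private static CompletableFuture<Void> findCoordinatorFuture; // starts out null

    public static void main(String[] args) {
        CompletableFuture<Void> request = new CompletableFuture<>();
        request.completeExceptionally(new RuntimeException("disconnect")); // already failed

        // Analogous to compose(new FindCoordinatorResponseHandler()): because the
        // future is already done, the callback runs right here, on this thread,
        // and "clears" a field that is still null...
        CompletableFuture<Void> composed =
                request.whenComplete((v, e) -> findCoordinatorFuture = null);

        // ...and only now is the field assigned, leaving a stale failed future
        // that no later callback will ever clear.
        findCoordinatorFuture = composed;
        System.out.println("stale future left behind: " + (findCoordinatorFuture != null));
    }
}
{code}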

The obvious question now is whether it's possible for the request to be failed 
in another thread while one thread is in the middle of the synchronized 
lookupCoordinator(). The request can be failed by the ConsumerNetworkClient 
when polled, during checkDisconnects(). The heartbeat thread actually 
synchronizes the entire run loop, so it doesn't seem possible for the hb thread 
to fail this request in the background of the main thread during a 
lookupCoordinator().

But the inverse is not true: it's possible for the main consumer thread to fail 
the request while the hb thread is inside of lookupCoordinator(). The 
AbstractCoordinator will poll the network client inside of joinGroupIfNeeded(), 
which is not itself synchronized and may be invoked without any locking through 
a Consumer#poll.
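
If that's what happened here, the interleaving would look roughly like this (a guessed timeline based on the reasoning above, not a confirmed trace):
{code:java}
// heartbeat thread (inside synchronized run loop)   main consumer thread (no coordinator lock)
// ------------------------------------------------  ------------------------------------------
// lookupCoordinator()
//   sendFindCoordinatorRequest(node)
//     client.send(node, requestBuilder)
//                                                   Consumer#poll -> joinGroupIfNeeded()
//                                                     client.poll() -> checkDisconnects()
//                                                       fails the in-flight FindCoordinator request
//     .compose(new FindCoordinatorResponseHandler())
//       addListener(): failed() == true
//         fireFailure() -> onFailure "clears" the
//           still-null findCoordinatorFuture
//   findCoordinatorFuture = <already-failed future>  // stale, never cleared again
{code}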
