Re: Task manager not able to rejoin job manager after network hiccup

2018-02-26 Thread Till Rohrmann
Actually, I remembered why we didn't enable it by default. The problem with
this feature is the following: in case of a JM failover, it could happen
that all TMs think they got quarantined because the JM ActorSystem is no
longer reachable. You could therefore see a lot of TM restarts in this
case, even though they are not really necessary. There is actually a PR
which removes the Akka-based heartbeats on the TM side [1], which would
mitigate the problem, but this PR never got merged.

Given that the user can activate this feature and that it should no longer
be present with Flink 1.5, I think we should keep it as it is. What do you
think?

[1] https://github.com/apache/flink/pull/2742

Cheers,
Till

On Mon, Feb 26, 2018 at 4:33 PM, Till Rohrmann wrote:

> Hi,
>
> it is correct that once a Flink component gets quarantined, e.g. lost
> ActorSystem message or heartbeat timeout, it will never be able to talk to
> the quarantined or quarantining system. The only solution is to restart the
> respective component. In order to do this automatically, we introduced the
> `taskmanager.exit-on-fatal-akka-error` configuration option. Since not
> all systems are able to restart components, e.g. the standalone mode, it is
> disabled by default.
>
> However, given that the component won't be reachable if it is kept alive,
> I guess you're right that we should turn this feature on by default. I'll
> apply the fix.
>
> With Flink 1.5 this should no longer be a problem, since we no longer rely
> on Akka's heartbeating. Instead we use our own heartbeats which won't
> quarantine an ActorSystem.
>
> Cheers,
> Till
>
> On Sat, Feb 24, 2018 at 9:34 PM, jelmer  wrote:
>
>> I don't think its entirely the same thing. It seems to be that by design once
>> a worker misses a heartbeat for whatever reason , be it a network hicup or
>> a long stop the world garbage collect etc etc, it gets quarantined and it
>> will not recover from that until it is restarted.
>>
>> Which is what the post by till in the thread you linked seems to indicate.
>>
>> I assumed that a system like flink would be able to recover from this and
>> that if it does not that its a bug
>>
>> Your problem seems to be that for some reason flink misses the heartbeats
>> under heavy load
>>
>> I just simulated missing a heartbeat by blocking traffic to the job
>> manager
>>
>>
>>
>>
>> On 24 February 2018 at 15:57, ashish pok  wrote:
>>
>>> We see the same in 1.4. I dont think we could see this in 1.3. I had
>>> started a thread a while back on this. Till asked for more details. I
>>> havent had a chance to get back to him on this. If you can repro this
>>> easily perhaps you can get to it faster. I will find the thread and resend.
>>>
>>> Thanks,
>>>
>>> -- Ashish
>>>
>>> On Fri, Feb 23, 2018 at 9:56 AM, jelmer
>>>  wrote:
>>> We found out there's a taskmanager.exit-on-fatal-akka-error property
>>> that will restart flink in this situation but it is not enabled by default
>>> and that feels like a rather blunt tool. I expect systems like this to be
>>> more resilient to this
>>>
>>> On 23 February 2018 at 14:42, Aljoscha Krettek 
>>> wrote:
>>>
>>> @Till Is this the expected behaviour or do you suspect something could
>>> be going wrong?
>>>
>>>
>>> On 23. Feb 2018, at 08:59, jelmer  wrote:
>>>
>>> We've observed on our flink 1.4.0 setup that if for some reason the
>>> networking between the task manager and the job manager gets disrupted then
>>> the task manager is never able to reconnect.
>>>
>>> You'll end up with messages like this getting printed to the log
>>> repeatedly
>>>
>>> Trying to register at JobManager
>>> akka.tcp://flink@jobmanager:6123/user/jobmanager (attempt 17, timeout: 3
>>> milliseconds)
>>> Quarantined address [akka.tcp://flink@jobmanager:6123] is still
>>> unreachable or has not been restarted. Keeping it quarantined.
>>>
>>>
>>> Or alternatively
>>>
>>>
>>> Tried to associate with unreachable remote address
>>> [akka.tcp://flink@jobmanager:6123]. Address is now gated for 5000 ms, all
>>> messages to this address will be delivered to dead letters. Reason: [The
>>> remote system has quarantined this system. No further associations to the
>>> remote system are possible until this system is restarted.
>>>
>>>
>>> But it never recovers until you either restart the job manager or the
>>> task manager
>>>
>>> I was able to successfully reproduce this behaviour in two docker
>>> containers here :
>>>
>>> https://github.com/jelmerk/flink-worker-not-rejoining
>>>
>>> Has anyone else seen this problem ?


Re: Task manager not able to rejoin job manager after network hiccup

2018-02-26 Thread Till Rohrmann
Hi,

it is correct that once a Flink component gets quarantined, e.g. because of
a lost ActorSystem message or a heartbeat timeout, it will never be able to
talk to the quarantined or quarantining system again. The only solution is
to restart the respective component. In order to do this automatically, we
introduced the `taskmanager.exit-on-fatal-akka-error` configuration option.
Since not all setups are able to restart components, e.g. the standalone
mode, it is disabled by default.
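
To illustrate what "able to restart components" means for a setup that has no
resource manager doing it, here is a minimal Python sketch of an external
supervisor. It assumes the option is enabled with
`taskmanager.exit-on-fatal-akka-error: true` in flink-conf.yaml, and that a
fatal exit shows up as a non-zero exit code, which this thread does not
confirm. The `supervise` function and its parameters are hypothetical and not
part of Flink.

```python
import subprocess
import time


def supervise(command, max_restarts=3, delay_seconds=1.0, sleep=time.sleep):
    """Run `command` and restart it whenever it exits with a non-zero code.

    Assumption (not confirmed by the thread): a task manager that exits
    because of a fatal Akka error returns a non-zero exit code.
    Returns the list of exit codes observed, one entry per run.
    """
    exit_codes = []
    for attempt in range(max_restarts + 1):
        # Blocks until the child process terminates.
        result = subprocess.run(command)
        exit_codes.append(result.returncode)
        if result.returncode == 0:
            break  # clean shutdown, do not restart
        if attempt < max_restarts:
            sleep(delay_seconds)  # back off briefly before the next start
    return exit_codes


# Hypothetical usage: pass the command line that starts the task manager
# in the foreground, e.g. supervise(["/path/to/start-script", "args"]).
```

In practice this role would more likely be played by an init system or a
container orchestrator; the loop only shows the restart logic itself.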

However, given that the component won't be reachable if it is kept alive, I
guess you're right that we should turn this feature on by default. I'll
apply the fix.

With Flink 1.5 this should no longer be a problem, since we no longer rely
on Akka's heartbeating. Instead we use our own heartbeats which won't
quarantine an ActorSystem.

Cheers,
Till

On Sat, Feb 24, 2018 at 9:34 PM, jelmer wrote:

> I don't think its entirely the same thing. It seems to be that by design once
> a worker misses a heartbeat for whatever reason , be it a network hicup or
> a long stop the world garbage collect etc etc, it gets quarantined and it
> will not recover from that until it is restarted.
>
> Which is what the post by till in the thread you linked seems to indicate.
>
> I assumed that a system like flink would be able to recover from this and
> that if it does not that its a bug
>
> Your problem seems to be that for some reason flink misses the heartbeats
> under heavy load
>
> I just simulated missing a heartbeat by blocking traffic to the job manager
>
>
>
>
> On 24 February 2018 at 15:57, ashish pok  wrote:
>
>> We see the same in 1.4. I dont think we could see this in 1.3. I had
>> started a thread a while back on this. Till asked for more details. I
>> havent had a chance to get back to him on this. If you can repro this
>> easily perhaps you can get to it faster. I will find the thread and resend.
>>
>> Thanks,
>>
>> -- Ashish
>>
>> On Fri, Feb 23, 2018 at 9:56 AM, jelmer
>>  wrote:
>> We found out there's a taskmanager.exit-on-fatal-akka-error property
>> that will restart flink in this situation but it is not enabled by default
>> and that feels like a rather blunt tool. I expect systems like this to be
>> more resilient to this
>>
>> On 23 February 2018 at 14:42, Aljoscha Krettek 
>> wrote:
>>
>> @Till Is this the expected behaviour or do you suspect something could be
>> going wrong?
>>
>>
>> On 23. Feb 2018, at 08:59, jelmer  wrote:
>>
>> We've observed on our flink 1.4.0 setup that if for some reason the
>> networking between the task manager and the job manager gets disrupted then
>> the task manager is never able to reconnect.
>>
>> You'll end up with messages like this getting printed to the log
>> repeatedly
>>
>> Trying to register at JobManager
>> akka.tcp://flink@jobmanager:6123/user/jobmanager (attempt 17, timeout: 3
>> milliseconds)
>> Quarantined address [akka.tcp://flink@jobmanager:6123] is still unreachable
>> or has not been restarted. Keeping it quarantined.
>>
>>
>> Or alternatively
>>
>>
>> Tried to associate with unreachable remote address
>> [akka.tcp://flink@jobmanager:6123]. Address is now gated for 5000 ms, all
>> messages to this address will be delivered to dead letters. Reason: [The
>> remote system has quarantined this system. No further associations to the
>> remote system are possible until this system is restarted.
>>
>>
>> But it never recovers until you either restart the job manager or the
>> task manager
>>
>> I was able to successfully reproduce this behaviour in two docker
>> containers here :
>>
>> https://github.com/jelmerk/flink-worker-not-rejoining
>>
>> Has anyone else seen this problem ?


Re: Task manager not able to rejoin job manager after network hiccup

2018-02-24 Thread jelmer
I don't think it's entirely the same thing. It seems that, by design, once
a worker misses a heartbeat for whatever reason, be it a network hiccup or
a long stop-the-world garbage collection, it gets quarantined and will not
recover from that until it is restarted.

Which is what the post by Till in the thread you linked seems to indicate.

I assumed that a system like Flink would be able to recover from this, and
that if it does not, it's a bug.

Your problem seems to be that for some reason Flink misses the heartbeats
under heavy load.

I just simulated missing a heartbeat by blocking traffic to the job manager.




On 24 February 2018 at 15:57, ashish pok wrote:

> We see the same in 1.4. I dont think we could see this in 1.3. I had
> started a thread a while back on this. Till asked for more details. I
> havent had a chance to get back to him on this. If you can repro this
> easily perhaps you can get to it faster. I will find the thread and resend.
>
> Thanks,
>
> -- Ashish
>
> On Fri, Feb 23, 2018 at 9:56 AM, jelmer
>  wrote:
> We found out there's a taskmanager.exit-on-fatal-akka-error property that
> will restart flink in this situation but it is not enabled by default and
> that feels like a rather blunt tool. I expect systems like this to be more
> resilient to this
>
> On 23 February 2018 at 14:42, Aljoscha Krettek 
> wrote:
>
> @Till Is this the expected behaviour or do you suspect something could be
> going wrong?
>
>
> On 23. Feb 2018, at 08:59, jelmer  wrote:
>
> We've observed on our flink 1.4.0 setup that if for some reason the
> networking between the task manager and the job manager gets disrupted then
> the task manager is never able to reconnect.
>
> You'll end up with messages like this getting printed to the log repeatedly
>
> Trying to register at JobManager
> akka.tcp://flink@jobmanager:6123/user/jobmanager (attempt 17, timeout: 3
> milliseconds)
> Quarantined address [akka.tcp://flink@jobmanager:6123] is still unreachable
> or has not been restarted. Keeping it quarantined.
>
>
> Or alternatively
>
>
> Tried to associate with unreachable remote address
> [akka.tcp://flink@jobmanager:6123]. Address is now gated for 5000 ms, all
> messages to this address will be delivered to dead letters. Reason: [The
> remote system has quarantined this system. No further associations to the
> remote system are possible until this system is restarted.
>
>
> But it never recovers until you either restart the job manager or the task
> manager
>
> I was able to successfully reproduce this behaviour in two docker
> containers here :
>
> https://github.com/jelmerk/flink-worker-not-rejoining
>
> Has anyone else seen this problem ?


Re: Task manager not able to rejoin job manager after network hiccup

2018-02-24 Thread ashish pok
We see the same in 1.4. I don't think we could see this in 1.3. I had started a
thread a while back on this. Till asked for more details. I haven't had a chance
to get back to him on this. If you can repro this easily, perhaps you can get to
it faster. I will find the thread and resend.

Thanks,

-- Ashish 
 
On Fri, Feb 23, 2018 at 9:56 AM, jelmer wrote:

We found out there's a taskmanager.exit-on-fatal-akka-error property that will
restart flink in this situation but it is not enabled by default and that feels
like a rather blunt tool. I expect systems like this to be more resilient to this

On 23 February 2018 at 14:42, Aljoscha Krettek wrote:

@Till Is this the expected behaviour or do you suspect something could be going
wrong?

On 23. Feb 2018, at 08:59, jelmer wrote:

We've observed on our flink 1.4.0 setup that if for some reason the networking
between the task manager and the job manager gets disrupted then the task
manager is never able to reconnect.

You'll end up with messages like this getting printed to the log repeatedly

Trying to register at JobManager
akka.tcp://flink@jobmanager:6123/user/jobmanager (attempt 17, timeout: 3
milliseconds)
Quarantined address [akka.tcp://flink@jobmanager:6123] is still unreachable or
has not been restarted. Keeping it quarantined.

Or alternatively

Tried to associate with unreachable remote address
[akka.tcp://flink@jobmanager:6123]. Address is now gated for 5000 ms, all
messages to this address will be delivered to dead letters. Reason: [The remote
system has quarantined this system. No further associations to the remote
system are possible until this system is restarted.

But it never recovers until you either restart the job manager or the task
manager

I was able to successfully reproduce this behaviour in two docker containers
here :

https://github.com/jelmerk/flink-worker-not-rejoining

Has anyone else seen this problem ?


Re: Task manager not able to rejoin job manager after network hiccup

2018-02-23 Thread jelmer
We found out there's a taskmanager.exit-on-fatal-akka-error property that
will restart Flink in this situation, but it is not enabled by default and
it feels like a rather blunt tool. I expect systems like this to be more
resilient to this kind of failure.

On 23 February 2018 at 14:42, Aljoscha Krettek wrote:

> @Till Is this the expected behaviour or do you suspect something could be
> going wrong?
>
>
> On 23. Feb 2018, at 08:59, jelmer  wrote:
>
> We've observed on our flink 1.4.0 setup that if for some reason the
> networking between the task manager and the job manager gets disrupted then
> the task manager is never able to reconnect.
>
> You'll end up with messages like this getting printed to the log repeatedly
>
> Trying to register at JobManager 
> akka.tcp://flink@jobmanager:6123/user/jobmanager (attempt 17, timeout: 3 
> milliseconds)
> Quarantined address [akka.tcp://flink@jobmanager:6123] is still unreachable 
> or has not been restarted. Keeping it quarantined.
>
>
> Or alternatively
>
>
> Tried to associate with unreachable remote address 
> [akka.tcp://flink@jobmanager:6123]. Address is now gated for 5000 ms, all 
> messages to this address will be delivered to dead letters. Reason: [The 
> remote system has quarantined this system. No further associations to the 
> remote system are possible until this system is restarted.
>
>
> But it never recovers until you either restart the job manager or the task
> manager
>
> I was able to successfully reproduce this behaviour in two docker
> containers here :
>
> https://github.com/jelmerk/flink-worker-not-rejoining
>
> Has anyone else seen this problem ?


Re: Task manager not able to rejoin job manager after network hiccup

2018-02-23 Thread Aljoscha Krettek
@Till Is this the expected behaviour or do you suspect something could be going 
wrong?

> On 23. Feb 2018, at 08:59, jelmer  wrote:
> 
> We've observed on our flink 1.4.0 setup that if for some reason the 
> networking between the task manager and the job manager gets disrupted then 
> the task manager is never able to reconnect.
> 
> You'll end up with messages like this getting printed to the log repeatedly
> 
> Trying to register at JobManager 
> akka.tcp://flink@jobmanager:6123/user/jobmanager (attempt 17, timeout: 3 
> milliseconds)
> Quarantined address [akka.tcp://flink@jobmanager:6123] is still unreachable 
> or has not been restarted. Keeping it quarantined.
> 
> Or alternatively
> 
> 
> Tried to associate with unreachable remote address 
> [akka.tcp://flink@jobmanager:6123]. Address is now gated for 5000 ms, all 
> messages to this address will be delivered to dead letters. Reason: [The 
> remote system has quarantined this system. No further associations to the 
> remote system are possible until this system is restarted.
> 
> But it never recovers until you either restart the job manager or the task 
> manager
> 
> I was able to successfully reproduce this behaviour in two docker containers 
> here :
> 
> https://github.com/jelmerk/flink-worker-not-rejoining
> 
> Has anyone else seen this problem ?



Task manager not able to rejoin job manager after network hiccup

2018-02-23 Thread jelmer
We've observed on our Flink 1.4.0 setup that if, for some reason, the
networking between the task manager and the job manager gets disrupted, then
the task manager is never able to reconnect.

You'll end up with messages like this getting printed to the log repeatedly:

Trying to register at JobManager
akka.tcp://flink@jobmanager:6123/user/jobmanager (attempt 17, timeout:
3 milliseconds)
Quarantined address [akka.tcp://flink@jobmanager:6123] is still
unreachable or has not been restarted. Keeping it quarantined.


Or alternatively


Tried to associate with unreachable remote address
[akka.tcp://flink@jobmanager:6123]. Address is now gated for 5000 ms,
all messages to this address will be delivered to dead letters.
Reason: [The remote system has quarantined this system. No further
associations to the remote system are possible until this system is
restarted.
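
The two messages above can be recognised mechanically, for example to raise
an alert when a task manager has been quarantined. A minimal Python sketch,
assuming each log message arrives as a single line (the wrapping above comes
from the e-mail); the function name and the patterns are illustrative and not
part of Flink or Akka:

```python
import re

# Patterns taken from the two log messages quoted in this post.
QUARANTINE_PATTERNS = [
    re.compile(r"Quarantined address \[[^\]]+\] is still unreachable"),
    re.compile(r"The remote system has quarantined this system"),
]


def find_quarantine_lines(lines):
    """Return the log lines that indicate a quarantined ActorSystem."""
    return [
        line
        for line in lines
        if any(pattern.search(line) for pattern in QUARANTINE_PATTERNS)
    ]
```

The plain registration attempts ("Trying to register at JobManager ...") are
deliberately not matched, since on their own they do not imply a quarantine.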


But it never recovers until you restart either the job manager or the task
manager.

I was able to successfully reproduce this behaviour in two docker
containers here:

https://github.com/jelmerk/flink-worker-not-rejoining

Has anyone else seen this problem?