Re: Flink checkpoint recovery time

2020-08-21 Thread Zhinan Cheng
Hi Till,

Thanks for the reply.
Is the 10s timeout here always necessary?
Can I reduce this value to reduce the restart time of the job?
I currently cannot find this timeout among the Flink configuration options.
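The closest setting I can see is the Akka ask timeout in flink-conf.yaml,
though that is only a guess on my side and I am not sure it is the one used
for this cancel RPC:

    # assumption: the cancel RPC uses the default ask timeout
    akka.ask.timeout: 10 s
    # e.g. lowering it like this might shorten the wait, at the risk of
    # spurious RPC timeouts on a loaded cluster
    # akka.ask.timeout: 2 s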

Regards,
Zhinan


On Fri, 21 Aug 2020 at 15:28, Till Rohrmann  wrote:

> You are right. The problem is that Flink tries three times to cancel the
> call and every RPC call has a timeout of 10s. Since the machine on which
> the Task ran has died, it will take that long until the system decides to
> fail the Task instead [1].
>
> [1]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java#L1390
>
> Cheers,
> Till
>
> On Thu, Aug 20, 2020 at 6:17 PM Zhinan Cheng 
> wrote:
>
>> Hi Till,
>>
>> Thanks for the quick reply.
>>
>> Yes, the job actually restarts twice; the metric fullRestarts also
>> indicates this, as its value is 2.
>> But the job indeed takes around 30s to switch from CANCELLING to
>> RESTARTING during its first restart.
>> I just wonder why it takes so long here.
>>
>> Also, even if I set the heartbeat timeout from the default 50s to 5s, this time
>> stays about the same, so I think it is unrelated to the heartbeat timeout.
>>
>> Regards,
>> Zhinan
>>
>> On Fri, 21 Aug 2020 at 00:02, Till Rohrmann  wrote:
>>
>>> Hi Zhinan,
>>>
>>> the logs show that the cancellation does not take 30s. What happens is
>>> that the job gets restarted a couple of times. The problem seems to be that
>>> one TaskManager died permanently but it takes the heartbeat timeout
>>> (default 50s) until it is detected as dead. In the meantime the system
>>> tries to redeploy tasks which will cause the job to fail again and again.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Aug 20, 2020 at 4:41 PM Zhinan Cheng 
>>> wrote:
>>>
>>>> Hi Till,
>>>>
>>>> Sorry for the late reply.
>>>> Attached is the jobmanager log.
>>>> I notice that while canceling the job, the jobmanager also warns that
>>>> the connections to the failed taskmanager are lost.
>>>> This lasts for about 30s, and then the jobmanager
>>>> successfully cancels the operator instances related to the
>>>> failed taskmanager and restarts the job.
>>>> Is there any way to reduce the restart time?
>>>>
>>>> Thanks a lot.
>>>>
>>>> Regards,
>>>> Zhinan
>>>>
>>>> On Wed, 19 Aug 2020 at 16:37, Till Rohrmann 
>>>> wrote:
>>>>
>>>>> Could you share the logs with us? This might help to explain why the
>>>>> cancellation takes so long. Flink is no longer using Akka's death watch
>>>>> mechanism.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Wed, Aug 19, 2020 at 10:01 AM Zhinan Cheng 
>>>>> wrote:
>>>>>
>>>>>> Hi Till,
>>>>>>
>>>>>> Thanks for the quick response.
>>>>>>
>>>>>> > for i) the cancellation depends on the user code. If the user code
>>>>>> does a blocking operation, Flink needs to wait until it returns from 
>>>>>> there
>>>>>> before it can move the Task's state to CANCELED.
>>>>>> For this, my code just includes a map operation and then aggregates
>>>>>> the results in a tumbling window. So I think in this case the time is
>>>>>> not spent in the user code.
>>>>>> I looked into the log and, during that period, I observed that the
>>>>>> jobmanager keeps warning about its lost connection to the failed
>>>>>> taskmanager.
>>>>>> I am not sure if this is what delays the canceling. Do you
>>>>>> have any ideas about this?
>>>>>>
>>>>>> I am also looking at the death watch mechanism [1] of Akka to see if
>>>>>> this is the reason.
>>>>>>
>>>>>> For (ii), I will open a JIRA issue as you mentioned.
>>>>>>
>>>>>> Thanks.
>>>>>>
>>>>>>
>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/Akka+and+Actors
>>>>>>
>>>>>> Regards.
>>>>>> Zhinan
>>>>>>

Re: Decompose failure recovery time

2020-08-20 Thread Zhinan Cheng
Hi Piotr,

Thanks a lot.
I will try your suggestions and see what happens.
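For reference, this is roughly what I plan to try first in flink-conf.yaml
(only a sketch; if I read the docs correctly, the value is in milliseconds):

    # lower the task cancellation timeout from the default 180000 ms (180 s);
    # after this timeout the whole TaskManager process is killed
    task.cancellation.timeout: 30000

I will also collect stack traces during cancellation to see what the task
threads are busy with.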

Regards,
Zhinan

On Fri, 21 Aug 2020 at 00:40, Piotr Nowojski  wrote:
>
> Hi Zhinan,
>
> It's hard to say, but my guess it takes that long for the tasks to respond to 
> cancellation which consists of a couple of steps. If a task is currently busy 
> processing something, it has to respond to interruption 
> (`java.lang.Thread#interrupt`). If it takes 30 seconds for a task to react to 
> the interruption and clean up its resources, that can cause problems and 
> there is very little that Flink can do.
>
> If you want to debug it further, I would suggest collecting stack traces 
> during cancellation (or even better: profile the code during cancellation). 
> This would help you answer the question, what are the task threads busy with.
>
> Probably not a solution, but I'm mentioning it just in case: you can shorten
> the `task.cancellation.timeout` period. By default it's 180s. After that, the
> whole TaskManager will be killed. If you have spare TaskManagers or you can
> restart them very quickly, lowering this timeout might help to some extent
> (in exchange for a dirty shutdown, without cleaning up the resources).
>
> Piotrek
>
> On Thu, 20 Aug 2020 at 18:00, Zhinan Cheng  wrote:
>>
>> Hi Piotr,
>>
>> Thanks a lot for your help.
>> Yes, I finally realize that I can only approximate the time for [1]
>> and [3], and measure [2] by monitoring the uptime and downtime metrics
>> provided by Flink.
>>
>> And now my problem is that I found the time in [2] can be up to 40s, and I
>> wonder why it takes so long to restart the job.
>> The log actually shows that the time to switch all operator instances
>> from CANCELING to CANCELED is around 30s. Do you have any ideas about
>> this?
>>
>> Many thanks.
>>
>> Regards,
>> Zhinan
>>
>> On Thu, 20 Aug 2020 at 21:26, Piotr Nowojski  wrote:
>> >
>> > Hi,
>> >
>> > > I want to decompose the recovery time into different parts, say
>> > > (1) the time to detect the failure,
>> > > (2) the time to restart the job,
>> > > (3) and the time to restore the checkpointing.
>> >
>> > 1. Maybe I'm missing something, but as far as I can tell, Flink cannot
>> > help you with that. The time to detect the failure would be the time between
>> > when the failure occurred and when the JobManager realises it has happened.
>> > If we could reliably measure/check when the first one happened, then we
>> > could immediately trigger failover. You are interested in this exactly
>> > because there is no reliable way to detect the failure immediately. You
>> > could approximate this via analysing the logs.
>> >
>> > 2. Maybe there are some metrics that you could use; if not, you can use
>> > the REST API [1] to monitor the job status. Again, you could also do it
>> > via analysing the logs.
>> >
>> > 3. In the future this might be measurable using the REST API (similar to
>> > point 2.), but currently there is no way to do it that way. There is a
>> > ticket for that [2]. I think currently the only way to do it is via
>> > analysing the logs.
>> >
>> > If you just need to do this once, I would analyse the logs manually. If you
>> > want to do it many times or monitor this continuously, I would write some
>> > simple script (python?) to combine the REST API calls for 2. with log
>> > analysis.
>> >
>> > Piotrek
>> >
>> >
>> > [1]
>> > https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs
>> > [2] https://issues.apache.org/jira/browse/FLINK-17012
>> > On Tue, 18 Aug 2020 at 04:07, Zhinan Cheng  wrote:
>> >
>> > > Hi all,
>> > >
>> > > I am working on measuring the failure recovery time of Flink and I
>> > > want to decompose the recovery time into different parts, say the time
>> > > to detect the failure, the time to restart the job, and the time to
>> > > restore the checkpointing.
>> > >
>> > > Unfortunately, I cannot find any information about this in the Flink docs.
>> > > Is there any way that Flink provides for this? Otherwise, how can I
>> > > solve it?
>> > >
>> > > Thanks a lot for your help.
>> > >
>> > > Regards,
>> > > Juno
>> > >
>>

Re: Flink checkpoint recovery time

2020-08-20 Thread Zhinan Cheng
Hi Till,

Thanks for the quick reply.

Yes, the job actually restarts twice; the metric fullRestarts also
indicates this, as its value is 2.
But the job indeed takes around 30s to switch from CANCELLING to RESTARTING
during its first restart.
I just wonder why it takes so long here.

Also, even if I set the heartbeat timeout from the default 50s to 5s, this time
stays about the same, so I think it is unrelated to the heartbeat timeout.
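For completeness, this is how I changed it in flink-conf.yaml (values are in
milliseconds; I also lowered the heartbeat interval so that it stays below the
timeout). This is just my test setup, not a recommendation:

    heartbeat.interval: 1000   # default 10000
    heartbeat.timeout: 5000    # default 50000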

Regards,
Zhinan

On Fri, 21 Aug 2020 at 00:02, Till Rohrmann  wrote:

> Hi Zhinan,
>
> the logs show that the cancellation does not take 30s. What happens is
> that the job gets restarted a couple of times. The problem seems to be that
> one TaskManager died permanently but it takes the heartbeat timeout
> (default 50s) until it is detected as dead. In the meantime the system
> tries to redeploy tasks which will cause the job to fail again and again.
>
> Cheers,
> Till
>
> On Thu, Aug 20, 2020 at 4:41 PM Zhinan Cheng 
> wrote:
>
>> Hi Till,
>>
>> Sorry for the late reply.
>> Attached is the jobmanager log.
>> I notice that while canceling the job, the jobmanager also warns that
>> the connections to the failed taskmanager are lost.
>> This lasts for about 30s, and then the jobmanager
>> successfully cancels the operator instances related to the
>> failed taskmanager and restarts the job.
>> Is there any way to reduce the restart time?
>>
>> Thanks a lot.
>>
>> Regards,
>> Zhinan
>>
>> On Wed, 19 Aug 2020 at 16:37, Till Rohrmann  wrote:
>>
>>> Could you share the logs with us? This might help to explain why the
>>> cancellation takes so long. Flink is no longer using Akka's death watch
>>> mechanism.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Aug 19, 2020 at 10:01 AM Zhinan Cheng 
>>> wrote:
>>>
>>>> Hi Till,
>>>>
>>>> Thanks for the quick response.
>>>>
>>>> > for i) the cancellation depends on the user code. If the user code
>>>> does a blocking operation, Flink needs to wait until it returns from there
>>>> before it can move the Task's state to CANCELED.
>>>> For this, my code just includes a map operation and then aggregates the
>>>> results in a tumbling window. So I think in this case the time is not
>>>> spent in the user code.
>>>> I looked into the log and, during that period, I observed that the
>>>> jobmanager keeps warning about its lost connection to the failed
>>>> taskmanager.
>>>> I am not sure if this is what delays the canceling. Do you
>>>> have any ideas about this?
>>>>
>>>> I am also looking at the death watch mechanism [1] of Akka to see if this
>>>> is the reason.
>>>>
>>>> For (ii), I will open a JIRA issue as you mentioned.
>>>>
>>>> Thanks.
>>>>
>>>>
>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/Akka+and+Actors
>>>>
>>>> Regards.
>>>> Zhinan
>>>>
>>>> On Wed, 19 Aug 2020 at 15:39, Till Rohrmann 
>>>> wrote:
>>>>
>>>>> Hi Zhinan,
>>>>>
>>>>> for i) the cancellation depends on the user code. If the user code
>>>>> does a blocking operation, Flink needs to wait until it returns from there
>>>>> before it can move the Task's state to CANCELED.
>>>>>
>>>>> for ii) I think your observation is correct. Could you please open a
>>>>> JIRA issue for this problem so that it can be fixed in Flink? Thanks a 
>>>>> lot!
>>>>>
>>>>> For the time to restore the checkpoints it could also be interesting
>>>>> to add a proper metric to Flink. Hence, you could also create a JIRA issue
>>>>> for it.
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Wed, Aug 19, 2020 at 8:43 AM Zhinan Cheng 
>>>>> wrote:
>>>>>
>>>>>> Hi Yun,
>>>>>>
>>>>>> Thanks a lot for your help. Seems hard to measure the checkpointing
>>>>>> restore time currently.
>>>>>> I do monitor the "fullRestarts" metric and others like "uptime" and
>>>>>> "downtime" to observe some information about failure recovery.
>>>>>>
>>>>>> Still some confusions:
>>>>>> i) I found the 

Re: Decompose failure recovery time

2020-08-20 Thread Zhinan Cheng
Hi Piotr,

Thanks a lot for your help.
Yes, I finally realize that I can only approximate the time for [1]
and [3], and measure [2] by monitoring the uptime and downtime metrics
provided by Flink.
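As a cross-check for [2], I also plan to poll the job status transitions over
the REST API, roughly like what Piotr suggested below (just a sketch on my
side, assuming the JobManager REST endpoint is reachable at localhost:8081):

    # poll /jobs and log every job status transition with a local timestamp
    import json
    import time
    import urllib.request

    REST = "http://localhost:8081"

    def job_statuses():
        # /jobs returns an overview of all jobs and their current status
        with urllib.request.urlopen(REST + "/jobs") as resp:
            return {j["id"]: j["status"] for j in json.load(resp)["jobs"]}

    last = {}
    while True:
        statuses = job_statuses()
        for job_id, status in statuses.items():
            if last.get(job_id) != status:
                print("%.3f %s -> %s" % (time.time(), job_id, status))
        last = statuses
        time.sleep(0.1)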

And now my problem is that I found the time in [2] can be up to 40s, and I
wonder why it takes so long to restart the job.
The log actually shows that the time to switch all operator instances
from CANCELING to CANCELED is around 30s. Do you have any ideas about
this?

Many thanks.

Regards,
Zhinan

On Thu, 20 Aug 2020 at 21:26, Piotr Nowojski  wrote:
>
> Hi,
>
> > I want to decompose the recovery time into different parts, say
> > (1) the time to detect the failure,
> > (2) the time to restart the job,
> > (3) and the time to restore the checkpointing.
>
> 1. Maybe I'm missing something, but as far as I can tell, Flink cannot
> help you with that. The time to detect the failure would be the time between
> when the failure occurred and when the JobManager realises it has happened.
> If we could reliably measure/check when the first one happened, then we
> could immediately trigger failover. You are interested in this exactly
> because there is no reliable way to detect the failure immediately. You
> could approximate this via analysing the logs.
>
> 2. Maybe there are some metrics that you could use; if not, you can use
> the REST API [1] to monitor the job status. Again, you could also do it
> via analysing the logs.
>
> 3. In the future this might be measurable using the REST API (similar to
> point 2.), but currently there is no way to do it that way. There is a
> ticket for that [2]. I think currently the only way to do it is via
> analysing the logs.
>
> If you just need to do this once, I would analyse the logs manually. If you
> want to do it many times or monitor this continuously, I would write some
> simple script (python?) to combine the REST API calls for 2. with log
> analysis.
>
> Piotrek
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/rest_api.html#jobs
> [2] https://issues.apache.org/jira/browse/FLINK-17012
> On Tue, 18 Aug 2020 at 04:07, Zhinan Cheng  wrote:
>
> > Hi all,
> >
> > I am working on measuring the failure recovery time of Flink and I
> > want to decompose the recovery time into different parts, say the time
> > to detect the failure, the time to restart the job, and the time to
> > restore the checkpointing.
> >
> > Unfortunately, I cannot find any information about this in the Flink docs.
> > Is there any way that Flink provides for this? Otherwise, how can I
> > solve it?
> >
> > Thanks a lot for your help.
> >
> > Regards,
> > Juno
> >


Re: Flink checkpoint recovery time

2020-08-19 Thread Zhinan Cheng
Hi Till,

Thanks for the quick response.

> for i) the cancellation depends on the user code. If the user code does a
blocking operation, Flink needs to wait until it returns from there before
it can move the Task's state to CANCELED.
For this, my code just includes a map operation and then aggregates the
results in a tumbling window. So I think in this case the time is not
spent in the user code.
I looked into the log and, during that period, I observed that the jobmanager
keeps warning about its lost connection to the failed taskmanager.
I am not sure if this is what delays the canceling. Do you have
any ideas about this?

I am also looking at the death watch mechanism [1] of Akka to see if this is
the reason.

For (ii), I will open a JIRA issue as you mentioned.

Thanks.


[1] https://cwiki.apache.org/confluence/display/FLINK/Akka+and+Actors

Regards.
Zhinan

On Wed, 19 Aug 2020 at 15:39, Till Rohrmann  wrote:

> Hi Zhinan,
>
> for i) the cancellation depends on the user code. If the user code does a
> blocking operation, Flink needs to wait until it returns from there before
> it can move the Task's state to CANCELED.
>
> for ii) I think your observation is correct. Could you please open a JIRA
> issue for this problem so that it can be fixed in Flink? Thanks a lot!
>
> For the time to restore the checkpoints it could also be interesting to
> add a proper metric to Flink. Hence, you could also create a JIRA issue for
> it.
>
> Cheers,
> Till
>
> On Wed, Aug 19, 2020 at 8:43 AM Zhinan Cheng 
> wrote:
>
>> Hi Yun,
>>
>> Thanks a lot for your help. Seems hard to measure the checkpointing
>> restore time currently.
>> I do monitor the "fullRestarts" metric and others like "uptime" and
>> "downtime" to observe some information about failure recovery.
>>
>> I still have some points of confusion:
>> i) I found that the jobmanager takes up to 30s to move the job from status
>> CANCELING to status CANCELED.
>> Is there any reason why it takes so long? Can I reduce this time?
>> ii) Currently the way the "downtime" metric is calculated is not consistent with
>> the description in the doc: right now the downtime is actually the current
>> timestamp minus the timestamp when the job started.
>> But I think the doc clearly intends to measure the current
>> timestamp minus the timestamp when the job failed.
>>
>> I think I need to measure these times by adding dedicated metrics myself.
>>
>> Regards,
>> Zhinan
>>
>>
>>
>>
>> On Wed, 19 Aug 2020 at 01:45, Yun Tang  wrote:
>>
>>> Hi Zhinan,
>>>
>>> For the time to detect the failure, you could refer to the time when
>>> 'fullRestarts' increases. That could give you information about the time of
>>> the job failure.
>>>
>>> For the checkpoint recovery time, there actually exist two parts:
>>>
>>>    1. The time to read the checkpoint meta in the JM. However, this duration
>>>    has no explicit metric currently, as that part would be nearly just
>>>    reading a ~1 MB file remotely from DFS.
>>>    2. The time for the tasks to restore their state. This should be treated as
>>>    the real time for checkpoint recovery and could even be 10 minutes+ when
>>>    restoring a savepoint. Unfortunately, this part of the time is also not
>>>    recorded in metrics now.
>>>    If you find that a task is in the RUNNING state but not consuming any
>>>    records, it might be stuck restoring the checkpoint/savepoint.
>>>
>>>
>>> Best
>>> Yun Tang
>>> --
>>> *From:* Zhinan Cheng 
>>> *Sent:* Tuesday, August 18, 2020 11:50
>>> *To:* user@flink.apache.org 
>>> *Subject:* Flink checkpoint recovery time
>>>
>>> Hi all,
>>>
>>> I am working on measuring the failure recovery time of Flink and I want
>>> to decompose the recovery time into different parts, say the time to detect
>>> the failure, the time to restart the job, and the time to
>>> restore the checkpointing.
>>>
>>> I found that I can measure the downtime during failure, the time to
>>> restart the job, and some metrics for checkpointing, as shown below.
>>>
>>> [image: measure.png]
>>> Unfortunately, I cannot find any information about the failure detection
>>> time and the checkpoint recovery time. Is there any way that Flink provides
>>> for this? Otherwise, how can I solve it?
>>>
>>> Thanks a lot for your help.
>>>
>>> Regards,
>>>
>>


Re: Flink checkpoint recovery time

2020-08-19 Thread Zhinan Cheng
Hi Yun,

Thanks a lot for your help. Seems hard to measure the checkpointing restore
time currently.
I do monitor the "fullRestarts" metric and others like "uptime" and
"downtime" to observe some information about failure recovery.

I still have some points of confusion:
i) I found that the jobmanager takes up to 30s to move the job from status
CANCELING to status CANCELED.
Is there any reason why it takes so long? Can I reduce this time?
ii) Currently the way the "downtime" metric is calculated is not consistent with
the description in the doc: right now the downtime is actually the current
timestamp minus the timestamp when the job started.
But I think the doc clearly intends to measure the current
timestamp minus the timestamp when the job failed.

I think I need to measure these times by adding dedicated metrics myself.
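As a first step, I will probably just sample the existing job metrics from the
outside, roughly like this (only a sketch, assuming the REST endpoint is at
localhost:8081 and JOB_ID is replaced with the id shown in the web UI):

    # periodically sample downtime/uptime/fullRestarts for one job so that the
    # recovery phases can be reconstructed offline from the timestamps
    import json
    import time
    import urllib.request

    JOB_ID = "00000000000000000000000000000000"  # placeholder, use the real job id
    URL = ("http://localhost:8081/jobs/%s/metrics"
           "?get=downtime,uptime,fullRestarts" % JOB_ID)

    while True:
        with urllib.request.urlopen(URL) as resp:
            metrics = {m["id"]: m["value"] for m in json.load(resp)}
        print(time.time(), metrics)
        time.sleep(0.5)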

Regards,
Zhinan




On Wed, 19 Aug 2020 at 01:45, Yun Tang  wrote:

> Hi Zhinan,
>
> For the time to detect the failure, you could refer to the time when
> 'fullRestarts' increases. That could give you information about the time of
> the job failure.
>
> For the checkpoint recovery time, there actually exist two parts:
>
>    1. The time to read the checkpoint meta in the JM. However, this duration
>    has no explicit metric currently, as that part would be nearly just
>    reading a ~1 MB file remotely from DFS.
>    2. The time for the tasks to restore their state. This should be treated as
>    the real time for checkpoint recovery and could even be 10 minutes+ when
>    restoring a savepoint. Unfortunately, this part of the time is also not
>    recorded in metrics now.
>    If you find that a task is in the RUNNING state but not consuming any
>    records, it might be stuck restoring the checkpoint/savepoint.
>
>
> Best
> Yun Tang
> --
> *From:* Zhinan Cheng 
> *Sent:* Tuesday, August 18, 2020 11:50
> *To:* user@flink.apache.org 
> *Subject:* Flink checkpoint recovery time
>
> Hi all,
>
> I am working on measuring the failure recovery time of Flink and I want to
> decompose the recovery time into different parts, say the time to detect
> the failure, the time to restart the job, and the time to
> restore the checkpointing.
>
> I found that I can measure the downtime during failure, the time to
> restart the job, and some metrics for checkpointing, as shown below.
>
> [image: measure.png]
> Unfortunately, I cannot find any information about the failure detection time
> and the checkpoint recovery time. Is there any way that Flink provides for
> this? Otherwise, how can I solve it?
>
> Thanks a lot for your help.
>
> Regards,
>


Flink checkpoint recovery time

2020-08-17 Thread Zhinan Cheng
Hi all,

I am working on measuring the failure recovery time of Flink, and I want to
decompose the recovery time into different parts, say the time to detect
the failure, the time to restart the job, and the time to
restore from the checkpoint.

I found that I can measure the downtime during failure, the time to
restart the job, and some metrics for checkpointing, as shown below.

[image: measure.png]
Unfortunately, I cannot find any information about the failure detection time
and the checkpoint recovery time. Is there any way that Flink provides for
this? Otherwise, how can I solve it?

Thanks a lot for your help.

Regards,