Another side question: shall we add a metric that covers the complete
restarting time (phase 1 + phase 2)? The current metric jm.restartingTime
only covers phase 1. Thanks!

Best
Lu

On Thu, Jul 1, 2021 at 12:09 PM Lu Niu <qqib...@gmail.com> wrote:

> Thanks Till and Yang for the help! Also, thanks Till for the quick fix!
>
> I did another test yesterday. In this test, I intentionally throw an
> exception from the source operator:
> ```
> if (runtimeContext.getIndexOfThisSubtask() == 1
>         && errorFrenquecyInMin > 0
>         && System.currentTimeMillis() - lastStartTime
>                 >= errorFrenquecyInMin * 60 * 1000) {
>     lastStartTime = System.currentTimeMillis();
>     throw new RuntimeException(
>             "Trigger expected exception at: " + lastStartTime);
> }
> ```
> In this case, I found that phase 1 still takes about 30s, while phase 2
> dropped to 1s (because no container allocation is needed). Why does phase 1
> still take 30s even though no TM is lost?
>
> Related logs:
> ```
> 2021-06-30 00:55:07,463 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source:
> USER_MATERIALIZED_EVENT_SIGNAL-user_context-frontend_event_source -> ...
> java.lang.RuntimeException: Trigger expected exception at: 1625014507446
> 2021-06-30 00:55:07,509 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
> NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging
> (35c95ee7141334845cfd49b642fa9f98) switched from state RUNNING to
> RESTARTING.
> 2021-06-30 00:55:37,596 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
> NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging
> (35c95ee7141334845cfd49b642fa9f98) switched from state RESTARTING to
> RUNNING.
> 2021-06-30 00:55:38,678 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph        (time when
> all tasks switch from CREATED to RUNNING)
> ```
> Best
> Lu
>
>
> On Thu, Jul 1, 2021 at 12:06 PM Lu Niu <qqib...@gmail.com> wrote:
>
>> Thanks Till and Yang for the help! Also, thanks Till for the quick fix!
>>
>> I did another test yesterday. In this test, I intentionally throw an
>> exception from the source operator:
>> ```
>> if (runtimeContext.getIndexOfThisSubtask() == 1
>>         && errorFrenquecyInMin > 0
>>         && System.currentTimeMillis() - lastStartTime
>>                 >= errorFrenquecyInMin * 60 * 1000) {
>>     lastStartTime = System.currentTimeMillis();
>>     throw new RuntimeException(
>>             "Trigger expected exception at: " + lastStartTime);
>> }
>> ```
>> In this case, I found that phase 1 still takes about 30s, while phase 2
>> dropped to 1s (because no container allocation is needed).
>>
>>
>> On Thu, Jul 1, 2021 at 6:28 AM Till Rohrmann <trohrm...@apache.org>
>> wrote:
>>
>>> A quick addition: I think with FLINK-23202 it should now also be
>>> possible to improve the heartbeat mechanism in the general case. We can
>>> leverage the unreachability exception thrown if a remote target is no
>>> longer reachable to mark a heartbeat target as no longer reachable [1].
>>> This can then be treated as if the heartbeat timeout had been triggered.
>>> That way we should detect lost TaskExecutors as fast as our heartbeat
>>> interval.
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-23209
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Jul 1, 2021 at 1:46 PM Yang Wang <danrtsey...@gmail.com> wrote:
>>>
>>>> Since you are deploying Flink workloads on Yarn, the Flink
>>>> ResourceManager should get the container completion event after the
>>>> heartbeat of Yarn NM->Yarn RM->Flink RM, which is 8 seconds by default.
>>>> The Flink ResourceManager will release the dead TaskManager container
>>>> once it has received the completion event.
>>>> As a result, Flink will not deploy tasks onto the dead TaskManagers.
>>>>
>>>>
>>>> I think most of the time in phase 1 is probably spent cancelling the
>>>> tasks on the dead TaskManagers.
>>>>
>>>>
>>>> Best,
>>>> Yang
>>>>
>>>>
>>>> On Thu, Jul 1, 2021 at 4:49 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>>>
>>>>> Gen's analysis is correct. Flink currently uses its heartbeat as
>>>>> the primary means to detect dead TaskManagers. This means that Flink will
>>>>> take at least `heartbeat.timeout` time before the system recovers. Even if
>>>>> the cancellation happens fast (e.g. by having configured a low
>>>>> akka.ask.timeout), Flink will still try to deploy tasks onto the dead
>>>>> TaskManager until it is marked as dead and its slots are released (unless
>>>>> the ResourceManager gets a signal from the underlying resource
>>>>> management system that a container/pod has died). One way to improve the
>>>>> situation is to introduce logic which can react to a ConnectionException
>>>>> and then blacklists or releases a TaskManager, for example. This is
>>>>> currently not implemented in Flink, though.
>>>>>
>>>>> Concerning the cancellation operation: Flink currently does not listen
>>>>> to the dead letters of Akka. This means that the `akka.ask.timeout` is the
>>>>> primary means to fail the future result of an RPC which could not be sent.
>>>>> Listening to dead letters is an improvement we should add to Flink's
>>>>> RpcService. I've created a JIRA issue for this problem [1].
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-23202
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Wed, Jun 30, 2021 at 6:33 PM Lu Niu <qqib...@gmail.com> wrote:
>>>>>
>>>>>> Thanks Gen! cc flink-dev to collect more inputs.
>>>>>>
>>>>>> Best
>>>>>> Lu
>>>>>>
>>>>>> On Wed, Jun 30, 2021 at 12:55 AM Gen Luo <luogen...@gmail.com> wrote:
>>>>>>
>>>>>>> I'm also wondering about this.
>>>>>>>
>>>>>>> In my opinion, it's because the JM cannot confirm whether the TM is
>>>>>>> lost or whether it's a temporary network problem that will recover
>>>>>>> soon. I can see in the log that Akka got a "Connection refused", but
>>>>>>> the JM still sends heartbeat requests to the lost TM until it reaches
>>>>>>> the heartbeat timeout. But I'm not sure if it's indeed designed like
>>>>>>> this.
>>>>>>>
>>>>>>> I would really appreciate it if anyone who knows more details could
>>>>>>> answer. Thanks.
>>>>>>>
>>>>>>
