Could you share the full logs for the second experiment with us, Lu? I
cannot tell off the top of my head why it should take 30s unless you have
configured a restart delay of 30s.
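
To make the numbers concrete, here is a minimal sketch that measures the two phases from the timestamps in the log excerpt quoted below. The class and method names are hypothetical helpers for illustration only and are not part of Flink; the timestamps are copied from the quoted log lines.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of Flink: measures the gaps between the
// job state transitions in the JobManager log quoted further down.
final class RestartPhases {
    // Timestamp layout used by the quoted log lines.
    private static final DateTimeFormatter LOG_TIME =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    // Milliseconds elapsed between two log timestamps.
    static long millisBetween(String from, String to) {
        return Duration.between(
                LocalDateTime.parse(from, LOG_TIME),
                LocalDateTime.parse(to, LOG_TIME)).toMillis();
    }

    public static void main(String[] args) {
        String restarting = "2021-06-30 00:55:07,509";      // RUNNING -> RESTARTING
        String running = "2021-06-30 00:55:37,596";         // RESTARTING -> RUNNING
        String allTasksRunning = "2021-06-30 00:55:38,678"; // all tasks RUNNING

        System.out.println("phase 1 ms: " + millisBetween(restarting, running));
        System.out.println("phase 2 ms: " + millisBetween(running, allTasksRunning));
    }
}
```

The job sits in RESTARTING for just over 30s and the tasks come back about 1s later. A gap that close to a round 30s would be consistent with a configured restart delay, but the excerpt alone cannot confirm that, hence the request for the full logs.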

Let's discuss FLINK-23216 on the JIRA ticket, Gen.

I've now implemented FLINK-23209 [1], but it has the drawback that in a
flaky environment you might not want to mark a TaskExecutor dead on the
first connection loss. Maybe this is something we need to make configurable
(e.g. by introducing a threshold, which admittedly is similar to the
heartbeat timeout) so that users can configure it for their environment. On
the upside, if you mark the TaskExecutor dead on the first connection loss
(assuming you have a stable network environment), then Flink can now detect
lost TaskExecutors as fast as the heartbeat interval.
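
To illustrate what such a threshold could look like, here is a minimal sketch. It is not the code from the FLINK-23209 change; the class name, method names, and the idea of counting consecutive failed heartbeat RPCs are assumptions made purely for illustration.

```java
// Hypothetical sketch, not Flink's actual heartbeat code: counts consecutive
// failed heartbeat RPCs and reports the target as dead once a configurable
// threshold is reached.
final class UnreachabilityTracker {
    private final int failureThreshold;
    private int consecutiveFailures;

    UnreachabilityTracker(int failureThreshold) {
        if (failureThreshold < 1) {
            throw new IllegalArgumentException("failureThreshold must be >= 1");
        }
        this.failureThreshold = failureThreshold;
    }

    // Records a failed heartbeat RPC. Returns true once the number of
    // consecutive failures has reached the threshold.
    boolean reportFailure() {
        consecutiveFailures++;
        return consecutiveFailures >= failureThreshold;
    }

    // A successful heartbeat resets the counter, so isolated failures in a
    // flaky network never accumulate.
    void reportSuccess() {
        consecutiveFailures = 0;
    }

    public static void main(String[] args) {
        UnreachabilityTracker tracker = new UnreachabilityTracker(3);
        System.out.println(tracker.reportFailure()); // first failure
        tracker.reportSuccess();                     // counter is reset
        System.out.println(tracker.reportFailure()); // counts as first failure again
    }
}
```

With a threshold of 1 this matches the behaviour described above (dead on the first connection loss); a larger value trades detection speed for tolerance of transient connection problems.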

[1] https://issues.apache.org/jira/browse/FLINK-23209

Cheers,
Till

On Fri, Jul 2, 2021 at 9:33 AM Gen Luo <luogen...@gmail.com> wrote:

> Thanks for sharing, Till and Yang.
>
> @Lu
> Sorry, but I don't know how to explain the new test from the log. Let's
> wait for others' replies.
>
> @Till
> It would be nice if the JIRAs could be fixed. Thanks again for proposing them.
>
> In addition, I was tracking an issue where the RM keeps allocating and
> freeing slots after a TM is lost, until its heartbeat times out, when I
> found that the recovery takes as long as the heartbeat timeout. That should
> be a minor bug introduced by declarative resource management. I have
> created a JIRA about the problem [1] and we can discuss it there if
> necessary.
>
> [1] https://issues.apache.org/jira/browse/FLINK-23216
>
> On Fri, Jul 2, 2021 at 3:13 AM Lu Niu <qqib...@gmail.com> wrote:
>
>> Another side question: shall we add a metric to cover the complete
>> restarting time (phase 1 + phase 2)? The current metric jm.restartingTime
>> only covers phase 1. Thanks!
>>
>> Best
>> Lu
>>
>> On Thu, Jul 1, 2021 at 12:09 PM Lu Niu <qqib...@gmail.com> wrote:
>>
>>> Thanks Till and Yang for the help! Also thanks, Till, for the quick fix!
>>>
>>> I did another test yesterday. In this test, I intentionally threw an
>>> exception from the source operator:
>>> ```
>>> // Fail subtask 1 once every errorFrenquecyInMin minutes.
>>> if (runtimeContext.getIndexOfThisSubtask() == 1
>>>         && errorFrenquecyInMin > 0
>>>         && System.currentTimeMillis() - lastStartTime
>>>             >= errorFrenquecyInMin * 60 * 1000) {
>>>     lastStartTime = System.currentTimeMillis();
>>>     throw new RuntimeException(
>>>             "Trigger expected exception at: " + lastStartTime);
>>> }
>>> ```
>>> In this case, I found that phase 1 still takes about 30s and phase 2
>>> dropped to 1s (because no container allocation is needed). Why does phase
>>> 1 still take 30s even though no TM is lost?
>>>
>>> Related logs:
>>> ```
>>> 2021-06-30 00:55:07,463 INFO
>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source:
>>> USER_MATERIALIZED_EVENT_SIGNAL-user_context-frontend_event_source -> ...
>>> java.lang.RuntimeException: Trigger expected exception at: 1625014507446
>>> 2021-06-30 00:55:07,509 INFO
>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
>>> NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging
>>> (35c95ee7141334845cfd49b642fa9f98) switched from state RUNNING to
>>> RESTARTING.
>>> 2021-06-30 00:55:37,596 INFO
>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
>>> NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging
>>> (35c95ee7141334845cfd49b642fa9f98) switched from state RESTARTING to
>>> RUNNING.
>>> 2021-06-30 00:55:38,678 INFO
>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph        (time when
>>> all tasks switch from CREATED to RUNNING)
>>> ```
>>> Best
>>> Lu
>>>
>>>
>>> On Thu, Jul 1, 2021 at 12:06 PM Lu Niu <qqib...@gmail.com> wrote:
>>>
>>>> Thanks Till and Yang for the help! Also thanks, Till, for the quick fix!
>>>>
>>>> I did another test yesterday. In this test, I intentionally threw an
>>>> exception from the source operator:
>>>> ```
>>>> // Fail subtask 1 once every errorFrenquecyInMin minutes.
>>>> if (runtimeContext.getIndexOfThisSubtask() == 1
>>>>         && errorFrenquecyInMin > 0
>>>>         && System.currentTimeMillis() - lastStartTime
>>>>             >= errorFrenquecyInMin * 60 * 1000) {
>>>>     lastStartTime = System.currentTimeMillis();
>>>>     throw new RuntimeException(
>>>>             "Trigger expected exception at: " + lastStartTime);
>>>> }
>>>> ```
>>>> In this case, I found that phase 1 still takes about 30s and phase 2
>>>> dropped to 1s (because no container allocation is needed).
>>>>
>>>> On Thu, Jul 1, 2021 at 6:28 AM Till Rohrmann <trohrm...@apache.org>
>>>> wrote:
>>>>
>>>>> A quick addition: I think with FLINK-23202 it should now also be
>>>>> possible to improve the heartbeat mechanism in the general case. We can
>>>>> leverage the unreachability exception, thrown if a remote target is no
>>>>> longer reachable, to mark a heartbeat target as no longer reachable [1].
>>>>> This can then be treated as if the heartbeat timeout had been triggered.
>>>>> That way we should detect lost TaskExecutors as fast as our heartbeat
>>>>> interval.
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-23209
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Thu, Jul 1, 2021 at 1:46 PM Yang Wang <danrtsey...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Since you are deploying Flink workloads on Yarn, the Flink
>>>>>> ResourceManager should get the container completion event after the
>>>>>> heartbeat of Yarn NM->Yarn RM->Flink RM, which is 8 seconds by default.
>>>>>> The Flink ResourceManager will release the dead TaskManager container
>>>>>> once it has received the completion event.
>>>>>> As a result, Flink will not deploy tasks onto the dead TaskManagers.
>>>>>>
>>>>>>
>>>>>> I think most of the time spent in phase 1 might go to cancelling the
>>>>>> tasks on the dead TaskManagers.
>>>>>>
>>>>>>
>>>>>> Best,
>>>>>> Yang
>>>>>>
>>>>>>
>>>>>> On Thu, Jul 1, 2021 at 4:49 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>
>>>>>>> Gen's analysis is correct. Flink currently uses its heartbeat as the
>>>>>>> primary means to detect dead TaskManagers. This means that Flink will
>>>>>>> take at least `heartbeat.timeout` before the system recovers. Even if
>>>>>>> the cancellation happens fast (e.g. by having configured a low
>>>>>>> akka.ask.timeout), Flink will still try to deploy tasks onto the dead
>>>>>>> TaskManager until it is marked as dead and its slots are released
>>>>>>> (unless the ResourceManager gets a signal from the underlying resource
>>>>>>> management system that a container/pod has died). One way to improve
>>>>>>> the situation is to introduce logic which can react to a
>>>>>>> ConnectionException and then blacklists or releases a TaskManager, for
>>>>>>> example. This is currently not implemented in Flink, though.
>>>>>>>
>>>>>>> Concerning the cancellation operation: Flink currently does not
>>>>>>> listen to the dead letters of Akka. This means that the
>>>>>>> `akka.ask.timeout` is the primary means to fail the future result of an
>>>>>>> RPC which could not be sent. This is also an improvement we should add
>>>>>>> to Flink's RpcService. I've created a JIRA issue for this problem [1].
>>>>>>>
>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-23202
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Till
>>>>>>>
>>>>>>> On Wed, Jun 30, 2021 at 6:33 PM Lu Niu <qqib...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks Gen! CCing flink-dev to collect more input.
>>>>>>>>
>>>>>>>> Best
>>>>>>>> Lu
>>>>>>>>
>>>>>>>> On Wed, Jun 30, 2021 at 12:55 AM Gen Luo <luogen...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I'm also wondering about this.
>>>>>>>>>
>>>>>>>>> In my opinion, it's because the JM cannot confirm whether the TM is
>>>>>>>>> lost or whether it's a temporary network problem that will recover
>>>>>>>>> soon: I can see in the log that Akka got a Connection refused, but
>>>>>>>>> the JM still sends heartbeat requests to the lost TM until it reaches
>>>>>>>>> the heartbeat timeout. But I'm not sure if it's indeed designed like
>>>>>>>>> this.
>>>>>>>>>
>>>>>>>>> I would really appreciate it if anyone who knows more details
>>>>>>>>> could answer. Thanks.
>>>>>>>>>
>>>>>>>>
