This is actually a very good point, Gen. There might not be a lot to gain
for us by implementing a fancy algorithm for figuring out whether a TM is
dead or not based on failed heartbeat RPCs from the JM if the TM <-> TM
communication does not tolerate failures and directly fails the affected
tasks. This assumes that the JM and the TMs run in the same environment.

One simple approach could be to make the number of failed heartbeat RPCs
until a target is marked as unreachable configurable, because what
represents a good enough criterion in one user's environment might produce
too many false positives in somebody else's environment. Or even simpler,
one could allow users to disable reacting to a failed heartbeat RPC, which
is the current behavior.
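
To make the idea concrete, here is a minimal sketch of such a configurable
threshold. All names in it (HeartbeatFailureTracker, reportFailedRpc,
reportSuccessfulRpc, DISABLED) are hypothetical and chosen for illustration
only; this is not Flink's heartbeat implementation, and it assumes that
only consecutive failures should count towards the threshold.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: counts consecutive failed heartbeat RPCs per target. */
final class HeartbeatFailureTracker {

    /** Assumed sentinel value that disables reacting to failed heartbeat RPCs. */
    static final int DISABLED = -1;

    private final int failureThreshold;
    private final Map<String, Integer> consecutiveFailures = new HashMap<>();

    HeartbeatFailureTracker(int failureThreshold) {
        if (failureThreshold != DISABLED && failureThreshold < 1) {
            throw new IllegalArgumentException(
                    "Threshold must be >= 1, or DISABLED to turn the check off");
        }
        this.failureThreshold = failureThreshold;
    }

    /**
     * Records a failed heartbeat RPC to the given target.
     *
     * @return true if the target should now be marked as unreachable
     */
    boolean reportFailedRpc(String targetId) {
        if (failureThreshold == DISABLED) {
            return false; // fall back to the plain heartbeat timeout
        }
        int failures = consecutiveFailures.merge(targetId, 1, Integer::sum);
        return failures >= failureThreshold;
    }

    /** A successful heartbeat RPC resets the failure count of the target. */
    void reportSuccessfulRpc(String targetId) {
        consecutiveFailures.remove(targetId);
    }
}
```

With a threshold of 1 this behaves like failing on the first connection
loss; a threshold of 2 corresponds to the "double check" Gen suggested; and
DISABLED leaves only the heartbeat timeout in place.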

We currently have a discussion about this on this PR [1]. Maybe you wanna
join the discussion there and share your insights.

[1] https://github.com/apache/flink/pull/16357

Cheers,
Till

On Tue, Jul 6, 2021 at 4:37 AM Gen Luo <luogen...@gmail.com> wrote:

> I know that there are retry strategies in the Akka RPC framework. I was just
> considering that, since the environment is shared by JM and TMs, and the
> connections among TMs (using netty) are flaky in unstable environments,
> which will also cause job failures, is it necessary to build a
> strongly guaranteed connection between JM and TMs, or could it be as flaky
> as the connections among TMs?
>
> As far as I know, connections among TMs will just fail on their first
> connection loss, so behaving like this in JM just means "as flaky as
> connections among TMs". In a stable environment it's good enough, but in an
> unstable environment, it indeed increases the instability. IMO, though a
> single connection loss is not reliable, a double check should be good
> enough. But since I'm not experienced with an unstable environment, I can't
> tell whether that's also enough for it.
>
> On Mon, Jul 5, 2021 at 5:59 PM Till Rohrmann <trohrm...@apache.org> wrote:
>
>> I think for RPC communication there are retry strategies used by the
>> underlying Akka ActorSystem. So a RpcEndpoint can reconnect to a remote
>> ActorSystem and resume communication. Moreover, there are also
>> reconciliation protocols in place which reconcile the states between the
>> components because of potentially lost RPC messages. So the main question
>> would be whether a single connection loss is good enough for triggering the
>> timeout or whether we want a more elaborate mechanism to reason about the
>> availability of the remote system (e.g. a couple of lost heartbeat
>> messages).
>>
>> Cheers,
>> Till
>>
>> On Mon, Jul 5, 2021 at 10:00 AM Gen Luo <luogen...@gmail.com> wrote:
>>
>>> As far as I know, a TM will report connection failure once its connected
>>> TM is lost. I suppose JM can believe the report and fail the tasks in the
>>> lost TM if it also encounters a connection failure.
>>>
>>> Of course, it won't work if the lost TM is standalone. But I suppose we
>>> can use the same strategy as the connected scenario. That is, consider it
>>> possibly lost on the first connection loss, and fail it if double check
>>> also fails. The major difference is the senders of the probes are the same
>>> one rather than two different roles, so the results may tend to be the same.
>>>
>>> On the other hand, this also means that jobs can be fragile in an
>>> unstable environment, no matter whether the failover is triggered by TM
>>> or JM. So maybe it's not really worthwhile to introduce extra
>>> configuration for fault tolerance of the heartbeat, unless we also
>>> introduce some retry strategies for netty connections.
>>>
>>>
>>> On Fri, Jul 2, 2021 at 9:34 PM Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>>> Could you share the full logs with us for the second experiment, Lu? I
>>>> cannot tell off the top of my head why it should take 30s unless you have
>>>> configured a restart delay of 30s.
>>>>
>>>> Let's discuss FLINK-23216 on the JIRA ticket, Gen.
>>>>
>>>> I've now implemented FLINK-23209 [1], but it has the problem that in a
>>>> flaky environment you might not want to mark a TaskExecutor dead on the
>>>> first connection loss. Maybe this is something we need to make
>>>> configurable (e.g. by introducing a threshold, which admittedly is similar
>>>> to the heartbeat timeout) so that the user can configure it for her
>>>> environment. On the upside, if you mark the TaskExecutor dead on the first
>>>> connection loss (assuming you have a stable network environment), then
>>>> Flink can now detect lost TaskExecutors as fast as the heartbeat interval.
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-23209
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Fri, Jul 2, 2021 at 9:33 AM Gen Luo <luogen...@gmail.com> wrote:
>>>>
>>>>> Thanks for sharing, Till and Yang.
>>>>>
>>>>> @Lu
>>>>> Sorry but I don't know how to explain the new test with the log. Let's
>>>>> wait for others' reply.
>>>>>
>>>>> @Till
>>>>> It would be nice if JIRAs could be fixed. Thanks again for proposing
>>>>> them.
>>>>>
>>>>> In addition, I was tracking an issue where the RM keeps allocating and
>>>>> freeing slots after a TM is lost until its heartbeat times out, when I
>>>>> found the recovery taking as long as the heartbeat timeout. That should be
>>>>> a minor bug introduced by declarative resource management. I have created
>>>>> a JIRA about the problem [1] and we can discuss it there if necessary.
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-23216
>>>>>
>>>>> On Fri, Jul 2, 2021 at 3:13 AM Lu Niu <qqib...@gmail.com> wrote:
>>>>>
>>>>>> Another side question: shall we add a metric to cover the complete
>>>>>> restarting time (phase 1 + phase 2)? The current metric
>>>>>> jm.restartingTime only covers phase 1. Thanks!
>>>>>>
>>>>>> Best
>>>>>> Lu
>>>>>>
>>>>>> On Thu, Jul 1, 2021 at 12:09 PM Lu Niu <qqib...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks Till and Yang for the help! Also thanks Till for the quick fix!
>>>>>>>
>>>>>>> I did another test yesterday. In this test, I intentionally threw an
>>>>>>> exception from the source operator:
>>>>>>> ```
>>>>>>> // Throw from subtask 1 once every errorFrenquecyInMin minutes.
>>>>>>> if (runtimeContext.getIndexOfThisSubtask() == 1
>>>>>>>         && errorFrenquecyInMin > 0
>>>>>>>         && System.currentTimeMillis() - lastStartTime
>>>>>>>                 >= errorFrenquecyInMin * 60_000L) {
>>>>>>>     lastStartTime = System.currentTimeMillis();
>>>>>>>     throw new RuntimeException(
>>>>>>>             "Trigger expected exception at: " + lastStartTime);
>>>>>>> }
>>>>>>> ```
>>>>>>> In this case, I found that phase 1 still takes about 30s and phase 2
>>>>>>> dropped to 1s (because there is no need for container allocation). Why
>>>>>>> does phase 1 still take 30s even though no TM is lost?
>>>>>>>
>>>>>>> Related logs:
>>>>>>> ```
>>>>>>> 2021-06-30 00:55:07,463 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: USER_MATERIALIZED_EVENT_SIGNAL-user_context-frontend_event_source -> ...
>>>>>>> java.lang.RuntimeException: Trigger expected exception at: 1625014507446
>>>>>>> 2021-06-30 00:55:07,509 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging (35c95ee7141334845cfd49b642fa9f98) switched from state RUNNING to RESTARTING.
>>>>>>> 2021-06-30 00:55:37,596 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging (35c95ee7141334845cfd49b642fa9f98) switched from state RESTARTING to RUNNING.
>>>>>>> 2021-06-30 00:55:38,678 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        (time when all tasks switch from CREATED to RUNNING)
>>>>>>> ```
>>>>>>> Best
>>>>>>> Lu
>>>>>>>
>>>>>>>
>>>>>>>> On Thu, Jul 1, 2021 at 6:28 AM Till Rohrmann <trohrm...@apache.org>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> A quick addition: I think with FLINK-23202 it should now also be
>>>>>>>>> possible to improve the heartbeat mechanism in the general case. We
>>>>>>>>> can leverage the unreachability exception thrown if a remote target
>>>>>>>>> is no longer reachable to mark a heartbeat target as no longer
>>>>>>>>> reachable [1]. This can then be treated as if the heartbeat timeout
>>>>>>>>> had been triggered. That way we should detect lost TaskExecutors as
>>>>>>>>> fast as our heartbeat interval.
>>>>>>>>>
>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-23209
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Till
>>>>>>>>>
>>>>>>>>> On Thu, Jul 1, 2021 at 1:46 PM Yang Wang <danrtsey...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Since you are deploying Flink workloads on Yarn, the Flink
>>>>>>>>>> ResourceManager should get the container completion event after
>>>>>>>>>> the heartbeat of Yarn NM -> Yarn RM -> Flink RM, which is 8
>>>>>>>>>> seconds by default. The Flink ResourceManager will release the
>>>>>>>>>> dead TaskManager container once it has received the completion
>>>>>>>>>> event. As a result, Flink will not deploy tasks onto the dead
>>>>>>>>>> TaskManagers.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I think most of the time cost in Phase 1 might be cancelling the
>>>>>>>>>> tasks on the dead TaskManagers.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Yang
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Jul 1, 2021 at 4:49 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Gen's analysis is correct. Flink currently uses its heartbeat
>>>>>>>>>>> as the primary means to detect dead TaskManagers. This means
>>>>>>>>>>> that Flink will take at least `heartbeat.timeout` before the
>>>>>>>>>>> system recovers. Even if the cancellation happens fast (e.g. by
>>>>>>>>>>> having configured a low `akka.ask.timeout`), Flink will still
>>>>>>>>>>> try to deploy tasks onto the dead TaskManager until it is marked
>>>>>>>>>>> as dead and its slots are released (unless the ResourceManager
>>>>>>>>>>> gets a signal from the underlying resource management system
>>>>>>>>>>> that a container/pod has died). One way to improve the situation
>>>>>>>>>>> is to introduce logic which can react to a ConnectionException
>>>>>>>>>>> and then blacklists or releases a TaskManager, for example. This
>>>>>>>>>>> is currently not implemented in Flink, though.
>>>>>>>>>>>
>>>>>>>>>>> Concerning the cancellation operation: Flink currently does not
>>>>>>>>>>> listen to the dead letters of Akka. This means that the
>>>>>>>>>>> `akka.ask.timeout` is the primary means to fail the future
>>>>>>>>>>> result of an RPC which could not be sent. This is also an
>>>>>>>>>>> improvement we should add to Flink's RpcService. I've created a
>>>>>>>>>>> JIRA issue for this problem [1].
>>>>>>>>>>>
>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-23202
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Till
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Jun 30, 2021 at 6:33 PM Lu Niu <qqib...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks Gen! cc flink-dev to collect more inputs.
>>>>>>>>>>>>
>>>>>>>>>>>> Best
>>>>>>>>>>>> Lu
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Jun 30, 2021 at 12:55 AM Gen Luo <luogen...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I'm also wondering here.
>>>>>>>>>>>>>
>>>>>>>>>>>>> In my opinion, it's because the JM cannot confirm whether the
>>>>>>>>>>>>> TM is lost or whether it's temporary network trouble that
>>>>>>>>>>>>> will recover soon, since I can see in the log that akka has
>>>>>>>>>>>>> got a Connection refused but the JM still sends heartbeat
>>>>>>>>>>>>> requests to the lost TM until it reaches the heartbeat
>>>>>>>>>>>>> timeout. But I'm not sure if it's indeed designed like this.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I would really appreciate it if anyone who knows more details
>>>>>>>>>>>>> could answer. Thanks.
>>>>>>>>>>>>>
>>>>>>>>>>>>
