Yes, I have noticed the PR and commented there with some considerations
about the new option. We can discuss further there.

On Tue, Jul 6, 2021 at 6:04 PM Till Rohrmann <trohrm...@apache.org> wrote:

> This is actually a very good point Gen. There might not be a lot to gain
> for us by implementing a fancy algorithm for figuring out whether a TM is
> dead or not based on failed heartbeat RPCs from the JM if the TM <> TM
> communication does not tolerate failures and directly fails the affected
> tasks. This assumes that the JM and TM run in the same environment.
>
> One simple approach could be to make the number of failed heartbeat RPCs
> until a target is marked as unreachable configurable, because what
> represents a good enough criterion in one user's environment might produce
> too many false positives in somebody else's environment. Or, even simpler,
> one could allow disabling the reaction to a failed heartbeat RPC, which is
> the current behavior.
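
A minimal sketch of the configurable threshold described above, in plain Java. The class name `HeartbeatFailureCounter`, its methods, and the convention that a threshold of 0 or less disables the reaction are all hypothetical and chosen for illustration; this is not Flink's actual implementation.

```java
/**
 * Hypothetical sketch: counts consecutive failed heartbeat RPCs for one target
 * and reports the target as unreachable once a configurable threshold is hit.
 * A threshold <= 0 disables reacting to failed RPCs entirely (assumed convention).
 */
final class HeartbeatFailureCounter {
    private final int failedRpcThreshold;
    private int consecutiveFailures;

    HeartbeatFailureCounter(int failedRpcThreshold) {
        this.failedRpcThreshold = failedRpcThreshold;
    }

    /** Records a failed heartbeat RPC; returns true if the target should now be marked unreachable. */
    boolean reportFailedRpc() {
        if (failedRpcThreshold <= 0) {
            return false; // reaction disabled: only the heartbeat timeout applies
        }
        consecutiveFailures++;
        return consecutiveFailures >= failedRpcThreshold;
    }

    /** A successful heartbeat resets the count, so only consecutive failures matter. */
    void reportSuccess() {
        consecutiveFailures = 0;
    }

    public static void main(String[] args) {
        // Threshold 2 corresponds to the "double check" idea from this thread.
        HeartbeatFailureCounter doubleCheck = new HeartbeatFailureCounter(2);
        System.out.println(doubleCheck.reportFailedRpc()); // false
        System.out.println(doubleCheck.reportFailedRpc()); // true
    }
}
```

With a threshold of 1 the target is marked unreachable on the first connection loss, which suits a stable network; a larger value trades detection speed for fewer false positives.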
>
> We currently have a discussion about this on this PR [1]. Maybe you wanna
> join the discussion there and share your insights.
>
> [1] https://github.com/apache/flink/pull/16357
>
> Cheers,
> Till
>
> On Tue, Jul 6, 2021 at 4:37 AM Gen Luo <luogen...@gmail.com> wrote:
>
>> I know that there are retry strategies in the Akka RPC framework. I was
>> just wondering: since the JM and TMs share the same environment, and the
>> connections among TMs (using Netty) are flaky in unstable environments,
>> which will also cause job failures, is it necessary to build a strongly
>> guaranteed connection between JM and TMs, or could it be as flaky as the
>> connections among TMs?
>>
>> As far as I know, connections among TMs will just fail on their first
>> connection loss, so behaving like this in JM just means "as flaky as
>> connections among TMs". In a stable environment it's good enough, but in an
>> unstable environment, it indeed increases the instability. IMO, though a
>> single connection loss is not reliable, a double check should be good
>> enough. But since I'm not experienced with an unstable environment, I can't
>> tell whether that's also enough for it.
>>
>> On Mon, Jul 5, 2021 at 5:59 PM Till Rohrmann <trohrm...@apache.org>
>> wrote:
>>
>>> I think for RPC communication there are retry strategies used by the
>>> underlying Akka ActorSystem. So a RpcEndpoint can reconnect to a remote
>>> ActorSystem and resume communication. Moreover, there are also
>>> reconciliation protocols in place which reconcile the states between the
>>> components because of potentially lost RPC messages. So the main question
>>> would be whether a single connection loss is good enough for triggering the
>>> timeout or whether we want a more elaborate mechanism to reason about the
>>> availability of the remote system (e.g. a couple of lost heartbeat
>>> messages).
>>>
>>> Cheers,
>>> Till
>>>
>>> On Mon, Jul 5, 2021 at 10:00 AM Gen Luo <luogen...@gmail.com> wrote:
>>>
>>>> As far as I know, a TM will report connection failure once its
>>>> connected TM is lost. I suppose JM can believe the report and fail the
>>>> tasks in the lost TM if it also encounters a connection failure.
>>>>
>>>> Of course, it won't work if the lost TM is standalone. But I suppose we
>>>> can use the same strategy as the connected scenario. That is, consider it
>>>> possibly lost on the first connection loss, and fail it if a double check
>>>> also fails. The major difference is that both probes come from the same
>>>> sender rather than from two different roles, so their results may tend to
>>>> be the same.
>>>>
>>>> On the other hand, this also means that jobs can be fragile in an
>>>> unstable environment, no matter whether the failover is triggered by the
>>>> TM or the JM. So it may not be worth introducing extra configuration for
>>>> fault tolerance of the heartbeat, unless we also introduce some retry
>>>> strategies for netty connections.
>>>>
>>>>
>>>> On Fri, Jul 2, 2021 at 9:34 PM Till Rohrmann <trohrm...@apache.org>
>>>> wrote:
>>>>
>>>>> Could you share the full logs with us for the second experiment, Lu? I
>>>>> cannot tell off the top of my head why it should take 30s unless you have
>>>>> configured a restart delay of 30s.
>>>>>
>>>>> Let's discuss FLINK-23216 on the JIRA ticket, Gen.
>>>>>
>>>>> I've now implemented FLINK-23209 [1], but it has the problem that in a
>>>>> flaky environment you might not want to mark a TaskExecutor dead on the
>>>>> first connection loss. Maybe this is something we need to make
>>>>> configurable (e.g. by introducing a threshold, which admittedly is similar
>>>>> to the heartbeat timeout) so that the user can configure it for her
>>>>> environment. On the upside, if you mark the TaskExecutor dead on the first
>>>>> connection loss (assuming you have a stable network environment), then it
>>>>> can now detect lost TaskExecutors as fast as the heartbeat interval.
>>>>>
>>>>> [1] https://issues.apache.org/jira/browse/FLINK-23209
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Fri, Jul 2, 2021 at 9:33 AM Gen Luo <luogen...@gmail.com> wrote:
>>>>>
>>>>>> Thanks for sharing, Till and Yang.
>>>>>>
>>>>>> @Lu
>>>>>> Sorry but I don't know how to explain the new test with the log.
>>>>>> Let's wait for others' reply.
>>>>>>
>>>>>> @Till
>>>>>> It would be nice if JIRAs could be fixed. Thanks again for proposing
>>>>>> them.
>>>>>>
>>>>>> In addition, I was tracking an issue where the RM keeps allocating and
>>>>>> freeing slots after a TM is lost until its heartbeat times out, which is
>>>>>> when I found that recovery takes as long as the heartbeat timeout. That
>>>>>> should be a minor bug introduced by declarative resource management. I
>>>>>> have created a JIRA about the problem [1] and we can discuss it there if
>>>>>> necessary.
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-23216
>>>>>>
>>>>>> On Fri, Jul 2, 2021 at 3:13 AM Lu Niu <qqib...@gmail.com> wrote:
>>>>>>
>>>>>>> Another side question: shall we add a metric to cover the complete
>>>>>>> restarting time (phase 1 + phase 2)? The current metric
>>>>>>> jm.restartingTime only covers phase 1. Thanks!
>>>>>>>
>>>>>>> Best
>>>>>>> Lu
>>>>>>>
>>>>>>> On Thu, Jul 1, 2021 at 12:09 PM Lu Niu <qqib...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks Till and Yang for the help! Also thanks, Till, for the quick fix!
>>>>>>>>
>>>>>>>> I did another test yesterday. In this test, I intentionally throw an
>>>>>>>> exception from the source operator:
>>>>>>>> ```
>>>>>>>> if (runtimeContext.getIndexOfThisSubtask() == 1
>>>>>>>>     && errorFrenquecyInMin > 0
>>>>>>>>     && System.currentTimeMillis() - lastStartTime
>>>>>>>>         >= errorFrenquecyInMin * 60 * 1000) {
>>>>>>>>   // Periodically fail subtask 1 to trigger a failover.
>>>>>>>>   lastStartTime = System.currentTimeMillis();
>>>>>>>>   throw new RuntimeException(
>>>>>>>>       "Trigger expected exception at: " + lastStartTime);
>>>>>>>> }
>>>>>>>> ```
>>>>>>>> In this case, I found that phase 1 still takes about 30s while phase 2
>>>>>>>> dropped to 1s (because no container allocation is needed). Why does
>>>>>>>> phase 1 still take 30s even though no TM is lost?
>>>>>>>>
>>>>>>>> Related logs:
>>>>>>>> ```
>>>>>>>> 2021-06-30 00:55:07,463 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: USER_MATERIALIZED_EVENT_SIGNAL-user_context-frontend_event_source -> ...
>>>>>>>> java.lang.RuntimeException: Trigger expected exception at: 1625014507446
>>>>>>>> 2021-06-30 00:55:07,509 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging (35c95ee7141334845cfd49b642fa9f98) switched from state RUNNING to RESTARTING.
>>>>>>>> 2021-06-30 00:55:37,596 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging (35c95ee7141334845cfd49b642fa9f98) switched from state RESTARTING to RUNNING.
>>>>>>>> 2021-06-30 00:55:38,678 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        (time when all tasks switch from CREATED to RUNNING)
>>>>>>>> ```
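
The two phase durations can be read directly off the timestamps in the log excerpt above. The sketch below does that arithmetic, assuming phase 1 is the span between the RUNNING to RESTARTING and RESTARTING to RUNNING transitions, and phase 2 the span until all tasks are RUNNING. The class `RestartPhases` and its helper are hypothetical names for illustration.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

final class RestartPhases {
    // Matches the timestamps in the log excerpt, e.g. "2021-06-30 00:55:07,509".
    private static final DateTimeFormatter LOG_TS =
            DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss,SSS");

    /** Milliseconds elapsed between two log timestamps. */
    static long millisBetween(String from, String to) {
        return Duration.between(
                LocalDateTime.parse(from, LOG_TS),
                LocalDateTime.parse(to, LOG_TS)).toMillis();
    }

    public static void main(String[] args) {
        String restarting = "2021-06-30 00:55:07,509"; // RUNNING -> RESTARTING
        String running = "2021-06-30 00:55:37,596";    // RESTARTING -> RUNNING
        String allTasks = "2021-06-30 00:55:38,678";   // all tasks RUNNING

        System.out.println("phase 1 ms: " + millisBetween(restarting, running)); // 30087
        System.out.println("phase 2 ms: " + millisBetween(running, allTasks));   // 1082
    }
}
```

So phase 1 is about 30.1s and phase 2 about 1.1s in this run.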
>>>>>>>> Best
>>>>>>>> Lu
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jul 1, 2021 at 12:06 PM Lu Niu <qqib...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Thanks Till and Yang for the help! Also thanks, Till, for the quick fix!
>>>>>>>>>
>>>>>>>>> I did another test yesterday. In this test, I intentionally throw an
>>>>>>>>> exception from the source operator:
>>>>>>>>> ```
>>>>>>>>> if (runtimeContext.getIndexOfThisSubtask() == 1
>>>>>>>>>     && errorFrenquecyInMin > 0
>>>>>>>>>     && System.currentTimeMillis() - lastStartTime
>>>>>>>>>         >= errorFrenquecyInMin * 60 * 1000) {
>>>>>>>>>   // Periodically fail subtask 1 to trigger a failover.
>>>>>>>>>   lastStartTime = System.currentTimeMillis();
>>>>>>>>>   throw new RuntimeException(
>>>>>>>>>       "Trigger expected exception at: " + lastStartTime);
>>>>>>>>> }
>>>>>>>>> ```
>>>>>>>>> In this case, I found that phase 1 still takes about 30s while phase 2
>>>>>>>>> dropped to 1s (because no container allocation is needed).
>>>>>>>>>
>>>>>>>>> Some logs:
>>>>>>>>> ```
>>>>>>>>> ```
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Jul 1, 2021 at 6:28 AM Till Rohrmann <trohrm...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> A quick addition: I think with FLINK-23202 it should now also be
>>>>>>>>>> possible to improve the heartbeat mechanism in the general case. We
>>>>>>>>>> can leverage the unreachability exception thrown if a remote target
>>>>>>>>>> is no longer reachable to mark a heartbeat target as no longer
>>>>>>>>>> reachable [1]. This can then be treated as if the heartbeat timeout
>>>>>>>>>> had been triggered. That way we should detect lost TaskExecutors as
>>>>>>>>>> fast as our heartbeat interval.
>>>>>>>>>>
>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-23209
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Till
>>>>>>>>>>
>>>>>>>>>> On Thu, Jul 1, 2021 at 1:46 PM Yang Wang <danrtsey...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Since you are deploying Flink workloads on Yarn, the Flink
>>>>>>>>>>> ResourceManager should get the container
>>>>>>>>>>> completion event after the heartbeat of Yarn NM->Yarn RM->Flink
>>>>>>>>>>> RM, which is 8 seconds by default.
>>>>>>>>>>> And the Flink ResourceManager will release the dead TaskManager
>>>>>>>>>>> container once it has received the completion event.
>>>>>>>>>>> As a result, Flink will not deploy tasks onto the dead
>>>>>>>>>>> TaskManagers.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> I think most of the time cost in Phase 1 might be cancelling the
>>>>>>>>>>> tasks on the dead TaskManagers.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Yang
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Thu, Jul 1, 2021 at 4:49 PM Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> The analysis of Gen is correct. Flink currently uses its
>>>>>>>>>>>> heartbeat as the primary means to detect dead TaskManagers. This
>>>>>>>>>>>> means that Flink will take at least `heartbeat.timeout` time
>>>>>>>>>>>> before the system recovers. Even if the cancellation happens fast
>>>>>>>>>>>> (e.g. by having configured a low akka.ask.timeout), Flink will
>>>>>>>>>>>> still try to deploy tasks onto the dead TaskManager until it is
>>>>>>>>>>>> marked as dead and its slots are released (unless the
>>>>>>>>>>>> ResourceManager gets a signal from the underlying resource
>>>>>>>>>>>> management system that a container/pod has died). One way to
>>>>>>>>>>>> improve the situation is to introduce logic which can react to a
>>>>>>>>>>>> ConnectionException and then blacklists or releases a
>>>>>>>>>>>> TaskManager, for example. This is currently not implemented in
>>>>>>>>>>>> Flink, though.
>>>>>>>>>>>>
>>>>>>>>>>>> Concerning the cancellation operation: Flink currently does not
>>>>>>>>>>>> listen to the dead letters of Akka. This means that the
>>>>>>>>>>>> `akka.ask.timeout` is the primary means to fail the future result
>>>>>>>>>>>> of an RPC which could not be sent. This is also an improvement we
>>>>>>>>>>>> should add to Flink's RpcService. I've created a JIRA issue for
>>>>>>>>>>>> this problem [1].
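
To illustrate why the ask timeout matters here, the sketch below models an RPC whose message was never delivered: nothing ever completes the response future, so only the timeout fails it. This is a plain-Java analogy using `CompletableFuture.orTimeout` (Java 9+), not Akka or Flink's RpcService; `AskTimeoutSketch` and its methods are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class AskTimeoutSketch {
    /**
     * Simulates asking an unreachable target: the response future is never
     * completed by anyone, so it can only fail once the ask timeout elapses.
     */
    static CompletableFuture<String> askUnreachableTarget(long askTimeoutMillis) {
        CompletableFuture<String> response = new CompletableFuture<>();
        return response.orTimeout(askTimeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Returns true if the simulated ask failed because of the timeout. */
    static boolean failsWithTimeout(long askTimeoutMillis) {
        try {
            askUnreachableTarget(askTimeoutMillis).join();
            return false;
        } catch (CompletionException e) {
            return e.getCause() instanceof TimeoutException;
        }
    }

    public static void main(String[] args) {
        // The caller learns about the lost message only after the timeout.
        System.out.println(failsWithTimeout(50)); // true
    }
}
```

Reacting to an explicit delivery failure instead would let the caller fail the future immediately rather than waiting for the full timeout.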
>>>>>>>>>>>>
>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-23202
>>>>>>>>>>>>
>>>>>>>>>>>> Cheers,
>>>>>>>>>>>> Till
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Jun 30, 2021 at 6:33 PM Lu Niu <qqib...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks Gen! cc flink-dev to collect more inputs.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best
>>>>>>>>>>>>> Lu
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Jun 30, 2021 at 12:55 AM Gen Luo <luogen...@gmail.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm also wondering here.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> In my opinion, it's because the JM cannot tell whether the TM
>>>>>>>>>>>>>> is lost or whether it's a temporary network problem that will
>>>>>>>>>>>>>> recover soon: I can see in the log that Akka got a
>>>>>>>>>>>>>> "Connection refused", but the JM still sends heartbeat requests
>>>>>>>>>>>>>> to the lost TM until it reaches the heartbeat timeout. But I'm
>>>>>>>>>>>>>> not sure if it's indeed designed like this.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I would really appreciate it if anyone who knows more details
>>>>>>>>>>>>>> could answer. Thanks.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
