It is really helpful to find the lost container quickly. In our inner flink
version, we optimize it by task's report and jobmaster's probe. When a task
fails because of the connection, it reports to the jobmaster. The jobmaster
will try to confirm the liveness of the unconnected taskmanager for certain
times by config. If the jobmaster find the taskmanager unconnected or dead,
it releases the taskmanger. This will work for most cases. For an unstable
environment, config needs adjustment.

Gen Luo <luogen...@gmail.com> 于2021年7月6日周二 下午8:41写道:

> Yes, I have noticed the PR and commented there with some consideration
> about the new option. We can discuss further there.
>
> On Tue, Jul 6, 2021 at 6:04 PM Till Rohrmann <trohrm...@apache.org> wrote:
>
> > This is actually a very good point Gen. There might not be a lot to gain
> > for us by implementing a fancy algorithm for figuring out whether a TM is
> > dead or not based on failed heartbeat RPCs from the JM if the TM <> TM
> > communication does not tolerate failures and directly fails the affected
> > tasks. This assumes that the JM and TM run in the same environment.
> >
> > One simple approach could be to make the number of failed heartbeat RPCs
> > until a target is marked as unreachable configurable because what
> > represents a good enough criterion in one user's environment might
> produce
> > too many false-positives in somebody else's environment. Or even simpler,
> > one could say that one can disable reacting to a failed heartbeat RPC as
> it
> > is currently the case.
> >
> > We currently have a discussion about this on this PR [1]. Maybe you wanna
> > join the discussion there and share your insights.
> >
> > [1] https://github.com/apache/flink/pull/16357
> >
> > Cheers,
> > Till
> >
> > On Tue, Jul 6, 2021 at 4:37 AM Gen Luo <luogen...@gmail.com> wrote:
> >
> >> I know that there are retry strategies for akka rpc frameworks. I was
> >> just considering that, since the environment is shared by JM and TMs,
> and
> >> the connections among TMs (using netty) are flaky in unstable
> environments,
> >> which will also cause the job failure, is it necessary to build a
> >> strongly guaranteed connection between JM and TMs, or it could be as
> flaky
> >> as the connections among TMs?
> >>
> >> As far as I know, connections among TMs will just fail on their first
> >> connection loss, so behaving like this in JM just means "as flaky as
> >> connections among TMs". In a stable environment it's good enough, but
> in an
> >> unstable environment, it indeed increases the instability. IMO, though a
> >> single connection loss is not reliable, a double check should be good
> >> enough. But since I'm not experienced with an unstable environment, I
> can't
> >> tell whether that's also enough for it.
> >>
> >> On Mon, Jul 5, 2021 at 5:59 PM Till Rohrmann <trohrm...@apache.org>
> >> wrote:
> >>
> >>> I think for RPC communication there are retry strategies used by the
> >>> underlying Akka ActorSystem. So a RpcEndpoint can reconnect to a remote
> >>> ActorSystem and resume communication. Moreover, there are also
> >>> reconciliation protocols in place which reconcile the states between
> the
> >>> components because of potentially lost RPC messages. So the main
> question
> >>> would be whether a single connection loss is good enough for
> triggering the
> >>> timeout or whether we want a more elaborate mechanism to reason about
> the
> >>> availability of the remote system (e.g. a couple of lost heartbeat
> >>> messages).
> >>>
> >>> Cheers,
> >>> Till
> >>>
> >>> On Mon, Jul 5, 2021 at 10:00 AM Gen Luo <luogen...@gmail.com> wrote:
> >>>
> >>>> As far as I know, a TM will report connection failure once its
> >>>> connected TM is lost. I suppose JM can believe the report and fail the
> >>>> tasks in the lost TM if it also encounters a connection failure.
> >>>>
> >>>> Of course, it won't work if the lost TM is standalone. But I suppose
> we
> >>>> can use the same strategy as the connected scenario. That is,
> consider it
> >>>> possibly lost on the first connection loss, and fail it if double
> check
> >>>> also fails. The major difference is the senders of the probes are the
> same
> >>>> one rather than two different roles, so the results may tend to be
> the same.
> >>>>
> >>>> On the other hand, the fact also means that the jobs can be fragile in
> >>>> an unstable environment, no matter whether the failover is triggered
> by TM
> >>>> or JM. So maybe it's not that worthy to introduce extra
> configurations for
> >>>> fault tolerance of heartbeat, unless we also introduce some retry
> >>>> strategies for netty connections.
> >>>>
> >>>>
> >>>> On Fri, Jul 2, 2021 at 9:34 PM Till Rohrmann <trohrm...@apache.org>
> >>>> wrote:
> >>>>
> >>>>> Could you share the full logs with us for the second experiment, Lu?
> I
> >>>>> cannot tell from the top of my head why it should take 30s unless
> you have
> >>>>> configured a restart delay of 30s.
> >>>>>
> >>>>> Let's discuss FLINK-23216 on the JIRA ticket, Gen.
> >>>>>
> >>>>> I've now implemented FLINK-23209 [1] but it somehow has the problem
> >>>>> that in a flakey environment you might not want to mark a
> TaskExecutor dead
> >>>>> on the first connection loss. Maybe this is something we need to make
> >>>>> configurable (e.g. introducing a threshold which admittedly is
> similar to
> >>>>> the heartbeat timeout) so that the user can configure it for her
> >>>>> environment. On the upside, if you mark the TaskExecutor dead on the
> first
> >>>>> connection loss (assuming you have a stable network environment),
> then it
> >>>>> can now detect lost TaskExecutors as fast as the heartbeat interval.
> >>>>>
> >>>>> [1] https://issues.apache.org/jira/browse/FLINK-23209
> >>>>>
> >>>>> Cheers,
> >>>>> Till
> >>>>>
> >>>>> On Fri, Jul 2, 2021 at 9:33 AM Gen Luo <luogen...@gmail.com> wrote:
> >>>>>
> >>>>>> Thanks for sharing, Till and Yang.
> >>>>>>
> >>>>>> @Lu
> >>>>>> Sorry but I don't know how to explain the new test with the log.
> >>>>>> Let's wait for others' reply.
> >>>>>>
> >>>>>> @Till
> >>>>>> It would be nice if JIRAs could be fixed. Thanks again for proposing
> >>>>>> them.
> >>>>>>
> >>>>>> In addition, I was tracking an issue that RM keeps allocating and
> >>>>>> freeing slots after a TM lost until its heartbeat timeout, when I
> found the
> >>>>>> recovery costing as long as heartbeat timeout. That should be a
> minor bug
> >>>>>> introduced by declarative resource management. I have created a
> JIRA about
> >>>>>> the problem [1] and  we can discuss it there if necessary.
> >>>>>>
> >>>>>> [1] https://issues.apache.org/jira/browse/FLINK-23216
> >>>>>>
> >>>>>> Lu Niu <qqib...@gmail.com> 于2021年7月2日周五 上午3:13写道:
> >>>>>>
> >>>>>>> Another side question, Shall we add metric to cover the complete
> >>>>>>> restarting time (phase 1 + phase 2)? Current metric
> jm.restartingTime only
> >>>>>>> covers phase 1. Thanks!
> >>>>>>>
> >>>>>>> Best
> >>>>>>> Lu
> >>>>>>>
> >>>>>>> On Thu, Jul 1, 2021 at 12:09 PM Lu Niu <qqib...@gmail.com> wrote:
> >>>>>>>
> >>>>>>>> Thanks TIll and Yang for help! Also Thanks Till for a quick fix!
> >>>>>>>>
> >>>>>>>> I did another test yesterday. In this test, I intentionally throw
> >>>>>>>> exception from the source operator:
> >>>>>>>> ```
> >>>>>>>> if (runtimeContext.getIndexOfThisSubtask() == 1
> >>>>>>>>         && errorFrenquecyInMin > 0
> >>>>>>>>         && System.currentTimeMillis() - lastStartTime >=
> >>>>>>>> errorFrenquecyInMin * 60 * 1000) {
> >>>>>>>>       lastStartTime = System.currentTimeMillis();
> >>>>>>>>       throw new RuntimeException(
> >>>>>>>>           "Trigger expected exception at: " + lastStartTime);
> >>>>>>>>     }
> >>>>>>>> ```
> >>>>>>>> In this case, I found phase 1 still takes about 30s and Phase 2
> >>>>>>>> dropped to 1s (because no need for container allocation). Why
> phase 1 still
> >>>>>>>> takes 30s even though no TM is lost?
> >>>>>>>>
> >>>>>>>> Related logs:
> >>>>>>>> ```
> >>>>>>>> 2021-06-30 00:55:07,463 INFO
> >>>>>>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph        -
> Source:
> >>>>>>>> USER_MATERIALIZED_EVENT_SIGNAL-user_context-frontend_event_source
> -> ...
> >>>>>>>> java.lang.RuntimeException: Trigger expected exception at:
> 1625014507446
> >>>>>>>> 2021-06-30 00:55:07,509 INFO
> >>>>>>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph        -
> Job
> >>>>>>>> NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging
> >>>>>>>> (35c95ee7141334845cfd49b642fa9f98) switched from state RUNNING to
> >>>>>>>> RESTARTING.
> >>>>>>>> 2021-06-30 00:55:37,596 INFO
> >>>>>>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph        -
> Job
> >>>>>>>> NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging
> >>>>>>>> (35c95ee7141334845cfd49b642fa9f98) switched from state RESTARTING
> to
> >>>>>>>> RUNNING.
> >>>>>>>> 2021-06-30 00:55:38,678 INFO
> >>>>>>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph
> (time when
> >>>>>>>> all tasks switch from CREATED to RUNNING)
> >>>>>>>> ```
> >>>>>>>> Best
> >>>>>>>> Lu
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Thu, Jul 1, 2021 at 12:06 PM Lu Niu <qqib...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>>> Thanks TIll and Yang for help! Also Thanks Till for a quick fix!
> >>>>>>>>>
> >>>>>>>>> I did another test yesterday. In this test, I intentionally throw
> >>>>>>>>> exception from the source operator:
> >>>>>>>>> ```
> >>>>>>>>> if (runtimeContext.getIndexOfThisSubtask() == 1
> >>>>>>>>>         && errorFrenquecyInMin > 0
> >>>>>>>>>         && System.currentTimeMillis() - lastStartTime >=
> >>>>>>>>> errorFrenquecyInMin * 60 * 1000) {
> >>>>>>>>>       lastStartTime = System.currentTimeMillis();
> >>>>>>>>>       throw new RuntimeException(
> >>>>>>>>>           "Trigger expected exception at: " + lastStartTime);
> >>>>>>>>>     }
> >>>>>>>>> ```
> >>>>>>>>> In this case, I found phase 1 still takes about 30s and Phase 2
> >>>>>>>>> dropped to 1s (because no need for container allocation).
> >>>>>>>>>
> >>>>>>>>> Some logs:
> >>>>>>>>> ```
> >>>>>>>>> ```
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Thu, Jul 1, 2021 at 6:28 AM Till Rohrmann <
> trohrm...@apache.org>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>> A quick addition, I think with FLINK-23202 it should now also be
> >>>>>>>>>> possible to improve the heartbeat mechanism in the general
> case. We can
> >>>>>>>>>> leverage the unreachability exception thrown if a remote target
> is no
> >>>>>>>>>> longer reachable to mark an heartbeat target as no longer
> reachable [1].
> >>>>>>>>>> This can then be considered as if the heartbeat timeout has
> been triggered.
> >>>>>>>>>> That way we should detect lost TaskExecutors as fast as our
> heartbeat
> >>>>>>>>>> interval is.
> >>>>>>>>>>
> >>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-23209
> >>>>>>>>>>
> >>>>>>>>>> Cheers,
> >>>>>>>>>> Till
> >>>>>>>>>>
> >>>>>>>>>> On Thu, Jul 1, 2021 at 1:46 PM Yang Wang <danrtsey...@gmail.com
> >
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Since you are deploying Flink workloads on Yarn, the Flink
> >>>>>>>>>>> ResourceManager should get the container
> >>>>>>>>>>> completion event after the heartbeat of Yarn NM->Yarn RM->Flink
> >>>>>>>>>>> RM, which is 8 seconds by default.
> >>>>>>>>>>> And Flink ResourceManager will release the dead TaskManager
> >>>>>>>>>>> container once received the completion event.
> >>>>>>>>>>> As a result, Flink will not deploy tasks onto the dead
> >>>>>>>>>>> TaskManagers.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> I think most of the time cost in Phase 1 might be cancelling
> the
> >>>>>>>>>>> tasks on the dead TaskManagers.
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Yang
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Till Rohrmann <trohrm...@apache.org> 于2021年7月1日周四 下午4:49写道:
> >>>>>>>>>>>
> >>>>>>>>>>>> The analysis of Gen is correct. Flink currently uses its
> >>>>>>>>>>>> heartbeat as the primary means to detect dead TaskManagers.
> This means that
> >>>>>>>>>>>> Flink will take at least `heartbeat.timeout` time before the
> system
> >>>>>>>>>>>> recovers. Even if the cancellation happens fast (e.g. by
> having configured
> >>>>>>>>>>>> a low akka.ask.timeout), then Flink will still try to deploy
> tasks onto the
> >>>>>>>>>>>> dead TaskManager until it is marked as dead and its slots are
> released
> >>>>>>>>>>>> (unless the ResourceManager does not get a signal from the
> underlying
> >>>>>>>>>>>> resource management system that a container/pod has died).
> One way to
> >>>>>>>>>>>> improve the situation is to introduce logic which can react
> to a
> >>>>>>>>>>>> ConnectionException and then black lists or releases a
> TaskManager, for
> >>>>>>>>>>>> example. This is currently not implemented in Flink, though.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Concerning the cancellation operation: Flink currently does
> not
> >>>>>>>>>>>> listen to the dead letters of Akka. This means that the
> `akka.ask.timeout`
> >>>>>>>>>>>> is the primary means to fail the future result of a rpc which
> could not be
> >>>>>>>>>>>> sent. This is also an improvement we should add to Flink's
> RpcService. I've
> >>>>>>>>>>>> created a JIRA issue for this problem [1].
> >>>>>>>>>>>>
> >>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-23202
> >>>>>>>>>>>>
> >>>>>>>>>>>> Cheers,
> >>>>>>>>>>>> Till
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Wed, Jun 30, 2021 at 6:33 PM Lu Niu <qqib...@gmail.com>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks Gen! cc flink-dev to collect more inputs.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Best
> >>>>>>>>>>>>> Lu
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Wed, Jun 30, 2021 at 12:55 AM Gen Luo <
> luogen...@gmail.com>
> >>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> I'm also wondering here.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> In my opinion, it's because the JM can not confirm whether
> >>>>>>>>>>>>>> the TM is lost or it's a temporary network trouble and will
> recover soon,
> >>>>>>>>>>>>>> since I can see in the log that akka has got a Connection
> refused but JM
> >>>>>>>>>>>>>> still sends a heartbeat request to the lost TM until it
> reaches heartbeat
> >>>>>>>>>>>>>> timeout. But I'm not sure if it's indeed designed like this.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I would really appreciate it if anyone who knows more
> details
> >>>>>>>>>>>>>> could answer. Thanks.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>
>

Reply via email to