Yes, time is main when detecting the TM's liveness. The count method will
check by certain intervals.

Gen Luo <luogen...@gmail.com> 于2021年7月9日周五 上午10:37写道:

> @刘建刚
> Welcome to join the discuss and thanks for sharing your experience.
>
> I have a minor question. In my experience, network failures in a certain
> cluster usually takes a time to recovery, which can be measured as p99 to
> guide configuring. So I suppose it would be better to use time than attempt
> count as the configuration for confirming TM liveness. How do you think
> about this? Or is the premise right according to your experience?
>
> @Lu Niu <qqib...@gmail.com>
> > Does that mean the akka timeout situation we talked above doesn't apply
> to flink 1.11?
>
> I suppose it's true. According to the reply from Till in FLINK-23216
> <https://issues.apache.org/jira/browse/FLINK-23216>, it should be
> confirmed that the problem is introduced by declarative resource
> management, which is introduced to Flink in 1.12.
>
> In previous versions, although JM still uses heartbeat to check TMs
> status, RM will tell JM about TM lost once it is noticed by Yarn. This is
> much faster than JM's heartbeat mechanism, if one uses default heartbeat
> configurations. However, after 1.12 with declarative resource management,
> RM will no longer tell this to JM, since it doesn't have a related
> AllocationID.  So the heartbeat mechanism becomes the only way JM can know
> about TM lost.
>
> On Fri, Jul 9, 2021 at 6:34 AM Lu Niu <qqib...@gmail.com> wrote:
>
>> Thanks everyone! This is a great discussion!
>>
>> 1. Restarting takes 30s when throwing exceptions from application code
>> because the restart delay is 30s in config. Before lots of related config
>> are 30s which lead to the confusion. I redo the test with config:
>>
>> FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647,
>> backoffTimeMS=1000)
>> heartbeat.timeout: 500000
>> akka.ask.timeout 30 s
>> akka.lookup.timeout 30 s
>> akka.tcp.timeout 30 s
>> akka.watch.heartbeat.interval 30 s
>> akka.watch.heartbeat.pause 120 s
>>
>>    Now Phase 1 drops down to 2s now and phase 2 takes 13s. The whole
>> restart takes 14s. Does that mean the akka timeout situation we talked
>> above doesn't apply to flink 1.11?
>>
>> 2. About flaky connection between TMs, we did notice sometimes exception
>> as follows:
>> ```
>> TaskFoo switched from RUNNING to FAILED on
>> container_e02_1599158147594_156068_01_000038 @
>> xenon-prod-001-20200316-data-slave-prod-0a0201a4.ec2.pin220.com
>> (dataPort=40957).
>> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
>> Connection unexpectedly closed by remote task manager '
>> xenon-prod-001-20200316-data-slave-prod-0a020f7a.ec2.pin220.com/10.2.15.122:33539'.
>> This might indicate that the remote task manager was lost.
>> at
>> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:144)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
>> at
>> org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelInactive(NettyMessageClientDecoderDelegate.java:97)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1416)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:912)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:816)
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
>> at
>> org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:331)
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
>> at
>> org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>> at java.lang.Thread.run(Thread.java:748)
>> ```
>> 1. It's a bit inconvenient to debug such an exception because it doesn't
>> report the exact container id. Right now we have to look for `
>> xenon-prod-001-20200316-data-slave-prod-0a020f7a.ec2.pin220.com/10.2.15.122:33539`
>> <http://xenon-prod-001-20200316-data-slave-prod-0a020f7a.ec2.pin220.com/10.2.15.122:33539>
>> in JobMananger log to find that.
>> 2. The task manager log doesn't show anything suspicious. Also, no major
>> GC. So it might imply a flack connection in this case.
>> 3. Is there any short term workaround we can try? any config tuning?
>> Also, what's the long term solution?
>>
>> Best
>> Lu
>>
>>
>>
>>
>> On Tue, Jul 6, 2021 at 11:45 PM 刘建刚 <liujiangangp...@gmail.com> wrote:
>>
>>> It is really helpful to find the lost container quickly. In our inner
>>> flink version, we optimize it by task's report and jobmaster's probe. When
>>> a task fails because of the connection, it reports to the jobmaster. The
>>> jobmaster will try to confirm the liveness of the unconnected
>>> taskmanager for certain times by config. If the jobmaster find the
>>> taskmanager unconnected or dead, it releases the taskmanger. This will work
>>> for most cases. For an unstable environment, config needs adjustment.
>>>
>>> Gen Luo <luogen...@gmail.com> 于2021年7月6日周二 下午8:41写道:
>>>
>>>> Yes, I have noticed the PR and commented there with some consideration
>>>> about the new option. We can discuss further there.
>>>>
>>>> On Tue, Jul 6, 2021 at 6:04 PM Till Rohrmann <trohrm...@apache.org>
>>>> wrote:
>>>>
>>>> > This is actually a very good point Gen. There might not be a lot to
>>>> gain
>>>> > for us by implementing a fancy algorithm for figuring out whether a
>>>> TM is
>>>> > dead or not based on failed heartbeat RPCs from the JM if the TM <> TM
>>>> > communication does not tolerate failures and directly fails the
>>>> affected
>>>> > tasks. This assumes that the JM and TM run in the same environment.
>>>> >
>>>> > One simple approach could be to make the number of failed heartbeat
>>>> RPCs
>>>> > until a target is marked as unreachable configurable because what
>>>> > represents a good enough criterion in one user's environment might
>>>> produce
>>>> > too many false-positives in somebody else's environment. Or even
>>>> simpler,
>>>> > one could say that one can disable reacting to a failed heartbeat RPC
>>>> as it
>>>> > is currently the case.
>>>> >
>>>> > We currently have a discussion about this on this PR [1]. Maybe you
>>>> wanna
>>>> > join the discussion there and share your insights.
>>>> >
>>>> > [1] https://github.com/apache/flink/pull/16357
>>>> >
>>>> > Cheers,
>>>> > Till
>>>> >
>>>> > On Tue, Jul 6, 2021 at 4:37 AM Gen Luo <luogen...@gmail.com> wrote:
>>>> >
>>>> >> I know that there are retry strategies for akka rpc frameworks. I was
>>>> >> just considering that, since the environment is shared by JM and
>>>> TMs, and
>>>> >> the connections among TMs (using netty) are flaky in unstable
>>>> environments,
>>>> >> which will also cause the job failure, is it necessary to build a
>>>> >> strongly guaranteed connection between JM and TMs, or it could be as
>>>> flaky
>>>> >> as the connections among TMs?
>>>> >>
>>>> >> As far as I know, connections among TMs will just fail on their first
>>>> >> connection loss, so behaving like this in JM just means "as flaky as
>>>> >> connections among TMs". In a stable environment it's good enough,
>>>> but in an
>>>> >> unstable environment, it indeed increases the instability. IMO,
>>>> though a
>>>> >> single connection loss is not reliable, a double check should be good
>>>> >> enough. But since I'm not experienced with an unstable environment,
>>>> I can't
>>>> >> tell whether that's also enough for it.
>>>> >>
>>>> >> On Mon, Jul 5, 2021 at 5:59 PM Till Rohrmann <trohrm...@apache.org>
>>>> >> wrote:
>>>> >>
>>>> >>> I think for RPC communication there are retry strategies used by the
>>>> >>> underlying Akka ActorSystem. So a RpcEndpoint can reconnect to a
>>>> remote
>>>> >>> ActorSystem and resume communication. Moreover, there are also
>>>> >>> reconciliation protocols in place which reconcile the states
>>>> between the
>>>> >>> components because of potentially lost RPC messages. So the main
>>>> question
>>>> >>> would be whether a single connection loss is good enough for
>>>> triggering the
>>>> >>> timeout or whether we want a more elaborate mechanism to reason
>>>> about the
>>>> >>> availability of the remote system (e.g. a couple of lost heartbeat
>>>> >>> messages).
>>>> >>>
>>>> >>> Cheers,
>>>> >>> Till
>>>> >>>
>>>> >>> On Mon, Jul 5, 2021 at 10:00 AM Gen Luo <luogen...@gmail.com>
>>>> wrote:
>>>> >>>
>>>> >>>> As far as I know, a TM will report connection failure once its
>>>> >>>> connected TM is lost. I suppose JM can believe the report and fail
>>>> the
>>>> >>>> tasks in the lost TM if it also encounters a connection failure.
>>>> >>>>
>>>> >>>> Of course, it won't work if the lost TM is standalone. But I
>>>> suppose we
>>>> >>>> can use the same strategy as the connected scenario. That is,
>>>> consider it
>>>> >>>> possibly lost on the first connection loss, and fail it if double
>>>> check
>>>> >>>> also fails. The major difference is the senders of the probes are
>>>> the same
>>>> >>>> one rather than two different roles, so the results may tend to be
>>>> the same.
>>>> >>>>
>>>> >>>> On the other hand, the fact also means that the jobs can be
>>>> fragile in
>>>> >>>> an unstable environment, no matter whether the failover is
>>>> triggered by TM
>>>> >>>> or JM. So maybe it's not that worthy to introduce extra
>>>> configurations for
>>>> >>>> fault tolerance of heartbeat, unless we also introduce some retry
>>>> >>>> strategies for netty connections.
>>>> >>>>
>>>> >>>>
>>>> >>>> On Fri, Jul 2, 2021 at 9:34 PM Till Rohrmann <trohrm...@apache.org
>>>> >
>>>> >>>> wrote:
>>>> >>>>
>>>> >>>>> Could you share the full logs with us for the second experiment,
>>>> Lu? I
>>>> >>>>> cannot tell from the top of my head why it should take 30s unless
>>>> you have
>>>> >>>>> configured a restart delay of 30s.
>>>> >>>>>
>>>> >>>>> Let's discuss FLINK-23216 on the JIRA ticket, Gen.
>>>> >>>>>
>>>> >>>>> I've now implemented FLINK-23209 [1] but it somehow has the
>>>> problem
>>>> >>>>> that in a flakey environment you might not want to mark a
>>>> TaskExecutor dead
>>>> >>>>> on the first connection loss. Maybe this is something we need to
>>>> make
>>>> >>>>> configurable (e.g. introducing a threshold which admittedly is
>>>> similar to
>>>> >>>>> the heartbeat timeout) so that the user can configure it for her
>>>> >>>>> environment. On the upside, if you mark the TaskExecutor dead on
>>>> the first
>>>> >>>>> connection loss (assuming you have a stable network environment),
>>>> then it
>>>> >>>>> can now detect lost TaskExecutors as fast as the heartbeat
>>>> interval.
>>>> >>>>>
>>>> >>>>> [1] https://issues.apache.org/jira/browse/FLINK-23209
>>>> >>>>>
>>>> >>>>> Cheers,
>>>> >>>>> Till
>>>> >>>>>
>>>> >>>>> On Fri, Jul 2, 2021 at 9:33 AM Gen Luo <luogen...@gmail.com>
>>>> wrote:
>>>> >>>>>
>>>> >>>>>> Thanks for sharing, Till and Yang.
>>>> >>>>>>
>>>> >>>>>> @Lu
>>>> >>>>>> Sorry but I don't know how to explain the new test with the log.
>>>> >>>>>> Let's wait for others' reply.
>>>> >>>>>>
>>>> >>>>>> @Till
>>>> >>>>>> It would be nice if JIRAs could be fixed. Thanks again for
>>>> proposing
>>>> >>>>>> them.
>>>> >>>>>>
>>>> >>>>>> In addition, I was tracking an issue that RM keeps allocating and
>>>> >>>>>> freeing slots after a TM lost until its heartbeat timeout, when
>>>> I found the
>>>> >>>>>> recovery costing as long as heartbeat timeout. That should be a
>>>> minor bug
>>>> >>>>>> introduced by declarative resource management. I have created a
>>>> JIRA about
>>>> >>>>>> the problem [1] and  we can discuss it there if necessary.
>>>> >>>>>>
>>>> >>>>>> [1] https://issues.apache.org/jira/browse/FLINK-23216
>>>> >>>>>>
>>>> >>>>>> Lu Niu <qqib...@gmail.com> 于2021年7月2日周五 上午3:13写道:
>>>> >>>>>>
>>>> >>>>>>> Another side question, Shall we add metric to cover the complete
>>>> >>>>>>> restarting time (phase 1 + phase 2)? Current metric
>>>> jm.restartingTime only
>>>> >>>>>>> covers phase 1. Thanks!
>>>> >>>>>>>
>>>> >>>>>>> Best
>>>> >>>>>>> Lu
>>>> >>>>>>>
>>>> >>>>>>> On Thu, Jul 1, 2021 at 12:09 PM Lu Niu <qqib...@gmail.com>
>>>> wrote:
>>>> >>>>>>>
>>>> >>>>>>>> Thanks TIll and Yang for help! Also Thanks Till for a quick
>>>> fix!
>>>> >>>>>>>>
>>>> >>>>>>>> I did another test yesterday. In this test, I intentionally
>>>> throw
>>>> >>>>>>>> exception from the source operator:
>>>> >>>>>>>> ```
>>>> >>>>>>>> if (runtimeContext.getIndexOfThisSubtask() == 1
>>>> >>>>>>>>         && errorFrenquecyInMin > 0
>>>> >>>>>>>>         && System.currentTimeMillis() - lastStartTime >=
>>>> >>>>>>>> errorFrenquecyInMin * 60 * 1000) {
>>>> >>>>>>>>       lastStartTime = System.currentTimeMillis();
>>>> >>>>>>>>       throw new RuntimeException(
>>>> >>>>>>>>           "Trigger expected exception at: " + lastStartTime);
>>>> >>>>>>>>     }
>>>> >>>>>>>> ```
>>>> >>>>>>>> In this case, I found phase 1 still takes about 30s and Phase 2
>>>> >>>>>>>> dropped to 1s (because no need for container allocation). Why
>>>> phase 1 still
>>>> >>>>>>>> takes 30s even though no TM is lost?
>>>> >>>>>>>>
>>>> >>>>>>>> Related logs:
>>>> >>>>>>>> ```
>>>> >>>>>>>> 2021-06-30 00:55:07,463 INFO
>>>> >>>>>>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>>> - Source:
>>>> >>>>>>>>
>>>> USER_MATERIALIZED_EVENT_SIGNAL-user_context-frontend_event_source -> ...
>>>> >>>>>>>> java.lang.RuntimeException: Trigger expected exception at:
>>>> 1625014507446
>>>> >>>>>>>> 2021-06-30 00:55:07,509 INFO
>>>> >>>>>>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>>> - Job
>>>> >>>>>>>> NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging
>>>> >>>>>>>> (35c95ee7141334845cfd49b642fa9f98) switched from state RUNNING
>>>> to
>>>> >>>>>>>> RESTARTING.
>>>> >>>>>>>> 2021-06-30 00:55:37,596 INFO
>>>> >>>>>>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>>> - Job
>>>> >>>>>>>> NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging
>>>> >>>>>>>> (35c95ee7141334845cfd49b642fa9f98) switched from state
>>>> RESTARTING to
>>>> >>>>>>>> RUNNING.
>>>> >>>>>>>> 2021-06-30 00:55:38,678 INFO
>>>> >>>>>>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph
>>>> (time when
>>>> >>>>>>>> all tasks switch from CREATED to RUNNING)
>>>> >>>>>>>> ```
>>>> >>>>>>>> Best
>>>> >>>>>>>> Lu
>>>> >>>>>>>>
>>>> >>>>>>>>
>>>> >>>>>>>> On Thu, Jul 1, 2021 at 12:06 PM Lu Niu <qqib...@gmail.com>
>>>> wrote:
>>>> >>>>>>>>
>>>> >>>>>>>>> Thanks TIll and Yang for help! Also Thanks Till for a quick
>>>> fix!
>>>> >>>>>>>>>
>>>> >>>>>>>>> I did another test yesterday. In this test, I intentionally
>>>> throw
>>>> >>>>>>>>> exception from the source operator:
>>>> >>>>>>>>> ```
>>>> >>>>>>>>> if (runtimeContext.getIndexOfThisSubtask() == 1
>>>> >>>>>>>>>         && errorFrenquecyInMin > 0
>>>> >>>>>>>>>         && System.currentTimeMillis() - lastStartTime >=
>>>> >>>>>>>>> errorFrenquecyInMin * 60 * 1000) {
>>>> >>>>>>>>>       lastStartTime = System.currentTimeMillis();
>>>> >>>>>>>>>       throw new RuntimeException(
>>>> >>>>>>>>>           "Trigger expected exception at: " + lastStartTime);
>>>> >>>>>>>>>     }
>>>> >>>>>>>>> ```
>>>> >>>>>>>>> In this case, I found phase 1 still takes about 30s and Phase
>>>> 2
>>>> >>>>>>>>> dropped to 1s (because no need for container allocation).
>>>> >>>>>>>>>
>>>> >>>>>>>>> Some logs:
>>>> >>>>>>>>> ```
>>>> >>>>>>>>> ```
>>>> >>>>>>>>>
>>>> >>>>>>>>>
>>>> >>>>>>>>> On Thu, Jul 1, 2021 at 6:28 AM Till Rohrmann <
>>>> trohrm...@apache.org>
>>>> >>>>>>>>> wrote:
>>>> >>>>>>>>>
>>>> >>>>>>>>>> A quick addition, I think with FLINK-23202 it should now
>>>> also be
>>>> >>>>>>>>>> possible to improve the heartbeat mechanism in the general
>>>> case. We can
>>>> >>>>>>>>>> leverage the unreachability exception thrown if a remote
>>>> target is no
>>>> >>>>>>>>>> longer reachable to mark an heartbeat target as no longer
>>>> reachable [1].
>>>> >>>>>>>>>> This can then be considered as if the heartbeat timeout has
>>>> been triggered.
>>>> >>>>>>>>>> That way we should detect lost TaskExecutors as fast as our
>>>> heartbeat
>>>> >>>>>>>>>> interval is.
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-23209
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> Cheers,
>>>> >>>>>>>>>> Till
>>>> >>>>>>>>>>
>>>> >>>>>>>>>> On Thu, Jul 1, 2021 at 1:46 PM Yang Wang <
>>>> danrtsey...@gmail.com>
>>>> >>>>>>>>>> wrote:
>>>> >>>>>>>>>>
>>>> >>>>>>>>>>> Since you are deploying Flink workloads on Yarn, the Flink
>>>> >>>>>>>>>>> ResourceManager should get the container
>>>> >>>>>>>>>>> completion event after the heartbeat of Yarn NM->Yarn
>>>> RM->Flink
>>>> >>>>>>>>>>> RM, which is 8 seconds by default.
>>>> >>>>>>>>>>> And Flink ResourceManager will release the dead TaskManager
>>>> >>>>>>>>>>> container once received the completion event.
>>>> >>>>>>>>>>> As a result, Flink will not deploy tasks onto the dead
>>>> >>>>>>>>>>> TaskManagers.
>>>> >>>>>>>>>>>
>>>> >>>>>>>>>>>
>>>> >>>>>>>>>>> I think most of the time cost in Phase 1 might be
>>>> cancelling the
>>>> >>>>>>>>>>> tasks on the dead TaskManagers.
>>>> >>>>>>>>>>>
>>>> >>>>>>>>>>>
>>>> >>>>>>>>>>> Best,
>>>> >>>>>>>>>>> Yang
>>>> >>>>>>>>>>>
>>>> >>>>>>>>>>>
>>>> >>>>>>>>>>> Till Rohrmann <trohrm...@apache.org> 于2021年7月1日周四 下午4:49写道:
>>>> >>>>>>>>>>>
>>>> >>>>>>>>>>>> The analysis of Gen is correct. Flink currently uses its
>>>> >>>>>>>>>>>> heartbeat as the primary means to detect dead
>>>> TaskManagers. This means that
>>>> >>>>>>>>>>>> Flink will take at least `heartbeat.timeout` time before
>>>> the system
>>>> >>>>>>>>>>>> recovers. Even if the cancellation happens fast (e.g. by
>>>> having configured
>>>> >>>>>>>>>>>> a low akka.ask.timeout), then Flink will still try to
>>>> deploy tasks onto the
>>>> >>>>>>>>>>>> dead TaskManager until it is marked as dead and its slots
>>>> are released
>>>> >>>>>>>>>>>> (unless the ResourceManager does not get a signal from the
>>>> underlying
>>>> >>>>>>>>>>>> resource management system that a container/pod has died).
>>>> One way to
>>>> >>>>>>>>>>>> improve the situation is to introduce logic which can
>>>> react to a
>>>> >>>>>>>>>>>> ConnectionException and then black lists or releases a
>>>> TaskManager, for
>>>> >>>>>>>>>>>> example. This is currently not implemented in Flink,
>>>> though.
>>>> >>>>>>>>>>>>
>>>> >>>>>>>>>>>> Concerning the cancellation operation: Flink currently
>>>> does not
>>>> >>>>>>>>>>>> listen to the dead letters of Akka. This means that the
>>>> `akka.ask.timeout`
>>>> >>>>>>>>>>>> is the primary means to fail the future result of a rpc
>>>> which could not be
>>>> >>>>>>>>>>>> sent. This is also an improvement we should add to Flink's
>>>> RpcService. I've
>>>> >>>>>>>>>>>> created a JIRA issue for this problem [1].
>>>> >>>>>>>>>>>>
>>>> >>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-23202
>>>> >>>>>>>>>>>>
>>>> >>>>>>>>>>>> Cheers,
>>>> >>>>>>>>>>>> Till
>>>> >>>>>>>>>>>>
>>>> >>>>>>>>>>>> On Wed, Jun 30, 2021 at 6:33 PM Lu Niu <qqib...@gmail.com>
>>>> >>>>>>>>>>>> wrote:
>>>> >>>>>>>>>>>>
>>>> >>>>>>>>>>>>> Thanks Gen! cc flink-dev to collect more inputs.
>>>> >>>>>>>>>>>>>
>>>> >>>>>>>>>>>>> Best
>>>> >>>>>>>>>>>>> Lu
>>>> >>>>>>>>>>>>>
>>>> >>>>>>>>>>>>> On Wed, Jun 30, 2021 at 12:55 AM Gen Luo <
>>>> luogen...@gmail.com>
>>>> >>>>>>>>>>>>> wrote:
>>>> >>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>> I'm also wondering here.
>>>> >>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>> In my opinion, it's because the JM can not confirm
>>>> whether
>>>> >>>>>>>>>>>>>> the TM is lost or it's a temporary network trouble and
>>>> will recover soon,
>>>> >>>>>>>>>>>>>> since I can see in the log that akka has got a
>>>> Connection refused but JM
>>>> >>>>>>>>>>>>>> still sends a heartbeat request to the lost TM until it
>>>> reaches heartbeat
>>>> >>>>>>>>>>>>>> timeout. But I'm not sure if it's indeed designed like
>>>> this.
>>>> >>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>> I would really appreciate it if anyone who knows more
>>>> details
>>>> >>>>>>>>>>>>>> could answer. Thanks.
>>>> >>>>>>>>>>>>>>
>>>> >>>>>>>>>>>>>
>>>>
>>>

Reply via email to