@刘建刚 Welcome to the discussion, and thanks for sharing your experience.
I have a minor question. In my experience, network failures in a given cluster usually take some time to recover from, and that duration can be measured (e.g. as a p99) to guide configuration. So I suppose it would be better to use time rather than an attempt count as the configuration for confirming TM liveness. What do you think? Or is that premise consistent with your experience?

@Lu Niu <qqib...@gmail.com>
> Does that mean the akka timeout situation we talked about above doesn't apply to Flink 1.11?

I suppose that's true. According to the reply from Till in FLINK-23216 <https://issues.apache.org/jira/browse/FLINK-23216>, it is confirmed that the problem is introduced by declarative resource management, which was added to Flink in 1.12. In previous versions, although the JM still uses heartbeats to check TM status, the RM tells the JM about a lost TM as soon as Yarn notices it. This is much faster than the JM's heartbeat mechanism if one uses the default heartbeat configuration. After 1.12 with declarative resource management, however, the RM no longer forwards this to the JM, since it doesn't have a related AllocationID. So the heartbeat mechanism becomes the only way the JM can learn about a lost TM.

On Fri, Jul 9, 2021 at 6:34 AM Lu Niu <qqib...@gmail.com> wrote:

> Thanks everyone! This is a great discussion!
>
> 1. Restarting takes 30s when throwing exceptions from application code
> because the restart delay is 30s in the config. Previously, lots of
> related config values were 30s, which led to the confusion. I redid the
> test with this config:
>
> FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647,
> backoffTimeMS=1000)
> heartbeat.timeout: 500000
> akka.ask.timeout: 30 s
> akka.lookup.timeout: 30 s
> akka.tcp.timeout: 30 s
> akka.watch.heartbeat.interval: 30 s
> akka.watch.heartbeat.pause: 120 s
>
> Now phase 1 drops to 2s and phase 2 takes 13s; the whole restart takes
> 14s. Does that mean the akka timeout situation we talked about above
> doesn't apply to Flink 1.11?
>
> 2.
> About flaky connections between TMs: we did sometimes notice exceptions
> like the following:
>
> ```
> TaskFoo switched from RUNNING to FAILED on
> container_e02_1599158147594_156068_01_000038 @
> xenon-prod-001-20200316-data-slave-prod-0a0201a4.ec2.pin220.com (dataPort=40957).
> org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException:
> Connection unexpectedly closed by remote task manager
> 'xenon-prod-001-20200316-data-slave-prod-0a020f7a.ec2.pin220.com/10.2.15.122:33539'.
> This might indicate that the remote task manager was lost.
>     at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.channelInactive(CreditBasedPartitionRequestClientHandler.java:144)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236)
>     at org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81)
>     at org.apache.flink.runtime.io.network.netty.NettyMessageClientDecoderDelegate.channelInactive(NettyMessageClientDecoderDelegate.java:97)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:236)
>     at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1416)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:257)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:243)
>     at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:912)
>     at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:816)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:163)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:416)
>     at org.apache.flink.shaded.netty4.io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:331)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:918)
>     at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
>     at java.lang.Thread.run(Thread.java:748)
> ```
>
> 1. It's a bit inconvenient to debug such an exception because it doesn't
> report the exact container id. Right now we have to search for
> xenon-prod-001-20200316-data-slave-prod-0a020f7a.ec2.pin220.com/10.2.15.122:33539
> in the JobManager log to find it.
> 2. The task manager log doesn't show anything suspicious, and there was
> no major GC, so it might indicate a flaky connection in this case.
> 3. Is there any short-term workaround we can try? Any config tuning?
> Also, what's the long-term solution?
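[Editor's note: for the config-tuning question above, the knobs this thread keeps returning to can be collected in flink-conf.yaml. This is a sketch with illustrative values only; suitable values depend heavily on the environment, and the option names are as of Flink 1.11/1.12.]

```yaml
# Illustrative values only -- tune per environment.
# How often the JM/RM/TM exchange heartbeats, and how long until a silent
# peer is declared dead (defaults: 10s interval, 50s timeout).
heartbeat.interval: 10000
heartbeat.timeout: 50000

# Timeout for akka ask-style RPCs; a lower value surfaces failed RPCs
# (e.g. cancellation calls toward a dead TM) sooner.
akka.ask.timeout: 30 s

# Restart strategy corresponding to the FixedDelayRestartBackoffTimeStrategy
# settings quoted earlier in the thread.
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2147483647
restart-strategy.fixed-delay.delay: 1 s
```

Note the trade-off the thread describes: a lower heartbeat.timeout detects lost TMs faster but produces more false positives in an unstable network.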
> Best
> Lu
>
> On Tue, Jul 6, 2021 at 11:45 PM 刘建刚 <liujiangangp...@gmail.com> wrote:
>
>> It is really helpful to find the lost container quickly. In our internal
>> Flink version, we optimize this with task reports and JobMaster probes.
>> When a task fails because of a connection problem, it reports that to the
>> JobMaster. The JobMaster then tries to confirm the liveness of the
>> unconnected TaskManager a configurable number of times. If the JobMaster
>> finds the TaskManager unreachable or dead, it releases the TaskManager.
>> This works for most cases. For an unstable environment, the config needs
>> adjustment.
>>
>> Gen Luo <luogen...@gmail.com> wrote on Tue, Jul 6, 2021 at 8:41 PM:
>>
>>> Yes, I have noticed the PR and commented there with some considerations
>>> about the new option. We can discuss further there.
>>>
>>> On Tue, Jul 6, 2021 at 6:04 PM Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>> > This is actually a very good point, Gen. There might not be a lot to
>>> > gain by implementing a fancy algorithm for figuring out whether a TM
>>> > is dead or not based on failed heartbeat RPCs from the JM, if the
>>> > TM <> TM communication does not tolerate failures and directly fails
>>> > the affected tasks. This assumes that the JM and TMs run in the same
>>> > environment.
>>> >
>>> > One simple approach could be to make the number of failed heartbeat
>>> > RPCs until a target is marked as unreachable configurable, because
>>> > what represents a good enough criterion in one user's environment
>>> > might produce too many false positives in somebody else's. Or, even
>>> > simpler, one could allow disabling the reaction to a failed heartbeat
>>> > RPC, as is currently the case.
>>> >
>>> > We currently have a discussion about this on this PR [1]. Maybe you
>>> > want to join the discussion there and share your insights.
>>> >
>>> > [1] https://github.com/apache/flink/pull/16357
>>> >
>>> > Cheers,
>>> > Till
>>> >
>>> > On Tue, Jul 6, 2021 at 4:37 AM Gen Luo <luogen...@gmail.com> wrote:
>>> >
>>> >> I know that there are retry strategies for akka rpc frameworks. I
>>> >> was just considering that, since the environment is shared by the JM
>>> >> and TMs, and the connections among TMs (using netty) are flaky in
>>> >> unstable environments, which will also cause job failures, is it
>>> >> necessary to build a strongly guaranteed connection between the JM
>>> >> and TMs, or could it be as flaky as the connections among TMs?
>>> >>
>>> >> As far as I know, connections among TMs simply fail on their first
>>> >> connection loss, so behaving like this in the JM just means "as
>>> >> flaky as connections among TMs". In a stable environment that's good
>>> >> enough, but in an unstable environment it indeed increases the
>>> >> instability. IMO, though a single connection loss is not a reliable
>>> >> signal, a double check should be good enough. But since I'm not
>>> >> experienced with unstable environments, I can't tell whether that's
>>> >> also enough there.
>>> >>
>>> >> On Mon, Jul 5, 2021 at 5:59 PM Till Rohrmann <trohrm...@apache.org>
>>> >> wrote:
>>> >>
>>> >>> I think for RPC communication there are retry strategies used by
>>> >>> the underlying Akka ActorSystem, so an RpcEndpoint can reconnect to
>>> >>> a remote ActorSystem and resume communication. Moreover, there are
>>> >>> also reconciliation protocols in place which reconcile the states
>>> >>> between the components because of potentially lost RPC messages. So
>>> >>> the main question would be whether a single connection loss is good
>>> >>> enough for triggering the timeout, or whether we want a more
>>> >>> elaborate mechanism to reason about the availability of the remote
>>> >>> system (e.g. a couple of lost heartbeat messages).
>>> >>>
>>> >>> Cheers,
>>> >>> Till
>>> >>>
>>> >>> On Mon, Jul 5, 2021 at 10:00 AM Gen Luo <luogen...@gmail.com> wrote:
>>> >>>
>>> >>>> As far as I know, a TM will report a connection failure once a TM
>>> >>>> it is connected to is lost. I suppose the JM can believe the
>>> >>>> report and fail the tasks on the lost TM if it also encounters a
>>> >>>> connection failure.
>>> >>>>
>>> >>>> Of course, that won't work if the lost TM is standalone. But I
>>> >>>> suppose we can use the same strategy as in the connected scenario:
>>> >>>> consider the TM possibly lost on the first connection loss, and
>>> >>>> fail it if a double check also fails. The major difference is that
>>> >>>> the senders of the probes are then the same role rather than two
>>> >>>> different roles, so the results may tend to be the same.
>>> >>>>
>>> >>>> On the other hand, this also means that jobs can be fragile in an
>>> >>>> unstable environment, no matter whether the failover is triggered
>>> >>>> by a TM or the JM. So maybe it's not worth introducing extra
>>> >>>> configuration for fault tolerance of heartbeats, unless we also
>>> >>>> introduce retry strategies for netty connections.
>>> >>>>
>>> >>>> On Fri, Jul 2, 2021 at 9:34 PM Till Rohrmann <trohrm...@apache.org>
>>> >>>> wrote:
>>> >>>>
>>> >>>>> Could you share the full logs of the second experiment with us,
>>> >>>>> Lu? I cannot tell off the top of my head why it should take 30s,
>>> >>>>> unless you have configured a restart delay of 30s.
>>> >>>>>
>>> >>>>> Let's discuss FLINK-23216 on the JIRA ticket, Gen.
>>> >>>>>
>>> >>>>> I've now implemented FLINK-23209 [1], but it has the problem that
>>> >>>>> in a flaky environment you might not want to mark a TaskExecutor
>>> >>>>> dead on the first connection loss. Maybe this is something we
>>> >>>>> need to make configurable (e.g. introducing a threshold which,
>>> >>>>> admittedly, is similar to the heartbeat timeout) so that users
>>> >>>>> can configure it for their environment. On the upside, if you
>>> >>>>> mark the TaskExecutor dead on the first connection loss (assuming
>>> >>>>> you have a stable network environment), then lost TaskExecutors
>>> >>>>> can now be detected as fast as the heartbeat interval.
>>> >>>>>
>>> >>>>> [1] https://issues.apache.org/jira/browse/FLINK-23209
>>> >>>>>
>>> >>>>> Cheers,
>>> >>>>> Till
>>> >>>>>
>>> >>>>> On Fri, Jul 2, 2021 at 9:33 AM Gen Luo <luogen...@gmail.com>
>>> >>>>> wrote:
>>> >>>>>
>>> >>>>>> Thanks for sharing, Till and Yang.
>>> >>>>>>
>>> >>>>>> @Lu
>>> >>>>>> Sorry, but I don't know how to explain the new test with the
>>> >>>>>> log. Let's wait for others' replies.
>>> >>>>>>
>>> >>>>>> @Till
>>> >>>>>> It would be nice if those JIRAs could be fixed. Thanks again for
>>> >>>>>> proposing them.
>>> >>>>>>
>>> >>>>>> In addition, I was tracking an issue where the RM keeps
>>> >>>>>> allocating and freeing slots after a TM is lost, until its
>>> >>>>>> heartbeat times out, when I found the recovery taking as long as
>>> >>>>>> the heartbeat timeout. That should be a minor bug introduced by
>>> >>>>>> declarative resource management. I have created a JIRA for the
>>> >>>>>> problem [1] and we can discuss it there if necessary.
>>> >>>>>>
>>> >>>>>> [1] https://issues.apache.org/jira/browse/FLINK-23216
>>> >>>>>>
>>> >>>>>> Lu Niu <qqib...@gmail.com> wrote on Fri, Jul 2, 2021 at 3:13 AM:
>>> >>>>>>
>>> >>>>>>> Another side question: shall we add a metric to cover the
>>> >>>>>>> complete restart time (phase 1 + phase 2)? The current metric
>>> >>>>>>> jm.restartingTime only covers phase 1. Thanks!
>>> >>>>>>>
>>> >>>>>>> Best
>>> >>>>>>> Lu
>>> >>>>>>>
>>> >>>>>>> On Thu, Jul 1, 2021 at 12:09 PM Lu Niu <qqib...@gmail.com>
>>> >>>>>>> wrote:
>>> >>>>>>>
>>> >>>>>>>> Thanks Till and Yang for the help! Also thanks Till for the
>>> >>>>>>>> quick fix!
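[Editor's note: to make Till's threshold suggestion above concrete (counting failed heartbeat RPCs against a configurable threshold instead of reacting to the first loss), here is a minimal self-contained sketch. The class and method names are hypothetical, not Flink's actual implementation.]

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Sketch of the threshold idea discussed above: a heartbeat target is only
 * marked unreachable after N consecutive failed heartbeat RPCs, so a single
 * transient connection loss does not kill an otherwise healthy TaskManager.
 * All names here are hypothetical, not Flink's actual classes.
 */
public class HeartbeatTargetMonitor {
    private final int failureThreshold;
    private final AtomicInteger consecutiveFailures = new AtomicInteger();

    public HeartbeatTargetMonitor(int failureThreshold) {
        this.failureThreshold = failureThreshold;
    }

    /** Records one failed heartbeat RPC; returns true once the target should be treated as dead. */
    public boolean reportFailure() {
        return consecutiveFailures.incrementAndGet() >= failureThreshold;
    }

    /** A successful heartbeat resets the counter, forgiving transient blips. */
    public void reportSuccess() {
        consecutiveFailures.set(0);
    }

    public static void main(String[] args) {
        HeartbeatTargetMonitor monitor = new HeartbeatTargetMonitor(3);
        System.out.println(monitor.reportFailure()); // 1st failure: false
        monitor.reportSuccess();                     // blip recovered, counter resets
        System.out.println(monitor.reportFailure()); // false
        System.out.println(monitor.reportFailure()); // false
        System.out.println(monitor.reportFailure()); // 3rd consecutive failure: true
    }
}
```

A threshold of 1 reproduces "fail on first connection loss", while a very large threshold degenerates to simply waiting for the heartbeat timeout, which matches the trade-off described in the mail above.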
>>> >>>>>>>> I did another test yesterday. In this test, I intentionally
>>> >>>>>>>> threw an exception from the source operator:
>>> >>>>>>>> ```
>>> >>>>>>>> if (runtimeContext.getIndexOfThisSubtask() == 1
>>> >>>>>>>>     && errorFrenquecyInMin > 0
>>> >>>>>>>>     && System.currentTimeMillis() - lastStartTime >=
>>> >>>>>>>>         errorFrenquecyInMin * 60 * 1000) {
>>> >>>>>>>>   lastStartTime = System.currentTimeMillis();
>>> >>>>>>>>   throw new RuntimeException(
>>> >>>>>>>>       "Trigger expected exception at: " + lastStartTime);
>>> >>>>>>>> }
>>> >>>>>>>> ```
>>> >>>>>>>> In this case, I found phase 1 still takes about 30s while
>>> >>>>>>>> phase 2 dropped to 1s (because no container allocation is
>>> >>>>>>>> needed). Why does phase 1 still take 30s even though no TM is
>>> >>>>>>>> lost?
>>> >>>>>>>>
>>> >>>>>>>> Related logs:
>>> >>>>>>>> ```
>>> >>>>>>>> 2021-06-30 00:55:07,463 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: USER_MATERIALIZED_EVENT_SIGNAL-user_context-frontend_event_source -> ...
>>> >>>>>>>> java.lang.RuntimeException: Trigger expected exception at: 1625014507446
>>> >>>>>>>> 2021-06-30 00:55:07,509 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging (35c95ee7141334845cfd49b642fa9f98) switched from state RUNNING to RESTARTING.
>>> >>>>>>>> 2021-06-30 00:55:37,596 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job NRTG_USER_MATERIALIZED_EVENT_SIGNAL_user_context_staging (35c95ee7141334845cfd49b642fa9f98) switched from state RESTARTING to RUNNING.
>>> >>>>>>>> 2021-06-30 00:55:38,678 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - (time when all tasks switch from CREATED to RUNNING)
>>> >>>>>>>> ```
>>> >>>>>>>> Best
>>> >>>>>>>> Lu
>>> >>>>>>>>
>>> >>>>>>>> On Thu, Jul 1, 2021 at 12:06 PM Lu Niu <qqib...@gmail.com>
>>> >>>>>>>> wrote:
>>> >>>>>>>>
>>> >>>>>>>>> On Thu, Jul 1, 2021 at 6:28 AM Till Rohrmann
>>> >>>>>>>>> <trohrm...@apache.org> wrote:
>>> >>>>>>>>>
>>> >>>>>>>>>> A quick addition: I think with FLINK-23202 it should now
>>> >>>>>>>>>> also be possible to improve the heartbeat mechanism in the
>>> >>>>>>>>>> general case. We can leverage the unreachability exception
>>> >>>>>>>>>> thrown when a remote target is no longer reachable to mark a
>>> >>>>>>>>>> heartbeat target as unreachable [1]. This can then be
>>> >>>>>>>>>> treated as if the heartbeat timeout had been triggered. That
>>> >>>>>>>>>> way we should detect lost TaskExecutors as fast as our
>>> >>>>>>>>>> heartbeat interval.
>>> >>>>>>>>>>
>>> >>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-23209
>>> >>>>>>>>>>
>>> >>>>>>>>>> Cheers,
>>> >>>>>>>>>> Till
>>> >>>>>>>>>>
>>> >>>>>>>>>> On Thu, Jul 1, 2021 at 1:46 PM Yang Wang
>>> >>>>>>>>>> <danrtsey...@gmail.com> wrote:
>>> >>>>>>>>>>
>>> >>>>>>>>>>> Since you are deploying Flink workloads on Yarn, the Flink
>>> >>>>>>>>>>> ResourceManager should get the container completion event
>>> >>>>>>>>>>> after the heartbeats of Yarn NM -> Yarn RM -> Flink RM,
>>> >>>>>>>>>>> which take 8 seconds by default. The Flink ResourceManager
>>> >>>>>>>>>>> will release the dead TaskManager container once it
>>> >>>>>>>>>>> receives the completion event. As a result, Flink will not
>>> >>>>>>>>>>> deploy tasks onto the dead TaskManagers.
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> I think most of the time cost in phase 1 might be
>>> >>>>>>>>>>> cancelling the tasks on the dead TaskManagers.
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> Best,
>>> >>>>>>>>>>> Yang
>>> >>>>>>>>>>>
>>> >>>>>>>>>>> Till Rohrmann <trohrm...@apache.org> wrote on Thu, Jul 1,
>>> >>>>>>>>>>> 2021 at 4:49 PM:
>>> >>>>>>>>>>>
>>> >>>>>>>>>>>> Gen's analysis is correct. Flink currently uses its
>>> >>>>>>>>>>>> heartbeat as the primary means to detect dead
>>> >>>>>>>>>>>> TaskManagers. This means that Flink will take at least
>>> >>>>>>>>>>>> `heartbeat.timeout` time before the system recovers. Even
>>> >>>>>>>>>>>> if the cancellation happens fast (e.g. by having
>>> >>>>>>>>>>>> configured a low akka.ask.timeout), Flink will still try
>>> >>>>>>>>>>>> to deploy tasks onto the dead TaskManager until it is
>>> >>>>>>>>>>>> marked as dead and its slots are released (unless the
>>> >>>>>>>>>>>> ResourceManager gets a signal from the underlying resource
>>> >>>>>>>>>>>> management system that the container/pod has died, in
>>> >>>>>>>>>>>> which case the slots are released earlier).
>>> >>>>>>>>>>>> One way to improve the situation would be to introduce
>>> >>>>>>>>>>>> logic which can react to a ConnectionException and then
>>> >>>>>>>>>>>> blacklists or releases the TaskManager, for example. This
>>> >>>>>>>>>>>> is currently not implemented in Flink, though.
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> Concerning the cancellation operation: Flink currently
>>> >>>>>>>>>>>> does not listen to Akka's dead letters. This means that
>>> >>>>>>>>>>>> the `akka.ask.timeout` is the primary means to fail the
>>> >>>>>>>>>>>> future result of an RPC which could not be sent. This is
>>> >>>>>>>>>>>> also an improvement we should add to Flink's RpcService.
>>> >>>>>>>>>>>> I've created a JIRA issue for this problem [1].
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-23202
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> Cheers,
>>> >>>>>>>>>>>> Till
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>> On Wed, Jun 30, 2021 at 6:33 PM Lu Niu
>>> >>>>>>>>>>>> <qqib...@gmail.com> wrote:
>>> >>>>>>>>>>>>
>>> >>>>>>>>>>>>> Thanks Gen! cc'ing flink-dev to collect more input.
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>> Best
>>> >>>>>>>>>>>>> Lu
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>> On Wed, Jun 30, 2021 at 12:55 AM Gen Luo
>>> >>>>>>>>>>>>> <luogen...@gmail.com> wrote:
>>> >>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> I'm also wondering here.
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> In my opinion, it's because the JM cannot confirm
>>> >>>>>>>>>>>>>> whether the TM is lost or it's a temporary network
>>> >>>>>>>>>>>>>> problem that will recover soon, since I can see in the
>>> >>>>>>>>>>>>>> log that akka got a "Connection refused" but the JM
>>> >>>>>>>>>>>>>> still sends heartbeat requests to the lost TM until the
>>> >>>>>>>>>>>>>> heartbeat timeout is reached. But I'm not sure whether
>>> >>>>>>>>>>>>>> it's indeed designed like this.
>>> >>>>>>>>>>>>>>
>>> >>>>>>>>>>>>>> I would really appreciate it if anyone who knows more
>>> >>>>>>>>>>>>>> details could answer. Thanks.
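[Editor's note: pulling together the "double check" idea that Gen Luo and 刘建刚 describe in this thread (a first connection loss only makes a TM suspect; a follow-up probe decides whether to release it), here is a minimal sketch. All names are hypothetical; this is not Flink's actual implementation.]

```java
/**
 * Sketch of the double-check liveness idea from this thread: the first
 * connection loss only marks a TaskManager as SUSPECTED; the JobMaster then
 * probes it, and only a failed probe releases it. Hypothetical names, not
 * Flink's actual classes.
 */
public class TaskManagerLiveness {
    public enum State { ALIVE, SUSPECTED, DEAD }

    private State state = State.ALIVE;

    /** A first connection loss is inconclusive: it may be a transient network failure. */
    public State onConnectionLoss() {
        if (state == State.ALIVE) {
            state = State.SUSPECTED;
        }
        return state;
    }

    /** Outcome of the JobMaster's follow-up probe of a suspected TaskManager. */
    public State onProbeResult(boolean reachable) {
        if (state == State.SUSPECTED) {
            // Release the TM only after the double check also fails.
            state = reachable ? State.ALIVE : State.DEAD;
        }
        return state;
    }

    public static void main(String[] args) {
        TaskManagerLiveness tm = new TaskManagerLiveness();
        System.out.println(tm.onConnectionLoss());   // SUSPECTED
        System.out.println(tm.onProbeResult(true));  // ALIVE: transient blip
        System.out.println(tm.onConnectionLoss());   // SUSPECTED again
        System.out.println(tm.onProbeResult(false)); // DEAD: release the TM
    }
}
```

As noted in the thread, this only helps if the probe is meaningfully independent of the original failed path; if the same sender probes over the same route, both checks tend to fail together.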