Hi,

I don't think increasing it to 3 minutes is an appropriate approach; it will also lengthen the job recovery time. I'd suggest tracking down why the upstream task takes so long to be deployed and started successfully.
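
One lead worth checking in the log you posted: the akka.remote.EndpointWriter
reports an OversizedPayloadException because a RemoteRpcInvocation of 10515960
bytes exceeded the 10485760-byte (10 MB) frame limit, so that RPC was
discarded. If the discarded message was part of a task deployment, it could
explain why the upstream task never started. As a rough sketch only (the 20 MB
value is illustrative, not a tuned recommendation), the frame limit can be
raised in flink-conf.yaml:

    # default is 10485760b (10 MB); doubled here purely for illustration
    akka.framesize: 20971520b

Even then, it would be worth understanding why the deployment message is that
large in the first place.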

Best,
Shammon FY


On Fri, Jul 14, 2023 at 2:25 PM zhan...@eastcom-sw.com <
zhan...@eastcom-sw.com> wrote:

> hi, last time, after increasing `taskmanager.network.request-backoff.max`
> from the default 10s to 30s, the job still hit the PartitionNotFoundException
> restart loop after running for 5 days.
> From the logs, the job is restarted automatically after three consecutive
> checkpoint timeout failures (the Checkpointed Data Size keeps growing,
> reaching tens to hundreds of MB even when no data is currently being
> processed), and one operator then keeps failing and restarting its tasks.
>
> The full failure log of the whole process is below. Would increasing
> `taskmanager.network.request-backoff.max` further, to 3 minutes, avoid the
> PartitionNotFoundException?
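>
> For reference, a sketch of the change I am asking about in flink-conf.yaml
> (the option takes milliseconds, so 3 minutes would be 180000; this is only
> the proposal above, not a verified fix):
>
>     taskmanager.network.request-backoff.max: 180000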
>
> 2023-07-12 11:07:49,490 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] -
> Checkpoint 19177 of job 3b800d54fb6a002be7feadb1a8b6894e expired before
> completing.
> 2023-07-12 11:07:49,490 WARN
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager [] - Failed to
> trigger or complete checkpoint 19177 for job
> 3b800d54fb6a002be7feadb1a8b6894e. (3 consecutive failed attempts so far)
> org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint
> expired before completing.
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2216)
> [flink-dist-1.17.1.jar:1.17.1]
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> [?:1.8.0_77]
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_77]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> [?:1.8.0_77]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> [?:1.8.0_77]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> [?:1.8.0_77]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> [?:1.8.0_77]
> at java.lang.Thread.run(Thread.java:745) [?:1.8.0_77]
> 2023-07-12 11:07:49,490 INFO
> org.apache.flink.runtime.checkpoint.CheckpointRequestDecider [] -
> checkpoint request time in queue: 2280006
> 2023-07-12 11:07:49,492 INFO
> org.apache.flink.runtime.jobmaster.JobMaster                 [] - Trying to
> recover from a global failure.
> org.apache.flink.util.FlinkRuntimeException: Exceeded checkpoint tolerable
> failure threshold. The latest checkpoint failed due to Checkpoint expired
> before completing., view the Checkpoint History tab or the Job Manager log
> to find out why continuous checkpoints failed.
> at
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.checkFailureAgainstCounter(CheckpointFailureManager.java:212)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleJobLevelCheckpointException(CheckpointFailureManager.java:169)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at
> org.apache.flink.runtime.checkpoint.CheckpointFailureManager.handleCheckpointException(CheckpointFailureManager.java:122)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2155)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.abortPendingCheckpoint(CheckpointCoordinator.java:2134)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.access$700(CheckpointCoordinator.java:101)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2216)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> ~[?:1.8.0_77]
> at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_77]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
> ~[?:1.8.0_77]
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
> ~[?:1.8.0_77]
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> ~[?:1.8.0_77]
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> ~[?:1.8.0_77]
> at java.lang.Thread.run(Thread.java:745) ~[?:1.8.0_77]
> 2023-07-12 11:07:49,492 INFO
> org.apache.flink.runtime.jobmaster.JobMaster                 [] - 51 tasks
> will be restarted to recover from a global failure.
> 2023-07-12 11:07:49,492 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job Phase
> 2 Data Warehouse Processing (3b800d54fb6a002be7feadb1a8b6894e) switched
> from state RUNNING to RESTARTING.
>
> 2023-07-12 11:07:50,007 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Group
> Windowing (5s) (16/16)
> (ce053119a37c9c5ca107f7386cd9fd8f_d952d2a6aebfb900c453884c57f96b82_15_0)
> switched from CANCELING to CANCELED.
> 2023-07-12 11:07:50,007 INFO
> org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager
> [] - Clearing resource requirements of job 3b800d54fb6a002be7feadb1a8b6894e
>
> 2023-07-12 11:07:50,496 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job Phase
> 2 Data Warehouse Processing (3b800d54fb6a002be7feadb1a8b6894e) switched
> from state RESTARTING to RUNNING.
> 2023-07-12 11:07:50,497 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Restoring
> job 3b800d54fb6a002be7feadb1a8b6894e from Checkpoint 19173 @ 1689128839484
> for 3b800d54fb6a002be7feadb1a8b6894e located at
> hdfs://10.252.210.64:9000/flink-checkpoints/3b800d54fb6a002be7feadb1a8b6894e/chk-19173.
> 2023-07-12 11:07:50,499 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - No master
> state to restore
> 2023-07-12 11:07:50,499 INFO
> org.apache.flink.runtime.operators.coordination.RecreateOnResetOperatorCoordinator
> [] - Resetting coordinator to checkpoint.
> 2023-07-12 11:07:50,502 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Sink:
> Doris Sink 2 (4/18)
> (ce053119a37c9c5ca107f7386cd9fd8f_8fb6014c2df1d028b4c9ec6b86c8738f_3_1)
> switched from CREATED to SCHEDULED.
>
> 2023-07-12 11:07:50,511 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Sink:
> Doris Sink 2 (4/18)
> (ce053119a37c9c5ca107f7386cd9fd8f_8fb6014c2df1d028b4c9ec6b86c8738f_3_1)
> switched from SCHEDULED to DEPLOYING.
>
> 2023-07-12 11:07:50,590 ERROR akka.remote.EndpointWriter
>                  [] - Transient association error (association remains live)
> akka.remote.OversizedPayloadException: Discarding oversized payload sent
> to Actor[akka.tcp://
> flink@10.252.210.62:42583/user/rpc/taskmanager_0#-1916466409]: max
> allowed size 10485760 bytes, actual size of encoded class
> org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation was 10515960
> bytes.
>
> 2023-07-12 11:07:50,602 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Sink:
> Doris Sink 2 (4/18)
> (ce053119a37c9c5ca107f7386cd9fd8f_8fb6014c2df1d028b4c9ec6b86c8738f_3_1)
> switched from DEPLOYING to INITIALIZING.
> 2023-07-12 11:07:50,607 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Sink:
> Doris Sink 2 (4/18)
> (ce053119a37c9c5ca107f7386cd9fd8f_8fb6014c2df1d028b4c9ec6b86c8738f_3_1)
> switched from INITIALIZING to RUNNING.
>
> 2023-07-12 11:09:11,738 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Sink:
> Doris Sink 2 (4/18)
> (ce053119a37c9c5ca107f7386cd9fd8f_8fb6014c2df1d028b4c9ec6b86c8738f_3_1)
> switched from RUNNING to FAILED on 10.252.210.64:21987-01f5cf @
> nbiot-core-mpp-dcos-b-4.novalocal (dataPort=31441).
> org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
> Partition
> d883715596bbde80137b5de405fb3ae1#15@ce053119a37c9c5ca107f7386cd9fd8f_d952d2a6aebfb900c453884c57f96b82_15_1
> not found.
> at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:314)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:206)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:629)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at 
> org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.lambda$triggerPartitionStateCheck$1(SingleInputGate.java:988)
> ~[flink-dist-1.17.1.jar:1.17.1]
> at
> java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture.java:656)
> ~[?:1.8.0_77]
> at
> java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:632)
> ~[?:1.8.0_77]
> at
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
> ~[?:1.8.0_77]
> at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
> ~[?:?]
> at
> org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$withContextClassLoader$0(ClassLoadingUtils.java:41)
> ~[?:?]
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49) ~[?:?]
> at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
> ~[?:?]
> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> [?:1.8.0_77]
> at
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> [?:1.8.0_77]
> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> [?:1.8.0_77]
> at
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
> [?:1.8.0_77]
> 2023-07-12 11:09:11,739 INFO
> org.apache.flink.runtime.jobmaster.JobMaster                 [] - 51 tasks
> will be restarted to recover the failed task
> ce053119a37c9c5ca107f7386cd9fd8f_8fb6014c2df1d028b4c9ec6b86c8738f_3_1.
> 2023-07-12 11:09:11,739 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job Phase
> 2 Data Warehouse Processing (3b800d54fb6a002be7feadb1a8b6894e) switched
> from state RUNNING to RESTARTING.
>
>
> From: Shammon FY
> Sent: 2023-07-05 17:43
> To: user-zh
> Subject: Re: Re: PartitionNotFoundException restart loop
> Hi,
>
> If you want to increase the partition request retry time, you can adjust the
> config option `taskmanager.network.request-backoff.max`. The default is 10
> seconds; see [1] for details.
>
> [1]
>
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#full-taskmanageroptions
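>
> For example, raising it from the default 10 seconds to 30 seconds would look
> like this in flink-conf.yaml (the option is specified in milliseconds; the
> value is only an illustration):
>
>     taskmanager.network.request-backoff.max: 30000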
>
> Best,
> Shammon FY
>
> On Tue, Jul 4, 2023 at 11:38 AM zhan...@eastcom-sw.com <
> zhan...@eastcom-sw.com> wrote:
>
> > From the earlier logs, after the restart the job loads the checkpoint data
> > (around 100 MB) from HDFS, and that step seems to take quite a while; there
> > is also the Kafka consumer reconnect. Can the downstream timeout retries be
> > configured with a retry count or a duration?
> >
> > From: Shammon FY
> > Sent: 2023-07-04 10:12
> > To: user-zh
> > Subject: Re: PartitionNotFoundException restart loop
> > Hi,
> >
> > The usual cause of a PartitionNotFoundException is that a downstream task
> > sends a partition request to an upstream task that has not been deployed
> > successfully yet. Normally the downstream task retries and only throws the
> > exception once the retries time out. You can check whether there are any
> > other exceptions in the logs and find out why the upstream task was not
> > deployed successfully.
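> >
> > To make the retry window concrete: as far as I know, the backoff starts at
> > `taskmanager.network.request-backoff.initial` (default 100 ms) and roughly
> > doubles on each retry until it would exceed
> > `taskmanager.network.request-backoff.max` (default 10000 ms), at which
> > point the exception is thrown. A sketch of the two knobs in
> > flink-conf.yaml, shown here with their default values:
> >
> >     taskmanager.network.request-backoff.initial: 100
> >     taskmanager.network.request-backoff.max: 10000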
> >
> > Best,
> > Shammon FY
> >
> > On Tue, Jul 4, 2023 at 9:30 AM zhan...@eastcom-sw.com <
> > zhan...@eastcom-sw.com> wrote:
> >
> > >
> > > Exception log:
> > >
> > > 2023-07-03 20:30:15,164 INFO
> > > org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Sink:
> > > Sink 3 (2/45)
> > > (79a20a2489a31465de9524eaf6b5ebf7_8fb6014c2df1d028b4c9ec6b86c8738f_
> > > 1_3093) switched from RUNNING to FAILED on 10.252.210.63:2359-420157 @
> > > nbiot-core-mpp-dcos-b-2.novalocal (dataPort=32769).
> > > org.apache.flink.runtime.io.network.partition.PartitionNotFoundException:
> > > Partition
> > > 65e701af2579c0381a2c3e53bd66fed0#24@79a20a2489a31465de9524eaf6b5ebf7_d952d2a6aebfb900c453884c57f96b82_24_3093
> > > not found.
> > >         at org.apache.flink.runtime.io.network.partition.ResultPartitionManager.createSubpartitionView(ResultPartitionManager.java:70)
> > > ~[flink-dist-1.17.1.jar:1.17.1]
> > >         at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel.requestSubpartition(LocalInputChannel.java:136)
> > > ~[flink-dist-1.17.1.jar:1.17.1]
> > >         at org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel$1.run(LocalInputChannel.java:186)
> > > ~[flink-dist-1.17.1.jar:1.17.1]
> > >         at java.util.TimerThread.mainLoop(Timer.java:555) ~[?:1.8.0_77]
> > >         at java.util.TimerThread.run(Timer.java:505) ~[?:1.8.0_77]
> > >
> > >
> > >
> > > From: zhan...@eastcom-sw.com
> > > Sent: 2023-07-04 09:25
> > > To: user-zh
> > > Subject: PartitionNotFoundException restart loop
> > >     hi, I have two jobs with fairly heavy traffic (300 million / 600
> > > million records per day). After running normally for about 5 or 6 days
> > > after startup, they hit a PartitionNotFoundException and then keep
> > > restarting in a loop.
> > >
> > >     After adding the following options to flink-conf.yaml, the jobs still
> > > start looping on PartitionNotFoundException after about 6 days and keep
> > > restarting....
> > >     taskmanager.network.tcp-connection.enable-reuse-across-jobs: false
> > >     taskmanager.network.max-num-tcp-connections: 16
> > >
> > >     The current version is 1.17.1; the same jobs with the same data ran
> > > fine on 1.14.4. Is there any way to solve this?
> > >
> > >
> >
>
