Hi

其实问题本质很简单,作为一个分布式计算引擎,需要一个分布式文件系统才能保证可以做到分布式检查点的创建和恢复 
[1]。没有必要再尝试将checkpoint写到本地磁盘上,否则一旦发生failover是无法正常恢复的。


[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/checkpointing.html#prerequisites

祝好
唐云
________________________________
From: Carmen Free <ddjxuxi...@gmail.com>
Sent: Wednesday, January 13, 2021 11:28
To: user-zh@flink.apache.org <user-zh@flink.apache.org>
Subject: Re: rocksdb作为statebackend时,TM节点挂掉了,为何任务不能恢复呢?

Hi 唐老师,

我又重新尝试了新的场景。

我将集群起来后,然后使A节点的TM失效( 此时集群中仅有A节点的JM, B节点的TM ),这个时候在flnk web
ui界面提交新的任务,任务被调度到B节点TM,可以发现任务一直正常运行,但是trigger检查点快照时一直不成功。报错跟前文中描述恢复检查点时的错误很类似,只不过恢复检查点时,是找不到chk-xx文件,在这里是无法创建chk-xx文件。

具体报错如下:
2021-01-13 10:33:10,042 WARN
 org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Received
late message for now expired checkpoint attempt 901 from task
709c3296b490f2e9cb3179ac4d9bbadb of job 14cdc07090d9f22445ba61144f6edf4b at
a0c28dc25b207ca789592018235fda56 @ node204 (dataPort=34389).
2021-01-13 10:33:12,836 INFO
 org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Triggering
checkpoint 902 @ 1610505192836 for job 14cdc07090d9f22445ba61144f6edf4b.
2021-01-13 10:33:12,844 INFO
 org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Decline
checkpoint 902 by task a7f9e9a63f1857b02a9e52c9d6bfefc7 of job
14cdc07090d9f22445ba61144f6edf4b at a0c28dc25b207ca789592018235fda56 @
node204 (dataPort=34389).
2021-01-13 10:33:12,844 INFO
 org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Discarding
checkpoint 902 of job 14cdc07090d9f22445ba61144f6edf4b.

java.lang.Exception: Could not materialize checkpoint 902 for operator
Source: Custom Source -> Flat Map -> Timestamps/Watermarks (1/2).
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:1221)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1163)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

Caused by: java.util.concurrent.ExecutionException: java.io.IOException:
Could not flush and close the file system output stream to null in order to
obtain the stream state handle
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:461)
at
org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
at
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:1126)
... 3 more

Caused by: java.io.IOException: Could not flush and close the file system
output stream to null in order to obtain the stream state handle
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:334)
at
org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:179)
at
org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
at
org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at
org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:458)
... 5 more

Caused by: java.io.IOException: Could not open output stream for state
backend
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:367)
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:234)
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:309)
... 10 more

Caused by: java.io.IOException: Mkdirs failed to create
file:/data/flink/checkpoints/14cdc07090d9f22445ba61144f6edf4b/chk-902
at
org.apache.flink.core.fs.local.LocalFileSystem.create(LocalFileSystem.java:269)
at
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:126)
at
org.apache.flink.core.fs.EntropyInjector.createEntropyAware(EntropyInjector.java:61)
at
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:356)
... 12 more

那么也就是说,如果JM节点不同时作为TM节点时,checkpoint根本无法触发,更无从谈及failover
restore了。因此我们可以认为flink仅支持statebackend的checkpoint目录是DFS时,才可以对状态恢复有比较好的支持是吗?否则,就意味着,JM节点必须同时作为TM,并且每次任务要调度至该节点上运行,才可以触发checkpoint,最终也只能在该节点进行failover
restore。感觉这样的话,就确实不太适合配置本地目录作为state.checkpoints.dir 了。

另外,您在回复中也提到,在使用本地目录作为 state.checkpoints.dir 时,
“必须得让原task部署回原来的机器才能正常运行”,实际上意味着任务要被调度至JM所在节点才可以。然后,任务调度时,flink这边能人为指定任务的分配吗?我看前边的讨论,好像flink的任务调度基本是自动完成的,人为干预难度较大是吧?

祝好



Yun Tang <myas...@live.com> 于2021年1月12日周二 下午7:11写道:

> Hi,
>
> 从异常日志看,应该是因为你的state.checkpoints.dir 或者说
> statebackend的checkpoint目录配置成了本地目录,checkpoint保存到了本地机器上,所以在failover
> restore的时候,必须得让原task部署回原来的机器才能正常运行。将state backend的checkpoint目录更换为一个DFS目录即可。
>
>
> 祝好
> 唐云
> ________________________________
> From: Carmen Free <ddjxuxi...@gmail.com>
> Sent: Tuesday, January 12, 2021 18:14
> To: user-zh@flink.apache.org <user-zh@flink.apache.org>
> Subject: Re: rocksdb作为statebackend时,TM节点挂掉了,为何任务不能恢复呢?
>
> 你好,唐老师,谢谢解答。
>
> 不好意思,下面补充一下报错信息,刚才忘记说了。
>
> 主要报错信息如下,重新模拟了下:
> 2021-01-12 18:09:34,236 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source:
> Custom Source -> Flat Map -> Timestamps/Watermarks (2/2)
> (3b50a7ce56b408c2978260846b76a28a) switched from RUNNING to FAILED on
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@4c107a3a.
>
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
> at
>
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
> at
>
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
> at java.lang.Thread.run(Thread.java:748)
>
> Caused by: org.apache.flink.util.FlinkException: Could not restore operator
> state backend for StreamSource_cbc357ccb763df2852fee8c4fc7d55f2_(2/2) from
> any of the 1 provided restore options.
> at
>
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> at
>
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:252)
> at
>
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:139)
> ... 9 more
>
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed
> when trying to restore operator state backend
> at
>
> org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
> at
>
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:565)
> at
>
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:243)
> at
>
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> at
>
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
> ... 11 more
>
> Caused by: java.io.FileNotFoundException:
>
> /data/flink/checkpoints/b261de0447d59bb2f1db9c084f5b1a0b/chk-5/4a160901-8ddd-468e-a82d-6efcb8a9dff9
> (No such file or directory)
> at java.io.FileInputStream.open0(Native Method)
> at java.io.FileInputStream.open(FileInputStream.java:195)
> at java.io.FileInputStream.<init>(FileInputStream.java:138)
> at
>
> org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
> at
>
> org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142)
> at
>
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
> at
>
> org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
> at
>
> org.apache.flink.runtime.state.OperatorStreamStateHandle.openInputStream(OperatorStreamStateHandle.java:66)
> at
>
> org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:73)
> at
>
> org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
> ... 15 more
>
>
> 这个文件夹在A节点(JM)上是有的,难道是访问权限问题吗?B节点无法访问A节点吗,有点奇怪啊,配置了ssh免密的啊,文件夹/data/flink/checkpoints访问权限也设置成了777
>
> Yun Tang <myas...@live.com> 于2021年1月12日周二 下午5:46写道:
>
> > Hi
> >
> > Flink的容错机制是可以保证TM lost时候会尝试重启作业,“为何任务不能恢复”是需要看完整异常栈的,简单描述是无法帮助排查问题的。
> >
> > 祝好
> > 唐云
> > ________________________________
> > From: Carmen Free <ddjxuxi...@gmail.com>
> > Sent: Tuesday, January 12, 2021 15:52
> > To: user-zh@flink.apache.org <user-zh@flink.apache.org>
> > Subject: rocksdb作为statebackend时,TM节点挂掉了,为何任务不能恢复呢?
> >
> > hi,
> >
> > rocksdb作为statebackend时,TM节点挂掉了,为何任务不能恢复呢?
> >
> > 1、环境说明
> >
> > flink版本:1.10.2
> > 操作系统:centos 7
> >
> > 2、集群说明(当前模拟了2节点)
> >
> >                     节点A           |      节点B
> > 角色        |   JM、TM        |        TM
> > taskslot   |       4               |         4
> >
> > 3、statebackend配置
> >
> > # rocksdb作为状态后备
> > state.backend: rocksdb
> >
> >
> > # 存储快照的目录(暂时使用的本地目录)
> > state.checkpoints.dir: file:///data/flink/checkpoints
> >
> >
> >
> 4、启动任务后,任务自动分配在A节点的TM上,运行一段时间后,检查点快照正常。接着,仅停掉A节点TM(JM仍正常运行),任务被自动调度至B节点的TM上,但是此时任务一直重启,无法恢复,这是为什么呢?
> >
> >
> >
> 5、如果我启动A节点,任务依旧无法恢复(此时任务仍在B节点运行),直到我停掉B节点TM,此时任务调度至A节点,任务可以正常恢复。所以有点疑问,4中的场景为何不能恢复任务呢?为什么只有在A节点上才可以进行任务恢复呢?最初以为是访问路径的问题,但是仔细想了想,检查点相关的操作一直都是JM进行的,我觉得只要JM没有挂掉,应该就可以将任务进行恢复啊,是我的理解有偏差吗?
> >
>

回复