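Re: After changing the parallelism of a job with keyed state (e.g. 2->4 or 4->2), is part of the old state lost?

2021-11-25 Post by yidan zhao
Of course not.

杨浩 wrote on Thu, Nov 25, 2021 at 6:09 PM:

> After changing the parallelism of a job with keyed state (e.g. 2->4 or 4->2), is part of the old state lost?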
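
For context on why no state is lost: Flink hashes every key into one of a fixed number of key groups (the job's max parallelism) and gives each subtask a contiguous range of key groups. Rescaling only changes which subtask owns which range, provided the max parallelism stays the same. The sketch below mirrors that idea in plain Java. It is a simplified illustration rather than Flink's code: it uses `hashCode()` directly where Flink applies an additional murmur hash, and `KeyGroupSketch`, `keyGroupFor` and `subtaskFor` are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

/** Simplified illustration of key-group based state assignment (hypothetical names). */
class KeyGroupSketch {

    /** Maps a key to a key group. Simplification: plain hashCode, no murmur hash. */
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    /** Maps a key group to the index of the subtask that owns it. */
    static int subtaskFor(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed to stay fixed across rescaling
        List<String> keys = Arrays.asList("user-1", "user-2", "user-3", "user-4");
        for (String key : keys) {
            int group = keyGroupFor(key, maxParallelism);
            // The key group of a key never changes; only its owning subtask does.
            System.out.printf("key=%s keyGroup=%d owner@p2=%d owner@p4=%d%n",
                    key, group,
                    subtaskFor(group, 2, maxParallelism),
                    subtaskFor(group, 4, maxParallelism));
        }
    }
}
```

Because every key group has exactly one owner at any parallelism, each piece of state restored from a checkpoint or savepoint ends up on exactly one subtask after rescaling.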


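After changing the parallelism of a job with keyed state (e.g. 2->4 or 4->2), is part of the old state lost?

2021-11-25 Post by 杨浩
After changing the parallelism of a job with keyed state (e.g. 2->4 or 4->2), is part of the old state lost?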

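Re: Using keyed state in different operators

2020-06-10 Post by Congxian Qiu
Hi,
  Keyed state currently cannot be used across operators; that is, different operators use different state.
Best,
Congxian


Z-Z wrote on Thu, Jun 11, 2020 at 10:11 AM:

> Suppose two operators are keyed by the same field (keyBy). Can they use the same StateDescriptor and thereby share the same keyed state?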
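
One way to picture the answer: keyed state is scoped by operator as well as by state name and key, so the same descriptor name used in two operators addresses two independent entries. The toy model below illustrates that scoping in plain Java. `ScopedStateSketch` and its methods are hypothetical and do not correspond to Flink classes; the real state backend is organized differently.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model: state entries are addressed by (operator, state name, key). Hypothetical names. */
class ScopedStateSketch {
    private final Map<String, Long> store = new HashMap<>();

    private static String address(String operatorId, String stateName, String key) {
        return operatorId + "/" + stateName + "/" + key;
    }

    void put(String operatorId, String stateName, String key, long value) {
        store.put(address(operatorId, stateName, key), value);
    }

    /** Returns null when this operator has never written the state for this key. */
    Long get(String operatorId, String stateName, String key) {
        return store.get(address(operatorId, stateName, key));
    }

    public static void main(String[] args) {
        ScopedStateSketch state = new ScopedStateSketch();
        // Operator A writes state "count" for key "user-1".
        state.put("operator-A", "count", "user-1", 42L);
        // Operator B uses the same state name and key but does not see A's value.
        System.out.println("A sees: " + state.get("operator-A", "count", "user-1"));
        System.out.println("B sees: " + state.get("operator-B", "count", "user-1"));
    }
}
```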


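Using keyed state in different operators

2020-06-10 Post by Z-Z
Suppose two operators are keyed by the same field (keyBy). Can they use the same StateDescriptor and thereby share the same keyed state?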

Re:Re: Re: Job recovery fails on task restart with RocksDB as the state backend: Could not restore keyed state backend for KeyedProcessOperator

2020-04-13 Post by chenxyz
tory{configuredOptions={}}.

2020-04-14 11:42:45,994 INFO  org.apache.flink.runtime.taskmanager.Task 
- Source: Custom Source (1/1) (ee17273414060c57d2d331a83d1a84fc) 
switched from DEPLOYING to RUNNING.

2020-04-14 11:42:45,994 DEBUG 
org.apache.flink.streaming.runtime.tasks.StreamTask   - Initializing 
Source: Custom Source (1/1).

2020-04-14 11:42:45,994 INFO  
org.apache.flink.streaming.runtime.tasks.StreamTask   - Loading state 
backend via factory 
org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory

2020-04-14 11:42:45,995 INFO  
org.apache.flink.contrib.streaming.state.RocksDBStateBackend  - Using 
predefined options: DEFAULT.

2020-04-14 11:42:45,995 INFO  
org.apache.flink.contrib.streaming.state.RocksDBStateBackend  - Using default 
options factory: DefaultConfigurableOptionsFactory{configuredOptions={}}.

2020-04-14 11:42:46,033 DEBUG 
org.apache.flink.streaming.runtime.tasks.StreamTask   - Invoking 
Source: Custom Source (1/1)

2020-04-14 11:42:46,042 DEBUG 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure  - Creating 
operator state backend for StreamSource_bc764cd8ddf7a0cff126f51c16239658_(1/1) 
with empty state.

2020-04-14 11:42:46,057 DEBUG 
org.apache.flink.streaming.runtime.tasks.StreamTask   - Invoking 
KeyedProcess -> Sink: Unnamed (1/1)

2020-04-14 11:42:46,060 DEBUG 
org.apache.flink.runtime.state.TaskStateManagerImpl   - Operator 
c09dc291fad93d575e015871097bfc60 has remote state 
SubtaskState{operatorStateFromBackend=StateObjectCollection{[]}, 
operatorStateFromStream=StateObjectCollection{[]}, 
keyedStateFromBackend=StateObjectCollection{[]}, 
keyedStateFromStream=StateObjectCollection{[]}, stateSize=0} from job manager 
and local state alternatives [] from local state store 
org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl@3fd9cf09.

2020-04-14 11:42:46,060 DEBUG 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure  - Creating 
operator state backend for StreamSink_c09dc291fad93d575e015871097bfc60_(1/1) 
with empty state.

2020-04-14 11:42:46,069 DEBUG 
org.apache.flink.runtime.state.TaskStateManagerImpl   - Operator 
20ba6b65f97481d5570070de90e4e791 has remote state 
SubtaskState{operatorStateFromBackend=StateObjectCollection{[]}, 
operatorStateFromStream=StateObjectCollection{[]}, 
keyedStateFromBackend=StateObjectCollection{[IncrementalRemoteKeyedStateHandle{backendIdentifier=04ac09d6-1f1f-4a6c-a78d-74090c83b3c7,
 keyGroupRange=KeyGroupRange{startKeyGroup=0, endKeyGroup=127}, checkpointId=1, 
sharedState={}, 
privateState={MANIFEST-06=ByteStreamStateHandle{handleName='hdfs://nameservice1/data/flink1_10/checkpoint/6fd13de6e9c84a51425f7cc34ce94940/shared/2ff261b8-f51c-42bf-9fab-93c6b119dcff',
 dataBytes=206}, OPTIONS-10=File State: 
hdfs://nameservice1/data/flink1_10/checkpoint/6fd13de6e9c84a51425f7cc34ce94940/shared/426c66a6-d32e-43c8-9873-550237ee0963
 [10379 bytes], 
CURRENT=ByteStreamStateHandle{handleName='hdfs://nameservice1/data/flink1_10/checkpoint/6fd13de6e9c84a51425f7cc34ce94940/shared/bbbce7c9-ea02-4590-9b18-d7a322deb2f4',
 dataBytes=16}}, metaStateHandle=File State: 
hdfs://nameservice1/data/flink1_10/checkpoint/6fd13de6e9c84a51425f7cc34ce94940/chk-1/9215630d-632e-48f6-b668-7dc235a8ff7a
 [1163 bytes], registered=false}]}, 
keyedStateFromStream=StateObjectCollection{[]}, stateSize=11764} from job 
manager and local state alternatives [] from local state store 
org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl@3fd9cf09.

2020-04-14 11:42:46,070 DEBUG 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure  - Creating 
keyed state backend for 
KeyedProcessOperator_20ba6b65f97481d5570070de90e4e791_(1/1) and restoring with 
state from alternative (1/1).

2020-04-14 11:42:46,071 INFO  
org.apache.flink.contrib.streaming.state.RocksDBStateBackend  - Attempting to 
load RocksDB native library and store it under '/data/flink1_10/tmp'

2020-04-14 11:42:46,071 DEBUG 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend  - Attempting to 
create RocksDB native library folder 
/data/flink1_10/tmp/rocksdb-lib-a5f35d4dd06539876a20dbabc82a7f33

2020-04-14 11:42:46,078 DEBUG 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf  - 
-Dorg.apache.flink.shaded.netty4.io.netty.buffer.checkAccessible: true

2020-04-14 11:42:46,079 DEBUG 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf  - 
-Dorg.apache.flink.shaded.netty4.io.netty.buffer.checkBounds: true

2020-04-14 11:42:46,080 DEBUG 
org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetectorFactory  - 
Loaded default ResourceLeakDetector: 
org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector@28a9bbee

2020-04-14 11:42:46,150 INFO  
org.apache.flink.contrib.streaming.state.RocksDBStateBackend  - Successfully 
loaded RocksDB native library

2020-04-14 11:42:46,154 INFO  
org.apache.flink.contrib.streaming.state.RocksDBState

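Re: Re: Job recovery fails on task restart with RocksDB as the state backend: Could not restore keyed state backend for KeyedProcessOperator

2020-04-03 Post by Congxian Qiu
The paths on HDFS differ from the local ones. To see the HDFS paths you would probably need to inspect the checkpoint metadata, which is fairly cumbersome; the tests in
CheckpointMetadataLoadingTest can serve as a reference.
I took another look at the TM log you provided. It looks like outputStream.close() on line 148 failed (one odd thing is that the
outputStream here is a local file, yet the stack trace shows HadoopFileSystem). Can you reproduce this reliably? If so, could you enable
debug logging and post the JM/TM logs? A job that reproduces the problem would be even better.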

Best,
Congxian


chenxyz wrote on Wed, Apr 1, 2020 at 5:18 PM:

> Hi Congxian,
> I checked HDFS. The directory
> /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160
> is empty, and there is no db directory under it.
>
> On 2020-04-01 16:50:13, "Congxian Qiu" wrote:
> >Hi
> >Restore can be roughly divided into two parts: 1) downloading the files; 2) restoring from the downloaded files.
> >Judging from the TM log, the download seems to have failed. Please double-check whether the file
> >/data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/db/001888.sst
> >exists. If the download did fail, you need to find out why it failed.
> >
> >Best,
> >Congxian
> >
> >
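> >chenxyz wrote on Wed, Apr 1, 2020 at 3:02 PM:
> >
> >> The job uses RocksDB as the state backend. When it restarts after a failure, recovery often fails with
> >> "Could not restore keyed state backend for KeyedProcessOperator". How can this be solved?
> >>
> >> Version: 1.10 standalone
> >>
> >> Configuration: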
> >>
> >> state.backend: rocksdb
> >>
> >> state.checkpoints.dir: hdfs://nameservice1/data/flink1_10/checkpoint
> >>
> >> state.backend.incremental: true
> >>
> >> jobmanager.execution.failover-strategy: region
> >>
> >> io.tmp.dirs: /data/flink1_10/tmp
> >>
> >>
> >>
> >>
> >> Checkpoint configuration of the job:
> >>
> >> env.enableCheckpointing(2 * 60 * 1000);
> >>
> >> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
> >>
> >> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2 * 60 * 1000);
> >>
> >> env.getCheckpointConfig().setCheckpointTimeout(6);
> >>
> >> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
> >>
> >> env.getCheckpointConfig().setTolerableCheckpointFailureNumber(12);
> >>
> >> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
> >>
> >> Log output:
> >>
> >>
> >>
> >>
> >> 2020-04-01 11:13:03
> >>
> >> java.lang.Exception: Exception while creating StreamOperatorStateContext.
> >> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
> >> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
> >> at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
> >> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
> >> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> >> at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
> >> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
> >> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
> >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
> >> at java.lang.Thread.run(Thread.java:748)
> >> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926_(1/2) from any of the 1 provided restore options.
> >> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> >> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
> >> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskSta

Re:Re: Job recovery fails on task restart with RocksDB as the state backend: Could not restore keyed state backend for KeyedProcessOperator

2020-04-01 Post by chenxyz
Hi Congxian,
I checked HDFS. The directory
/data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160
is empty, and there is no db directory under it.

On 2020-04-01 16:50:13, "Congxian Qiu" wrote:
>Hi
>Restore can be roughly divided into two parts: 1) downloading the files; 2) restoring from the downloaded files.
>Judging from the TM log, the download seems to have failed. Please double-check whether the file
>/data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/db/001888.sst
>exists. If the download did fail, you need to find out why it failed.
>
>Best,
>Congxian
>
>
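>chenxyz wrote on Wed, Apr 1, 2020 at 3:02 PM:
>
>> The job uses RocksDB as the state backend. When it restarts after a failure, recovery often fails with
>> "Could not restore keyed state backend for KeyedProcessOperator". How can this be solved?
>>
>> Version: 1.10 standalone
>>
>> Configuration: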
>>
>> state.backend: rocksdb
>>
>> state.checkpoints.dir: hdfs://nameservice1/data/flink1_10/checkpoint
>>
>> state.backend.incremental: true
>>
>> jobmanager.execution.failover-strategy: region
>>
>> io.tmp.dirs: /data/flink1_10/tmp
>>
>>
>>
>>
>> Checkpoint configuration of the job:
>>
>> env.enableCheckpointing(2 * 60 * 1000);
>>
>>
>> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>>
>> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2 * 60 * 1000);
>>
>> env.getCheckpointConfig().setCheckpointTimeout(6);
>>
>> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>>
>> env.getCheckpointConfig().setTolerableCheckpointFailureNumber(12);
>>
>>
>> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>>
>>
>>
>>
>> Log output:
>>
>>
>>
>>
>> 2020-04-01 11:13:03
>>
>> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>>
>> at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
>>
>> at
>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
>>
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
>>
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
>>
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>>
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
>>
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
>>
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>>
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>>
>> at java.lang.Thread.run(Thread.java:748)
>>
>> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
>> state backend for
>> KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926_(1/2) from any of the
>> 1 provided restore options.
>>
>> at
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>>
>> at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
>>
>> at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
>>
>> ... 9 more
>>
>> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught
>> unexpected exception.
>>
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
>>
>> at
>> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
>>
>> at
>> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
>>
>> at
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>>
>> at
>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>>
>> ...

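Re: Job recovery fails on task restart with RocksDB as the state backend: Could not restore keyed state backend for KeyedProcessOperator

2020-04-01 Post by Congxian Qiu
Hi
Restore can be roughly divided into two parts: 1) downloading the files; 2) restoring from the downloaded files.
Judging from the TM log, the download seems to have failed. Please double-check whether the file
/data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/db/001888.sst
exists. If the download did fail, you need to find out why it failed.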

Best,
Congxian
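
The existence check suggested in this reply can be sketched as a small standalone Java program. It only inspects the local file system: given the two paths from a NoSuchFileException message of the form `A -> B` raised by `Files.createLink`, it reports which precondition for creating the hard link is missing. `RestorePathCheck` and `diagnose` are hypothetical names, and the assumption that the first path in the message is the link being created and the second is the existing file should be verified against your JDK.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Reports why creating a hard link between two local paths could fail (hypothetical helper). */
class RestorePathCheck {

    static String diagnose(Path link, Path existing) {
        // The file the link should point to must already be on local disk.
        if (!Files.exists(existing)) {
            return "source file missing: " + existing;
        }
        // The directory that will contain the link must exist as well.
        Path parent = link.getParent();
        if (parent != null && !Files.isDirectory(parent)) {
            return "target directory missing: " + parent;
        }
        if (Files.exists(link)) {
            return "target already exists: " + link;
        }
        return "ok";
    }

    public static void main(String[] args) {
        if (args.length != 2) {
            System.err.println("usage: RestorePathCheck <link-path> <existing-file>");
            return;
        }
        System.out.println(diagnose(Paths.get(args[0]), Paths.get(args[1])));
    }
}
```

Run it on the TaskManager host that logged the exception, since the paths under io.tmp.dirs are local to that machine.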


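chenxyz wrote on Wed, Apr 1, 2020 at 3:02 PM:

> The job uses RocksDB as the state backend. When it restarts after a failure, recovery often fails with
> "Could not restore keyed state backend for KeyedProcessOperator". How can this be solved?
>
> Version: 1.10 standalone
>
> Configuration: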
>
> state.backend: rocksdb
>
> state.checkpoints.dir: hdfs://nameservice1/data/flink1_10/checkpoint
>
> state.backend.incremental: true
>
> jobmanager.execution.failover-strategy: region
>
> io.tmp.dirs: /data/flink1_10/tmp
>
>
>
>
> Checkpoint configuration of the job:
>
> env.enableCheckpointing(2 * 60 * 1000);
>
>
> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>
> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2 * 60 * 1000);
>
> env.getCheckpointConfig().setCheckpointTimeout(6);
>
> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>
> env.getCheckpointConfig().setTolerableCheckpointFailureNumber(12);
>
>
> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>
>
>
>
> Log output:
>
>
>
>
> 2020-04-01 11:13:03
>
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
>
> at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
>
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
>
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>
> at java.lang.Thread.run(Thread.java:748)
>
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
> state backend for
> KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926_(1/2) from any of the
> 1 provided restore options.
>
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
>
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
>
> ... 9 more
>
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught
> unexpected exception.
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
>
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
>
> at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
>
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>
> at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>
> ... 11 more
>
> Caused by: java.nio.file.NoSuchFileException:
> /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/db/001888.sst
> ->
> /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/3e979cc5-82c1-42bf-a269-1ce6e43f3e10/001888.sst
>
> at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
>
> at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>
> at
> sun.nio.fs.UnixFileSystemProvider.createLin

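Job recovery fails on task restart with RocksDB as the state backend: Could not restore keyed state backend for KeyedProcessOperator

2020-04-01 Post by chenxyz
The job uses RocksDB as the state backend. When it restarts after a failure, recovery often fails with
"Could not restore keyed state backend for KeyedProcessOperator". How can this be solved?

Version: 1.10 standalone

Configuration: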

state.backend: rocksdb

state.checkpoints.dir: hdfs://nameservice1/data/flink1_10/checkpoint

state.backend.incremental: true

jobmanager.execution.failover-strategy: region

io.tmp.dirs: /data/flink1_10/tmp




Checkpoint configuration of the job:

env.enableCheckpointing(2 * 60 * 1000);

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2 * 60 * 1000);

env.getCheckpointConfig().setCheckpointTimeout(6);

env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

env.getCheckpointConfig().setTolerableCheckpointFailureNumber(12);

env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);




Log output:




2020-04-01 11:13:03

java.lang.Exception: Exception while creating StreamOperatorStateContext.

at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)

at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)

at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)

at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)

at java.lang.Thread.run(Thread.java:748)

Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state 
backend for KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926_(1/2) from 
any of the 1 provided restore options.

at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)

at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)

at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)

... 9 more

Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught 
unexpected exception.

at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)

at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)

at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)

at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)

at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)

... 11 more

Caused by: java.nio.file.NoSuchFileException: 
/data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/db/001888.sst
 -> 
/data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/3e979cc5-82c1-42bf-a269-1ce6e43f3e10/001888.sst

at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)

at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)

at sun.nio.fs.UnixFileSystemProvider.createLink(UnixFileSystemProvider.java:476)

at java.nio.file.Files.createLink(Files.java:1086)

at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreInstanceDirectoryFromPath(RocksDBIncrementalRestoreOperation.java:480)

at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:218)

at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:194)

at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:168)

at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:154)

at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.bu