Hi all. Triggering a savepoint fails for me on Flink 1.5.0.
Shared directory: /data/emr_flink_savepoint_share/
Trigger command: bin/flink savepoint feaab3ec9031bce4eab0b677693ab9f0 file:///data/emr_flink_savepoint_share
The default filesystem in the Hadoop conf is hdfs://flink-hdfs.
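For reference, a minimal standalone sketch of how the default filesystem can be confirmed from the same Hadoop conf the cluster uses (the class name is made up; it only prints what Hadoop resolves from HADOOP_CONF_DIR):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

// Hypothetical helper, not part of the job: prints the default filesystem
// Hadoop resolves from the core-site.xml on the classpath (HADOOP_CONF_DIR).
public class DefaultFsCheck {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        System.out.println("fs.defaultFS = " + conf.get("fs.defaultFS"));
        System.out.println("resolved FS  = " + FileSystem.get(conf).getUri());
    }
}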
The error:
Caused by: java.lang.Exception: Could not materialize checkpoint 9381 for operator Source: KafkaJSONStringTableSource -> Map -> where: (OR(=(e, _UTF-16LE'INSERT'), =(e, _UTF-16LE'DELETE'), =(e, _UTF-16LE'UPDATE'))), select: (CAST(get_json_object(data, _UTF-16LE'pid')) AS EXPR$0, CAST(get_json_object(data, _UTF-16LE'tag_id')) AS EXPR$1, CAST(get_json_object(data, _UTF-16LE'tag_type')) AS EXPR$2, get_json_object(data, _UTF-16LE'tag_name') AS EXPR$3, CAST(get_json_object(data, _UTF-16LE'tag_version')) AS EXPR$4, CAST(get_json_object(data, _UTF-16LE'att_type')) AS EXPR$5, CAST(get_json_object(data, _UTF-16LE'is_del')) AS EXPR$6, e) -> to: Tuple2 -> Sink: Unnamed (1/1).
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:943)
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not open output stream for state backend
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:854)
    ... 5 more
Caused by: java.io.IOException: Could not open output stream for state backend
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:360)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:225)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:209)
    at java.io.DataOutputStream.write(DataOutputStream.java:107)
    at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:161)
    at org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoWriterV2.writeOperatorStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:142)
    at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.write(OperatorBackendSerializationProxy.java:77)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:411)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:352)
    at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
    ... 7 more
Caused by: java.lang.IllegalArgumentException: Wrong FS: file:///data/emr_flink_savepoint_share/savepoint-0705a3-09a2f171a080/e2f63448-eed9-4038-a64a-e874a1a99ba1, expected: hdfs://flink-hdfs
    at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:645)
    at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:193)
    at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:105)
    at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:397)
    at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:393)
    at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:393)
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:337)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:908)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:889)
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:140)
    at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:36)
    at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:121)
    at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:348)
    ... 18 more
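For context, this "Wrong FS" message is what Hadoop's FileSystem.checkPath throws when a file:// path is handed to the HDFS client that is bound to fs.defaultFS. A standalone sketch that produces the same message (not part of the job; it assumes the flink-hdfs nameservice is resolvable where it runs):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Standalone reproduction sketch: hand a file:// path to a FileSystem
// instance that is bound to hdfs://flink-hdfs.
public class WrongFsRepro {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://flink-hdfs");
        FileSystem hdfs = FileSystem.get(conf); // DistributedFileSystem
        // checkPath rejects the foreign scheme and throws:
        // java.lang.IllegalArgumentException: Wrong FS: file://..., expected: hdfs://flink-hdfs
        hdfs.create(new Path("file:///data/emr_flink_savepoint_share/test"));
    }
}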
When I tested locally, 1.5.0 could trigger savepoints; the checkpoint directory configured in the online environment is hdfs:XXX. There is also a 1.5.6 online cluster on which savepoints can be triggered, and the two clusters have identical flink-conf.yaml configurations.
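For what it's worth, a minimal probe of which Flink FileSystem implementation a given URI resolves to would look roughly like this (the class name is made up; it would have to be run with each cluster's classpath and Hadoop conf):

import org.apache.flink.core.fs.FileSystem;
import java.net.URI;

// Hypothetical probe: print which Flink FileSystem implementation picks up
// the savepoint target URI under the current classpath and Hadoop conf.
public class SavepointFsProbe {
    public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(URI.create("file:///data/emr_flink_savepoint_share"));
        System.out.println(fs.getClass().getName() + " -> " + fs.getUri());
    }
}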
Does anyone know what is going on here, or can you suggest how to troubleshoot it? Many thanks.