Hi all. I'm using Flink 1.5.0, and triggering a savepoint fails.
Shared directory: /data/emr_flink_savepoint_share/
Trigger command: bin/flink savepoint feaab3ec9031bce4eab0b677693ab9f0 file:///data/emr_flink_savepoint_share
The default file system in the Hadoop configuration is hdfs://flink-hdfs.
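The scheme of the savepoint target decides which file system is supposed to handle it. `file:///...` carries an explicit `file` scheme, while a bare path such as `/data/...` would be resolved against the default file system (`hdfs://flink-hdfs` here). The Java sketch below models that rule with `java.net.URI`. It is a simplified illustration under that assumption, not Flink's actual `FileSystem.get` logic, and `SchemeResolution` / `targetFileSystem` are hypothetical names.

```java
import java.net.URI;

// Hypothetical helper: a simplified model of how a path's scheme selects a file system.
// Assumption: an explicit scheme wins; a path without a scheme falls back to the default FS.
public class SchemeResolution {

    public static String targetFileSystem(String path, String defaultFs) {
        URI uri = URI.create(path);
        if (uri.getScheme() == null) {
            // No scheme (e.g. "/data/..."): resolved against the configured default FS.
            return defaultFs;
        }
        // Explicit scheme: keep scheme and (possibly empty) authority, drop the path part.
        String authority = uri.getAuthority() == null ? "" : uri.getAuthority();
        return uri.getScheme() + "://" + authority;
    }

    public static void main(String[] args) {
        String defaultFs = "hdfs://flink-hdfs";
        System.out.println(targetFileSystem("file:///data/emr_flink_savepoint_share", defaultFs));
        System.out.println(targetFileSystem("/data/emr_flink_savepoint_share", defaultFs));
    }
}
```

Under this model, the target from the trigger command maps to `file://`, and only a scheme-less path would map to `hdfs://flink-hdfs`.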
Error:
Caused by: java.lang.Exception: Could not materialize checkpoint 9381 for operator Source: KafkaJSONStringTableSource -> Map -> where: (OR(=(e, _UTF-16LE'INSERT'), =(e, _UTF-16LE'DELETE'), =(e, _UTF-16LE'UPDATE'))), select: (CAST(get_json_object(data, _UTF-16LE'pid')) AS EXPR$0, CAST(get_json_object(data, _UTF-16LE'tag_id')) AS EXPR$1, CAST(get_json_object(data, _UTF-16LE'tag_type')) AS EXPR$2, get_json_object(data, _UTF-16LE'tag_name') AS EXPR$3, CAST(get_json_object(data, _UTF-16LE'tag_version')) AS EXPR$4, CAST(get_json_object(data, _UTF-16LE'att_type')) AS EXPR$5, CAST(get_json_object(data, _UTF-16LE'is_del')) AS EXPR$6, e) -> to: Tuple2 -> Sink: Unnamed (1/1).
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:943)
        ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not open output stream for state backend
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:192)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:53)
        at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
        at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:854)
        ... 5 more
Caused by: java.io.IOException: Could not open output stream for state backend
        at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:360)
        at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.flush(FsCheckpointStreamFactory.java:225)
        at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.write(FsCheckpointStreamFactory.java:209)
        at java.io.DataOutputStream.write(DataOutputStream.java:107)
        at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:161)
        at org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoWriterV2.writeOperatorStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:142)
        at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.write(OperatorBackendSerializationProxy.java:77)
        at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:411)
        at org.apache.flink.runtime.state.DefaultOperatorStateBackend$1.performOperation(DefaultOperatorStateBackend.java:352)
        at org.apache.flink.runtime.io.async.AbstractAsyncCallableWithResources.call(AbstractAsyncCallableWithResources.java:75)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:50)
        ... 7 more
Caused by: java.lang.IllegalArgumentException: Wrong FS: file:///data/emr_flink_savepoint_share/savepoint-0705a3-09a2f171a080/e2f63448-eed9-4038-a64a-e874a1a99ba1, expected: hdfs://flink-hdfs
        at org.apache.hadoop.fs.FileSystem.checkPath(FileSystem.java:645)
        at org.apache.hadoop.hdfs.DistributedFileSystem.getPathName(DistributedFileSystem.java:193)
        at org.apache.hadoop.hdfs.DistributedFileSystem.access$000(DistributedFileSystem.java:105)
        at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:397)
        at org.apache.hadoop.hdfs.DistributedFileSystem$6.doCall(DistributedFileSystem.java:393)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:393)
        at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:337)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:908)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:889)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786)
        at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:140)
        at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.create(HadoopFileSystem.java:36)
        at org.apache.flink.core.fs.SafetyNetWrapperFileSystem.create(SafetyNetWrapperFileSystem.java:121)
        at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.createStream(FsCheckpointStreamFactory.java:348)
        ... 18 more
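The innermost cause shows the savepoint file being created through `DistributedFileSystem` (bound to `hdfs://flink-hdfs`) even though the target path uses the `file` scheme, and `FileSystem.checkPath` rejects that mismatch. The Java sketch below is a simplified stand-in for such a scheme/authority check, written only to illustrate why the message reads "Wrong FS: file:///..., expected: hdfs://flink-hdfs". It deliberately omits Hadoop's default-port and default-authority handling, and `WrongFsCheck` is a hypothetical name.

```java
import java.net.URI;

// Hypothetical, simplified stand-in for the scheme/authority check done by Hadoop's
// FileSystem.checkPath. The real method also canonicalizes URIs and handles default
// ports and scheme-only paths; none of that is modeled here.
public class WrongFsCheck {

    public static void checkPath(URI fsUri, URI path) {
        String scheme = path.getScheme();
        if (scheme == null) {
            return; // scheme-less paths are accepted by this sketch
        }
        boolean sameScheme = scheme.equalsIgnoreCase(fsUri.getScheme());
        String pathAuthority = path.getAuthority();
        String fsAuthority = fsUri.getAuthority();
        boolean sameAuthority = pathAuthority == null
                ? fsAuthority == null
                : pathAuthority.equalsIgnoreCase(fsAuthority);
        if (!sameScheme || !sameAuthority) {
            // A file:// path handed to an hdfs:// file system instance ends up here.
            throw new IllegalArgumentException("Wrong FS: " + path + ", expected: " + fsUri);
        }
    }

    public static void main(String[] args) {
        URI hdfs = URI.create("hdfs://flink-hdfs");
        // Hypothetical HDFS path: same scheme and authority, so the check passes.
        checkPath(hdfs, URI.create("hdfs://flink-hdfs/tmp"));
        try {
            checkPath(hdfs, URI.create("file:///data/emr_flink_savepoint_share"));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The sketch only shows that a `file` path cannot pass through a file system instance created for `hdfs://flink-hdfs`; why that instance was chosen for this savepoint path is what the trace alone does not answer.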
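When I tested locally, triggering a savepoint with 1.5.0 worked. In the online environment, the configured checkpoint directory is hdfs:XXX.
We also have an online cluster running 1.5.6 on which savepoints can be triggered, and the flink-conf.yaml configuration is the same on both clusters.
Does anyone know what is going on, or can anyone suggest how to troubleshoot this? Many thanks.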
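On the point that both clusters share the same flink-conf.yaml, one cheap troubleshooting step is to confirm it mechanically rather than by eye. The Java sketch below compares two files key by key, assuming Flink's flat `key: value` layout with `#` comments (nested YAML is not handled). `FlinkConfDiff`, `parse`, and `diff` are hypothetical names, and the file paths are supplied as command-line arguments.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical troubleshooting helper: reports keys whose values differ between two
// flink-conf.yaml files. Assumption: flat "key: value" lines, '#' starts a comment.
public class FlinkConfDiff {

    public static Map<String, String> parse(List<String> lines) {
        Map<String, String> conf = new TreeMap<>();
        for (String raw : lines) {
            String line = raw.split("#", 2)[0].trim(); // drop comments and surrounding spaces
            int sep = line.indexOf(": ");
            if (sep > 0) {
                conf.put(line.substring(0, sep).trim(), line.substring(sep + 2).trim());
            }
        }
        return conf;
    }

    public static List<String> diff(Map<String, String> a, Map<String, String> b) {
        Set<String> keys = new TreeSet<>(a.keySet());
        keys.addAll(b.keySet());
        List<String> out = new ArrayList<>();
        for (String key : keys) {
            String va = a.get(key);
            String vb = b.get(key);
            if (!Objects.equals(va, vb)) {
                // A missing key on either side shows up as "null".
                out.add(key + ": " + va + " <> " + vb);
            }
        }
        return out;
    }

    public static void main(String[] args) throws IOException {
        Map<String, String> a = parse(Files.readAllLines(Paths.get(args[0])));
        Map<String, String> b = parse(Files.readAllLines(Paths.get(args[1])));
        diff(a, b).forEach(System.out::println);
    }
}
```

Run it as `java FlinkConfDiff <cluster-a flink-conf.yaml> <cluster-b flink-conf.yaml>`; no output means every parsed key has the same value in both files.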