One more point: we modified the sql-client code so that the job can restore from a checkpoint. The change is as follows:
private StreamExecutionEnvironment createStreamExecutionEnvironment() {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // --- begin our change: restore from an externalized checkpoint path ---
    LOG.info("restore cp exist: {}", environment.getExecution().getRestoreSp().isPresent());
    if (environment.getExecution().getRestoreSp().isPresent()) {
        LOG.info("restore cp path: {}", environment.getExecution().getRestoreSp().get());
        if (!environment.getExecution().getRestoreSp().get().contains("none")) {
            SavepointRestoreSettings savepointRestoreSettings =
                SavepointRestoreSettings.forPath(environment.getExecution().getRestoreSp().get(), true);
            env.getStreamGraph().setSavepointRestoreSettings(savepointRestoreSettings);
        }
    }
    // --- end of our change ---
    // for TimeCharacteristic validation in StreamTableEnvironmentImpl
    env.setStreamTimeCharacteristic(environment.getExecution().getTimeCharacteristic());
    if (env.getStreamTimeCharacteristic() == TimeCharacteristic.EventTime) {
        env.getConfig().setAutoWatermarkInterval(environment.getExecution().getPeriodicWatermarksInterval());
    }
    return env;
}

When we pass in the path from above (the one that contains only the meta file), it fails with:

Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
        at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
        at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:879)
        at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:227)
        at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
        at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
Caused by: java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1870)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1861)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1846)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1834)
        at org.apache.flink.table.client.gateway.local.ExecutionContext.createStreamExecutionEnvironment(ExecutionContext.java:691)
        at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableEnvironment(ExecutionContext.java:593)
        at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:498)
        at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:184)
        at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:137)
        at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:868)
        ... 3 more
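(Editorially, the immediate cause of the exception is visible in the trace: in Flink 1.11, StreamExecutionEnvironment.getStreamGraph() refuses to build a StreamGraph before any transformation has been added, and the patch above calls it while the environment is still being created, before any query has registered operators. One less invasive alternative, sketched here as an assumption to verify against your deployment and not part of the original patch, is to pass the restore path through Flink's standard configuration keys, which the client applies at actual job submission; the HDFS path below is illustrative only.)

```yaml
# flink-conf.yaml — standard restore options (keys exist since Flink 1.10;
# whether sql-client honors them in 1.11 should be verified for your setup)
execution.savepoint.path: hdfs:///path/to/chk-5          # checkpoint/savepoint to restore from (illustrative path)
execution.savepoint.ignore-unclaimed-state: true         # same intent as the 'true' flag in SavepointRestoreSettings.forPath(...)
```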
The error quite clearly shows that there is no operator state.

Congxian Qiu <qcx978132...@gmail.com> wrote on Mon, Sep 14, 2020, at 7:53 PM:

> Hi
> If all of your state is very small, it may end up stored inside the meta
> file itself, in which case _metadata is the only file you will see. The
> exact logic is here[1].
>
> [1]
> https://github.com/apache/flink/blob/9b0fb562898b809b860cf0065ded7a45c49300af/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L442
> Best,
> Congxian
>
>
> Harold.Miao <miaohong...@gmail.com> wrote on Mon, Sep 14, 2020, at 6:44 PM:
>
> > hi all
> >
> > Flink version: 1.11.1
> >
> > We submit jobs with sql-client; flink-conf.yaml is configured as follows:
> >
> > state.backend: filesystem
> > state.backend.fs.checkpointdir:
> > hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-data/23252
> > state.checkpoints.dir:
> > hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-meta/23252
> > state.savepoints.dir:
> > hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/savepoint/23252
> >
> > execution.checkpointing.externalized-checkpoint-retention:
> > RETAIN_ON_CANCELLATION
> > execution.checkpointing.interval: 60s
> > execution.checkpointing.mode: EXACTLY_ONCE
> > jobmanager.execution.failover-strategy: full
> > state.backend.incremental: true
> >
> > After the job runs, the UI shows that every checkpoint succeeds, yet on
> > HDFS there is only ever a single meta file, along the lines of:
> >
> > hdfs://10.218.60.57:8020/ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-meta/23250/c72c1ee4362c3d0ba72db32698363fcf/chk-5/_metadata
> >
> > Apart from this file there is nothing at all.
> >
> > Our source is Kafka, and the Kafka source definitely keeps state.
> >
> > Could anyone explain what is causing this?
> >
> > Thanks
> >
> > --
> > Best Regards,
> > Harold Miao

--
Best Regards,
Harold Miao
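(Editorial note on the inlined-state behaviour mentioned in the reply above: with the filesystem state backend, any state handle smaller than state.backend.fs.memory-threshold, which defaults to 1024 bytes in Flink 1.11, is serialized directly into the _metadata file rather than written as a separate file. Small Kafka offset state can therefore leave chk-*/ containing only _metadata while checkpoints still succeed. A sketch of forcing separate state files for debugging, under the assumption that your version accepts a zero threshold:)

```yaml
# flink-conf.yaml — write even tiny state handles as separate files
# (default is 1024 bytes in Flink 1.11; 0 assumed to disable inlining)
state.backend: filesystem
state.backend.fs.memory-threshold: 0
```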