One more thing: we modified the sql-client code so that the job can restore from a checkpoint. The change is as follows:

private StreamExecutionEnvironment createStreamExecutionEnvironment() {
   final StreamExecutionEnvironment env =
         StreamExecutionEnvironment.getExecutionEnvironment();

   // --- added: restore from checkpoint/savepoint ---
   LOG.info("restore cp exist: {}", environment.getExecution().getRestoreSp().isPresent());
   if (environment.getExecution().getRestoreSp().isPresent()) {
      LOG.info("restore cp path: {}", environment.getExecution().getRestoreSp().get());
      if (!environment.getExecution().getRestoreSp().get().contains("none")) {
         SavepointRestoreSettings savepointRestoreSettings =
               SavepointRestoreSettings.forPath(environment.getExecution().getRestoreSp().get(), true);
         env.getStreamGraph().setSavepointRestoreSettings(savepointRestoreSettings);
      }
   }
   // --- end of added block ---

   // for TimeCharacteristic validation in StreamTableEnvironmentImpl
   env.setStreamTimeCharacteristic(environment.getExecution().getTimeCharacteristic());
   if (env.getStreamTimeCharacteristic() == TimeCharacteristic.EventTime) {
      env.getConfig().setAutoWatermarkInterval(
            environment.getExecution().getPeriodicWatermarksInterval());
   }
   return env;
}


When we pass in the path above, which only contains the meta file, it fails with the following error:

Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
        at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
        at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:879)
        at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:227)
        at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
        at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
Caused by: java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1870)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1861)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1846)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1834)
        at org.apache.flink.table.client.gateway.local.ExecutionContext.createStreamExecutionEnvironment(ExecutionContext.java:691)
        at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableEnvironment(ExecutionContext.java:593)
        at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:498)
        at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:184)
        at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:137)
        at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:868)
        ... 3 more


The error quite clearly says there is no operator state.
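A side note on a possible alternative (only a sketch, not something verified here): the stack trace suggests the failure comes from calling env.getStreamGraph() inside createStreamExecutionEnvironment(), before any SQL statement has added operators to the environment, rather than from the checkpoint content itself. One way around that would be to hand the restore path to the executor through the Flink Configuration via the execution.savepoint.path option (SavepointConfigOptions), which does not require a StreamGraph to exist yet. The class and method names below are made up for illustration:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;

public class RestorePathSketch {

   // Sketch only: copy the configuration and set the restore path on it, so the
   // executor picks it up at submission time instead of us building a StreamGraph
   // that has no operators yet.
   static Configuration withRestorePath(Configuration base, String restorePath) {
      Configuration conf = new Configuration(base);
      // assumption: restorePath is a checkpoint/savepoint directory or a _metadata file
      conf.set(SavepointConfigOptions.SAVEPOINT_PATH, restorePath);
      // tolerate state that no longer maps to an operator in the new job
      conf.set(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, true);
      return conf;
   }
}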








Congxian Qiu <qcx978132...@gmail.com> wrote on Mon, Sep 14, 2020 at 7:53 PM:

> Hi
>    If all of your state is very small, it may be stored directly inside the meta file, in which case _metadata is the only file you will see. You can check the specific logic here [1]
>
> [1]
>
> https://github.com/apache/flink/blob/9b0fb562898b809b860cf0065ded7a45c49300af/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L442
> Best,
> Congxian
>
>
Harold.Miao <miaohong...@gmail.com> wrote on Mon, Sep 14, 2020 at 6:44 PM:
>
> > hi all
> >
> > Flink version: 1.11.1
> >
> > We submit jobs with sql-client, and flink-conf.yaml is configured as follows:
> >
> > state.backend: filesystem
> > state.backend.fs.checkpointdir: hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-data/23252
> > state.checkpoints.dir: hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-meta/23252
> > state.savepoints.dir: hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/savepoint/23252
> >
> > execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
> > execution.checkpointing.interval: 60s
> > execution.checkpointing.mode: EXACTLY_ONCE
> > jobmanager.execution.failover-strategy: full
> > state.backend.incremental: true
> >
> >
> > After the job runs, the UI shows every checkpoint completing successfully, but on HDFS there is only ever a single meta file.
> >
> > Something like this:
> >
> > hdfs://10.218.60.57:8020/ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-meta/23250/c72c1ee4362c3d0ba72db32698363fcf/chk-5/_metadata
> >
> > Apart from this one file, there is nothing else at all.
> >
> > Our source is Kafka, and the Kafka source definitely keeps state.
> >
> >
> > Could anyone tell me what might be causing this?
> >
> >
> > Thanks
> >
> >
> > --
> >
> > Best Regards,
> > Harold Miao
> >
>


-- 

Best Regards,
Harold Miao
