Could it be that your checkpoint-restore code never executed an INSERT INTO statement?
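
For context, a minimal sketch of my own (assuming Flink 1.11) of the failure mode behind that question: the SQL client's streaming topology stays empty until an INSERT INTO statement is translated, and asking an empty environment for its stream graph throws exactly the exception quoted further down in this thread.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EmptyTopologySketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // No source, sink, or INSERT INTO has been translated yet, so the topology is empty.
        try {
            env.getStreamGraph();
        } catch (IllegalStateException e) {
            // Prints: "No operators defined in streaming topology. Cannot execute."
            System.out.println(e.getMessage());
        }
    }
}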

On Mon, 14 Sep 2020 at 20:15, Harold.Miao <miaohong...@gmail.com> wrote:

> One more thing: we modified the sql-client code so that jobs restore from a checkpoint. The change is as follows:
>
> private StreamExecutionEnvironment createStreamExecutionEnvironment() {
>    final StreamExecutionEnvironment env =
>          StreamExecutionEnvironment.getExecutionEnvironment();
>
>    LOG.info("restore cp exist: {}", environment.getExecution().getRestoreSp().isPresent());
>    if (environment.getExecution().getRestoreSp().isPresent()) {
>       LOG.info("restore cp path: {}", environment.getExecution().getRestoreSp().get());
>       if (!environment.getExecution().getRestoreSp().get().contains("none")) {
>          SavepointRestoreSettings savepointRestoreSettings =
>                SavepointRestoreSettings.forPath(environment.getExecution().getRestoreSp().get(), true);
>          env.getStreamGraph().setSavepointRestoreSettings(savepointRestoreSettings);
>       }
>    }
>    // for TimeCharacteristic validation in StreamTableEnvironmentImpl
>    env.setStreamTimeCharacteristic(environment.getExecution().getTimeCharacteristic());
>    if (env.getStreamTimeCharacteristic() == TimeCharacteristic.EventTime) {
>       env.getConfig().setAutoWatermarkInterval(environment.getExecution().getPeriodicWatermarksInterval());
>    }
>    return env;
> }
>
>
> When we pass in the path above, the one that contains only the meta file, we get the following error:
>
> Exception in thread "main"
> org.apache.flink.table.client.SqlClientException: Unexpected
> exception. This is a bug. Please consider filing an issue.
>         at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
> Caused by: org.apache.flink.table.client.gateway.SqlExecutionException:
> Could not create execution context.
>         at
> org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:879)
>         at
> org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:227)
>         at
> org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
>         at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
> Caused by: java.lang.IllegalStateException: No operators defined in
> streaming topology. Cannot execute.
>         at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1870)
>         at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1861)
>         at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1846)
>         at
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1834)
>         at
> org.apache.flink.table.client.gateway.local.ExecutionContext.createStreamExecutionEnvironment(ExecutionContext.java:691)
>         at
> org.apache.flink.table.client.gateway.local.ExecutionContext.createTableEnvironment(ExecutionContext.java:593)
>         at
> org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:498)
>         at
> org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:184)
>         at
> org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:137)
>         at
> org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:868)
>         ... 3 more
>
>
> The error shows very clearly that there is no operator state.
>
>
>
>
>
>
>
>
> Congxian Qiu <qcx978132...@gmail.com> wrote on Mon, Sep 14, 2020 at 7:53 PM:
>
> > Hi
> >    If your state is very small, it may be stored directly in the meta file; in that case
> > _metadata will be the only file. You can take a look at the exact logic here [1].
> >
> > [1]
> > https://github.com/apache/flink/blob/9b0fb562898b809b860cf0065ded7a45c49300af/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L442
> > Best,
> > Congxian
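
To add a concrete knob to Congxian's explanation: with the filesystem state backend, any state handle smaller than a size threshold is serialized straight into the _metadata file, so a checkpoint directory containing only _metadata can be perfectly healthy. A sketch of my own, assuming Flink 1.11 and the FsStateBackend; the path is a placeholder and the threshold value is only illustrative (the flink-conf.yaml equivalent is state.backend.fs.memory-threshold):

import java.net.URI;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class InlineStateThresholdSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Second argument is fileStateSizeThreshold in bytes: state smaller than this is
        // serialized into the _metadata file instead of separate state files. Kafka source
        // offsets are only a few bytes per partition, so they typically stay below the
        // threshold and never show up as extra files under the checkpoint directory.
        env.setStateBackend(new FsStateBackend(new URI("hdfs:///path/to/checkpoints"), 1024));
        // ... define the pipeline and execute as usual
    }
}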
> >
> >
> > Harold.Miao <miaohong...@gmail.com> wrote on Mon, Sep 14, 2020 at 6:44 PM:
> >
> > > hi all
> > >
> > > Flink version: 1.11.1
> > >
> > > We submit jobs with the sql-client; flink-conf.yaml is configured as follows:
> > >
> > > state.backend: filesystem
> > > state.backend.fs.checkpointdir: hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-data/23252
> > > state.checkpoints.dir: hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-meta/23252
> > > state.savepoints.dir: hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/savepoint/23252
> > >
> > > execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
> > > execution.checkpointing.interval: 60s
> > > execution.checkpointing.mode: EXACTLY_ONCE
> > > jobmanager.execution.failover-strategy: full
> > > state.backend.incremental: true
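
For readers following along, a rough Java equivalent of the checkpointing-related lines above (a sketch only; the original setup drives all of this from flink-conf.yaml rather than code):

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // execution.checkpointing.interval: 60s, execution.checkpointing.mode: EXACTLY_ONCE
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);
        // execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    }
}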
> > >
> > >
> > > After the job starts, the UI shows every checkpoint as successful, but on HDFS there is only ever a single meta file.
> > >
> > > Something like this:
> > >
> > > hdfs://10.218.60.57:8020/ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-meta/23250/c72c1ee4362c3d0ba72db32698363fcf/chk-5/_metadata
> > >
> > > Apart from this file, there is nothing else.
> > >
> > > Our source is Kafka, and the Kafka source definitely keeps state.
> > >
> > >
> > > Could anyone advise what might be causing this?
> > >
> > >
> > > Thanks
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > > --
> > >
> > > Best Regards,
> > > Harold Miao
> > >
> >
>
>
> --
>
> Best Regards,
> Harold Miao
>
