It is the same INSERT job; the only change when restarting it is the code quoted below, which constructs a SavepointRestoreSettings to restore from the checkpoint.

A question: how can I tell whether the checkpoint state was actually written to HDFS? Is there any tool that can parse the _metadata file?
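What I have in mind is something like the sketch below, using the internal Checkpoints utility from flink-runtime (assuming the 1.11 signatures; this is an internal API rather than a supported tool, and the Hadoop filesystem dependencies must be on the classpath):

import java.io.DataInputStream;

import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;

public class MetadataInspector {
    public static void main(String[] args) throws Exception {
        // args[0]: full path to a _metadata file, e.g. hdfs://.../chk-5/_metadata
        Path metadataPath = new Path(args[0]);
        FileSystem fs = metadataPath.getFileSystem();
        try (DataInputStream in = new DataInputStream(fs.open(metadataPath))) {
            CheckpointMetadata metadata = Checkpoints.loadCheckpointMetadata(
                    in, MetadataInspector.class.getClassLoader(), args[0]);
            System.out.println("checkpoint id: " + metadata.getCheckpointId());
            // The printed state handles of each operator should show whether the
            // payload is inline (byte-stream handles inside _metadata) or was
            // written out as separate state files on HDFS.
            for (OperatorState operatorState : metadata.getOperatorStates()) {
                System.out.println(operatorState);
            }
        }
    }
}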

Thanks

Jark Wu <imj...@gmail.com> wrote on Tue, Sep 15, 2020 at 11:31 AM:

> Could it be that your checkpoint-restore code does not execute any INSERT INTO statement?
>
> On Mon, 14 Sep 2020 at 20:15, Harold.Miao <miaohong...@gmail.com> wrote:
>
> > One more thing: we modified the sql-client code to make the job restore from a checkpoint, as follows:
> >
> > private StreamExecutionEnvironment createStreamExecutionEnvironment() {
> >    final StreamExecutionEnvironment env =
> >          StreamExecutionEnvironment.getExecutionEnvironment();
> >
> >    LOG.info("restore cp exist: {}",
> >          environment.getExecution().getRestoreSp().isPresent());
> >    if (environment.getExecution().getRestoreSp().isPresent()) {
> >       LOG.info("restore cp path: {}",
> >             environment.getExecution().getRestoreSp().get());
> >       if (!environment.getExecution().getRestoreSp().get().contains("none")) {
> >          SavepointRestoreSettings savepointRestoreSettings =
> >                SavepointRestoreSettings.forPath(
> >                      environment.getExecution().getRestoreSp().get(), true);
> >          env.getStreamGraph().setSavepointRestoreSettings(savepointRestoreSettings);
> >       }
> >    }
> >    // for TimeCharacteristic validation in StreamTableEnvironmentImpl
> >    env.setStreamTimeCharacteristic(
> >          environment.getExecution().getTimeCharacteristic());
> >    if (env.getStreamTimeCharacteristic() == TimeCharacteristic.EventTime) {
> >       env.getConfig().setAutoWatermarkInterval(
> >             environment.getExecution().getPeriodicWatermarksInterval());
> >    }
> >    return env;
> > }
> >
> >
> > When we pass in the path above, the one that contains only the meta file, the error is as follows:
> >
> > Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
> >         at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
> > Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
> >         at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:879)
> >         at org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:227)
> >         at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
> >         at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
> > Caused by: java.lang.IllegalStateException: No operators defined in streaming topology. Cannot execute.
> >         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraphGenerator(StreamExecutionEnvironment.java:1870)
> >         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1861)
> >         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1846)
> >         at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.getStreamGraph(StreamExecutionEnvironment.java:1834)
> >         at org.apache.flink.table.client.gateway.local.ExecutionContext.createStreamExecutionEnvironment(ExecutionContext.java:691)
> >         at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableEnvironment(ExecutionContext.java:593)
> >         at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:498)
> >         at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:184)
> >         at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:137)
> >         at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:868)
> >         ... 3 more
> >
> >
> > The error quite clearly shows that there are no operators, and hence no operator state, in the streaming topology.
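> >
> > My reading of the trace: getStreamGraph() is invoked while the environment has no transformations yet (nothing is added until an INSERT runs), and the StreamGraph built at that point would be discarded and regenerated at submission time anyway, so the restore settings would be lost even without the exception. An alternative I am considering, only a sketch assuming the executor picks up SavepointConfigOptions from the deployment configuration (the options exist since 1.10; I have not verified this end to end in the sql-client):
> >
> > import org.apache.flink.configuration.Configuration;
> > import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
> >
> > public final class RestoreConfig {
> >     // Put the restore path into the configuration the job is deployed with,
> >     // instead of materializing an (empty) StreamGraph just to attach
> >     // SavepointRestoreSettings; the executor copies these options onto the
> >     // JobGraph when the pipeline is submitted.
> >     public static void addRestorePath(Configuration deployConfig, String restorePath) {
> >         deployConfig.set(SavepointConfigOptions.SAVEPOINT_PATH, restorePath);
> >         deployConfig.set(SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, true);
> >     }
> > }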
> >
> >
> >
> >
> >
> >
> >
> >
> > Congxian Qiu <qcx978132...@gmail.com> wrote on Mon, Sep 14, 2020 at 7:53 PM:
> >
> > > Hi
> > >    If all of your state is very small, it can be stored inline in the meta file, in which case _metadata is the only file you will see. The exact logic is here [1].
> > >
> > > [1]
> > > https://github.com/apache/flink/blob/9b0fb562898b809b860cf0065ded7a45c49300af/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/metadata/MetadataV2V3SerializerBase.java#L442
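> > >
> > > A Kafka source's offsets are only a few bytes per partition, well below the default threshold, so this would explain seeing _metadata alone. If you want to double-check, one option (for verification only) is to force even tiny state into separate files by lowering state.backend.fs.memory-threshold in flink-conf.yaml (default 1024 bytes in 1.11; a value of 0 writes everything as files, at the cost of many small HDFS files):
> > >
> > > state.backend.fs.memory-threshold: 0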
> > > Best,
> > > Congxian
> > >
> > >
> > > Harold.Miao <miaohong...@gmail.com> wrote on Mon, Sep 14, 2020 at 6:44 PM:
> > >
> > > > hi  all
> > > >
> > > > Flink version: 1.11.1
> > > >
> > > > We submit the job through the sql-client, with the following flink-conf.yaml:
> > > >
> > > > state.backend: filesystem
> > > > state.backend.fs.checkpointdir: hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-data/23252
> > > > state.checkpoints.dir: hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-meta/23252
> > > > state.savepoints.dir: hdfs:///ai/flink/checkpoint/dataclean/hl-redis0902/savepoint/23252
> > > >
> > > > execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
> > > > execution.checkpointing.interval: 60s
> > > > execution.checkpointing.mode: EXACTLY_ONCE
> > > > jobmanager.execution.failover-strategy: full
> > > > state.backend.incremental: true
> > > >
> > > >
> > > > After the job is running, the UI shows every checkpoint as successful, but on HDFS there has only ever been a single meta file.
> > > >
> > > > Something like the following:
> > > >
> > > > hdfs://10.218.60.57:8020/ai/flink/checkpoint/dataclean/hl-redis0902/checkpoint-meta/23250/c72c1ee4362c3d0ba72db32698363fcf/chk-5/_metadata
> > > >
> > > > Apart from this file, there is nothing at all.
> > > >
> > > > Our source is Kafka, and the Kafka source definitely keeps state.
> > > >
> > > >
> > > > What could be causing this?
> > > >
> > > >
> > > > Thanks
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > > --
> > > >
> > > > Best Regards,
> > > > Harold Miao
> > > >
> > >
> >
> >
> > --
> >
> > Best Regards,
> > Harold Miao
> >
>


-- 

Best Regards,
Harold Miao
