Re: Re: Re:回复:带有状态的算子保存checkpoint失败
checkpoint 失败了可以看看 是超时了,还是有 task snapshot 失败了,可以从 JM log 中来发现。超时的话,可以看下是数据量大需要时间久,还是 timeout 啥的设置太短;异常的话可以从对应的 tm log 看下为啥 snapshot 失败了 Best, Congxian 王默 于2020年11月27日周五 下午11:43写道: > checkpoint失败是在web页面上发现的,您看下截图https://imgchr.com/i/Dr3PNn > 看taskmanager日志确实没有超时,也没有其他异常 > > > > > > > > > > > > > > > > > > 在 2020-11-27 21:39:50,"赵一旦" 写道: > >失败原因也不写,怎么个不能保存。。。超时?还是啥。 > > > >魏积乾 于2020年11月27日周五 下午7:08写道: > > > >> flink-csv-1.11.2.jar > >> flink-dist_2.11-1.11.2.jar > >> flink-json-1.11.2.jar > >> flink-shaded-zookeeper-3.4.14.jar > >> flink-table_2.11-1.11.2.jar > >> flink-table-blink_2.11-1.11.2.jar > >> log4j-1.2-api-2.12.1.jar > >> log4j-api-2.12.1.jar > >> log4j-core-2.12.1.jar > >> log4j-slf4j-impl-2.12.1.jar > >> flink-metrics-prometheus_2.12-1.11.2.jar > >> > >> 按时间排了个序,这是最新的包。 > >> > >> > >> > >> 发自我的iPhone > >> > >> > >> -- 原始邮件 -- > >> 发件人: 王默 >> 发送时间: 2020年11月27日 18:41 > >> 收件人: user-zh harry...@foxmail.com > >> > >> 主题: 回复:Re:回复:带有状态的算子保存checkpoint失败 > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> 请问能具体告知是哪个包没升级吗?或者是否有什么包需要从opt拷贝到lib下 > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> 在 2020-11-27 17:34:39,"魏积乾" >> > 我也刚遇到过,是从1.10升级上来的,一个任务可以保存checkpoint,一个任务老是保存失败,然后查看了lib下面的jar,发现有些jar没有升级上来,于是更改了,配置文件还改了state.checkpoints.dir。 > >> 希望对你有帮助发自我的iPhone > -- > >> 原始邮件 -- 发件人: 王默 >> 发送时间: 2020年11月27日 17:22 收件人: user-zh gt; > >> 主题: 回复:带有状态的算子保存checkpoint失败 > >> > >> > >> > >> >
Re: Re:回复:带有状态的算子保存checkpoint失败
失败原因也不写,怎么个不能保存。。。超时?还是啥。 魏积乾 于2020年11月27日周五 下午7:08写道: > flink-csv-1.11.2.jar > flink-dist_2.11-1.11.2.jar > flink-json-1.11.2.jar > flink-shaded-zookeeper-3.4.14.jar > flink-table_2.11-1.11.2.jar > flink-table-blink_2.11-1.11.2.jar > log4j-1.2-api-2.12.1.jar > log4j-api-2.12.1.jar > log4j-core-2.12.1.jar > log4j-slf4j-impl-2.12.1.jar > flink-metrics-prometheus_2.12-1.11.2.jar > > 按时间排了个序,这是最新的包。 > > > > 发自我的iPhone > > > -- 原始邮件 -- > 发件人: 王默 发送时间: 2020年11月27日 18:41 > 收件人: user-zh > 主题: 回复:Re:回复:带有状态的算子保存checkpoint失败 > > > > > > > > > > > 请问能具体告知是哪个包没升级吗?或者是否有什么包需要从opt拷贝到lib下 > > > > > > > > > > > > > 在 2020-11-27 17:34:39,"魏积乾" 我也刚遇到过,是从1.10升级上来的,一个任务可以保存checkpoint,一个任务老是保存失败,然后查看了lib下面的jar,发现有些jar没有升级上来,于是更改了,配置文件还改了state.checkpoints.dir。 > 希望对你有帮助发自我的iPhone -- > 原始邮件 -- 发件人: 王默 发送时间: 2020年11月27日 17:22 收件人: user-zh 主题: 回复:带有状态的算子保存checkpoint失败 > > > >
回复:Re:回复:带有状态的算子保存checkpoint失败
flink-csv-1.11.2.jar flink-dist_2.11-1.11.2.jar flink-json-1.11.2.jar flink-shaded-zookeeper-3.4.14.jar flink-table_2.11-1.11.2.jar flink-table-blink_2.11-1.11.2.jar log4j-1.2-api-2.12.1.jar log4j-api-2.12.1.jar log4j-core-2.12.1.jar log4j-slf4j-impl-2.12.1.jar flink-metrics-prometheus_2.12-1.11.2.jar 按时间排了个序,这是最新的包。 发自我的iPhone -- 原始邮件 -- 发件人: 王默
Re:回复:带有状态的算子保存checkpoint失败
请问能具体告知是哪个包没升级吗?或者是否有什么包需要从opt拷贝到lib下 在 2020-11-27 17:34:39,"魏积乾" 写道: >我也刚遇到过,是从1.10升级上来的,一个任务可以保存checkpoint,一个任务老是保存失败,然后查看了lib下面的jar,发现有些jar没有升级上来,于是更改了,配置文件还改了state.checkpoints.dir。 >希望对你有帮助 > > > >发自我的iPhone > > >-- 原始邮件 -- >发件人: 王默 发送时间: 2020年11月27日 17:22 >收件人: user-zh 主题: 回复:带有状态的算子保存checkpoint失败
Re:回复:带有状态的算子保存checkpoint失败
感谢你提供的思路,配置文件已设置了state.checkpoints.dir,我检查一下是否有jar未升级 在 2020-11-27 17:34:39,"魏积乾" 写道: >我也刚遇到过,是从1.10升级上来的,一个任务可以保存checkpoint,一个任务老是保存失败,然后查看了lib下面的jar,发现有些jar没有升级上来,于是更改了,配置文件还改了state.checkpoints.dir。 >希望对你有帮助 > > > >发自我的iPhone > > >-- 原始邮件 -- >发件人: 王默 发送时间: 2020年11月27日 17:22 >收件人: user-zh 主题: 回复:带有状态的算子保存checkpoint失败
带有状态的算子保存checkpoint失败
Hi,请教各位一个困扰了几天的问题, 我在项目中使用了状态保存一些数据用于去重,开启checkpoint后在web上发现带有状态算子无法保存状态数据到checkpoint,导致整个checkpoint提交失败,偶尔第一次能成功提交checkpoint,但后续提交全部失败 StateBackend试过MemoryStateBackend和FsStateBackend都不行,FsStateBackend使用的是hdfs 且根据jobid到对应taskmanager下的日志中没有发现任何相关的异常信息 使用的flink版本是1.11.2 web上checkpoint失败的截图,图片链接:https://imgchr.com/i/Dr3PNn 使用的是去掉业务逻辑后的简单测试代码 测试代码部分: public class TestStateProcess extends KeyedProcessFunction { private transient ValueState userCount; @Override public void open(Configuration parameters) throws Exception { try { ValueStateDescriptor descriptor = new ValueStateDescriptor("userId", TypeInformation.of(new TypeHint() {})); StateTtlConfig ttlConfig = StateTtlConfig .newBuilder(org.apache.flink.api.common.time.Time.minutes(10)) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) .build(); descriptor.enableTimeToLive(ttlConfig); userCount = getRuntimeContext().getState(descriptor); } catch (Exception e) { e.printStackTrace(); } } @Override public void processElement(NLMessage value, Context ctx, Collector out) throws Exception { try { if (null == userCount.value()) { userCount.update(1); } else { userCount.update(userCount.value() + 1); } if (userCount.value() > 10) { System.out.println(new Date() + " userId: " + ctx.getCurrentKey() + " count: " + userCount.value()); } out.collect(value); } catch (IOException e) { e.printStackTrace(); } } } checkpoint配置: env.setStateBackend(new MemoryStateBackend(MemoryStateBackend.DEFAULT_MAX_STATE_SIZE, false)); env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointTimeout(60 * 1000); env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); 万分感谢!
回复:带有状态的算子保存checkpoint失败
我也刚遇到过,是从1.10升级上来的,一个任务可以保存checkpoint,一个任务老是保存失败,然后查看了lib下面的jar,发现有些jar没有升级上来,于是更改了,配置文件还改了state.checkpoints.dir。 希望对你有帮助 发自我的iPhone -- 原始邮件 -- 发件人: 王默
带有状态的算子保存checkpoint失败
Hi,请教各位一个困扰了几天的问题, 我在项目中使用了状态保存一些数据用于去重,开启checkpoint后在web上发现带有状态算子无法保存状态数据到checkpoint,导致整个checkpoint提交失败,偶尔第一次能成功提交checkpoint,但后续提交全部失败 StateBackend试过MemoryStateBackend和FsStateBackend都不行,FsStateBackend使用的是hdfs 且根据jobid到对应taskmanager下的日志中没有发现任何相关的异常信息 使用的flink版本是1.11.2 附件为web上checkpoint失败的截图,使用的是去掉业务逻辑后的简单测试代码 测试代码部分: public class TestStateProcess extends KeyedProcessFunction { private transient ValueState userCount; @Override public void open(Configuration parameters) throws Exception { try { ValueStateDescriptor descriptor = new ValueStateDescriptor("userId", TypeInformation.of(new TypeHint() {})); StateTtlConfig ttlConfig = StateTtlConfig .newBuilder(org.apache.flink.api.common.time.Time.minutes(10)) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) .build(); descriptor.enableTimeToLive(ttlConfig); userCount = getRuntimeContext().getState(descriptor); } catch (Exception e) { e.printStackTrace(); } } @Override public void processElement(NLMessage value, Context ctx, Collector out) throws Exception { try { if (null == userCount.value()) { userCount.update(1); } else { userCount.update(userCount.value() + 1); } if (userCount.value() > 10) { System.out.println(new Date() + " userId: " + ctx.getCurrentKey() + " count: " + userCount.value()); } out.collect(value); } catch (IOException e) { e.printStackTrace(); } } } checkpoint配置: env.setStateBackend(new MemoryStateBackend(MemoryStateBackend.DEFAULT_MAX_STATE_SIZE, false)); env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointTimeout(60 * 1000); env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); 万分感谢!