Re: Re: Re:回复:带有状态的算子保存checkpoint失败

2020-11-30 文章 Congxian Qiu
checkpoint 失败了可以看看 是超时了,还是有 task snapshot 失败了,可以从 JM log
中来发现。超时的话,可以看下是数据量大需要时间久,还是 timeout 啥的设置太短;异常的话可以从对应的 tm log 看下为啥 snapshot
失败了

Best,
Congxian


王默  于2020年11月27日周五 下午11:43写道:

> checkpoint失败是在web页面上发现的,您看下截图https://imgchr.com/i/Dr3PNn
> 看taskmanager日志确实没有超时,也没有其他异常
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-11-27 21:39:50,"赵一旦"  写道:
> >失败原因也不写,怎么个不能保存。。。超时?还是啥。
> >
> >魏积乾  于2020年11月27日周五 下午7:08写道:
> >
> >> flink-csv-1.11.2.jar
> >> flink-dist_2.11-1.11.2.jar
> >> flink-json-1.11.2.jar
> >> flink-shaded-zookeeper-3.4.14.jar
> >> flink-table_2.11-1.11.2.jar
> >> flink-table-blink_2.11-1.11.2.jar
> >> log4j-1.2-api-2.12.1.jar
> >> log4j-api-2.12.1.jar
> >> log4j-core-2.12.1.jar
> >> log4j-slf4j-impl-2.12.1.jar
> >> flink-metrics-prometheus_2.12-1.11.2.jar
> >>
> >> 按时间排了个序,这是最新的包。
> >>
> >>
> >>
> >> 发自我的iPhone
> >>
> >>
> >> -- 原始邮件 --
> >> 发件人: 王默  >> 发送时间: 2020年11月27日 18:41
> >> 收件人: user-zh  harry...@foxmail.com
> >> 
> >> 主题: 回复:Re:回复:带有状态的算子保存checkpoint失败
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 请问能具体告知是哪个包没升级吗?或者是否有什么包需要从opt拷贝到lib下
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-11-27 17:34:39,"魏积乾"  >>
> 我也刚遇到过,是从1.10升级上来的,一个任务可以保存checkpoint,一个任务老是保存失败,然后查看了lib下面的jar,发现有些jar没有升级上来,于是更改了,配置文件还改了state.checkpoints.dir。
> >> 希望对你有帮助发自我的iPhone  
> --
> >> 原始邮件 -- 发件人: 王默  >> 发送时间: 2020年11月27日 17:22 收件人: user-zh  gt;
> >> 主题: 回复:带有状态的算子保存checkpoint失败
> >>
> >>
> >>
> >> 
>


Re: Re:回复:带有状态的算子保存checkpoint失败

2020-11-27 文章 赵一旦
失败原因也不写,怎么个不能保存。。。超时?还是啥。

魏积乾  于2020年11月27日周五 下午7:08写道:

> flink-csv-1.11.2.jar
> flink-dist_2.11-1.11.2.jar
> flink-json-1.11.2.jar
> flink-shaded-zookeeper-3.4.14.jar
> flink-table_2.11-1.11.2.jar
> flink-table-blink_2.11-1.11.2.jar
> log4j-1.2-api-2.12.1.jar
> log4j-api-2.12.1.jar
> log4j-core-2.12.1.jar
> log4j-slf4j-impl-2.12.1.jar
> flink-metrics-prometheus_2.12-1.11.2.jar
>
> 按时间排了个序,这是最新的包。
>
>
>
> 发自我的iPhone
>
>
> -- 原始邮件 --
> 发件人: 王默  发送时间: 2020年11月27日 18:41
> 收件人: user-zh  
> 主题: 回复:Re:回复:带有状态的算子保存checkpoint失败
>
>
>
>
>
>
>
>
>
>
> 请问能具体告知是哪个包没升级吗?或者是否有什么包需要从opt拷贝到lib下
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-11-27 17:34:39,"魏积乾"  我也刚遇到过,是从1.10升级上来的,一个任务可以保存checkpoint,一个任务老是保存失败,然后查看了lib下面的jar,发现有些jar没有升级上来,于是更改了,配置文件还改了state.checkpoints.dir。
> 希望对你有帮助发自我的iPhone   --
> 原始邮件 -- 发件人: 王默  发送时间: 2020年11月27日 17:22 收件人: user-zh  主题: 回复:带有状态的算子保存checkpoint失败
>
>
>
> 


回复:Re:回复:带有状态的算子保存checkpoint失败

2020-11-27 文章 魏积乾
flink-csv-1.11.2.jar
flink-dist_2.11-1.11.2.jar
flink-json-1.11.2.jar
flink-shaded-zookeeper-3.4.14.jar
flink-table_2.11-1.11.2.jar
flink-table-blink_2.11-1.11.2.jar
log4j-1.2-api-2.12.1.jar
log4j-api-2.12.1.jar
log4j-core-2.12.1.jar
log4j-slf4j-impl-2.12.1.jar
flink-metrics-prometheus_2.12-1.11.2.jar

按时间排了个序,这是最新的包。



发自我的iPhone


-- 原始邮件 --
发件人: 王默 

Re:回复:带有状态的算子保存checkpoint失败

2020-11-27 文章 王默






请问能具体告知是哪个包没升级吗?或者是否有什么包需要从opt拷贝到lib下











在 2020-11-27 17:34:39,"魏积乾"  写道:
>我也刚遇到过,是从1.10升级上来的,一个任务可以保存checkpoint,一个任务老是保存失败,然后查看了lib下面的jar,发现有些jar没有升级上来,于是更改了,配置文件还改了state.checkpoints.dir。
>希望对你有帮助
>
>
>
>发自我的iPhone
>
>
>-- 原始邮件 --
>发件人: 王默 发送时间: 2020年11月27日 17:22
>收件人: user-zh 主题: 回复:带有状态的算子保存checkpoint失败


Re:回复:带有状态的算子保存checkpoint失败

2020-11-27 文章 王默












感谢你提供的思路,配置文件已设置了state.checkpoints.dir,我检查一下是否有jar未升级





在 2020-11-27 17:34:39,"魏积乾"  写道:
>我也刚遇到过,是从1.10升级上来的,一个任务可以保存checkpoint,一个任务老是保存失败,然后查看了lib下面的jar,发现有些jar没有升级上来,于是更改了,配置文件还改了state.checkpoints.dir。
>希望对你有帮助
>
>
>
>发自我的iPhone
>
>
>-- 原始邮件 --
>发件人: 王默 发送时间: 2020年11月27日 17:22
>收件人: user-zh 主题: 回复:带有状态的算子保存checkpoint失败


带有状态的算子保存checkpoint失败

2020-11-27 文章 王默
Hi,请教各位一个困扰了几天的问题,
我在项目中使用了状态保存一些数据用于去重,开启checkpoint后在web上发现带有状态算子无法保存状态数据到checkpoint,导致整个checkpoint提交失败,偶尔第一次能成功提交checkpoint,但后续提交全部失败
StateBackend试过MemoryStateBackend和FsStateBackend都不行,FsStateBackend使用的是hdfs
且根据jobid到对应taskmanager下的日志中没有发现任何相关的异常信息
使用的flink版本是1.11.2


web上checkpoint失败的截图,图片链接:https://imgchr.com/i/Dr3PNn
使用的是去掉业务逻辑后的简单测试代码


测试代码部分:
public class TestStateProcess extends KeyedProcessFunction {
private transient ValueState userCount;


@Override
public void open(Configuration parameters) throws Exception {
try {
ValueStateDescriptor descriptor = new 
ValueStateDescriptor("userId", TypeInformation.of(new 
TypeHint() {}));


StateTtlConfig ttlConfig = StateTtlConfig

.newBuilder(org.apache.flink.api.common.time.Time.minutes(10))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)

.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
descriptor.enableTimeToLive(ttlConfig);


userCount = getRuntimeContext().getState(descriptor);
} catch (Exception e) {
e.printStackTrace();
}
}


@Override
public void processElement(NLMessage value, Context ctx, 
Collector out) throws Exception {
try {
if (null == userCount.value()) {
userCount.update(1);


} else {
userCount.update(userCount.value() + 1);
}


if (userCount.value() > 10) {
System.out.println(new Date() + " userId: " + 
ctx.getCurrentKey() + " count: " + userCount.value());
}


out.collect(value);
} catch (IOException e) {
e.printStackTrace();
}
}
}


checkpoint配置:
env.setStateBackend(new 
MemoryStateBackend(MemoryStateBackend.DEFAULT_MAX_STATE_SIZE, false));
env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60 * 1000);

env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);


万分感谢!




 

回复:带有状态的算子保存checkpoint失败

2020-11-27 文章 魏积乾
我也刚遇到过,是从1.10升级上来的,一个任务可以保存checkpoint,一个任务老是保存失败,然后查看了lib下面的jar,发现有些jar没有升级上来,于是更改了,配置文件还改了state.checkpoints.dir。
希望对你有帮助



发自我的iPhone


-- 原始邮件 --
发件人: 王默 

带有状态的算子保存checkpoint失败

2020-11-27 文章 王默
Hi,请教各位一个困扰了几天的问题,
我在项目中使用了状态保存一些数据用于去重,开启checkpoint后在web上发现带有状态算子无法保存状态数据到checkpoint,导致整个checkpoint提交失败,偶尔第一次能成功提交checkpoint,但后续提交全部失败
StateBackend试过MemoryStateBackend和FsStateBackend都不行,FsStateBackend使用的是hdfs
且根据jobid到对应taskmanager下的日志中没有发现任何相关的异常信息
使用的flink版本是1.11.2


附件为web上checkpoint失败的截图,使用的是去掉业务逻辑后的简单测试代码


测试代码部分:
public class TestStateProcess extends KeyedProcessFunction {
private transient ValueState userCount;


@Override
public void open(Configuration parameters) throws Exception {
try {
ValueStateDescriptor descriptor = new 
ValueStateDescriptor("userId", TypeInformation.of(new 
TypeHint() {}));


StateTtlConfig ttlConfig = StateTtlConfig

.newBuilder(org.apache.flink.api.common.time.Time.minutes(10))
.setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)

.setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
.build();
descriptor.enableTimeToLive(ttlConfig);


userCount = getRuntimeContext().getState(descriptor);
} catch (Exception e) {
e.printStackTrace();
}
}


@Override
public void processElement(NLMessage value, Context ctx, 
Collector out) throws Exception {
try {
if (null == userCount.value()) {
userCount.update(1);


} else {
userCount.update(userCount.value() + 1);
}


if (userCount.value() > 10) {
System.out.println(new Date() + " userId: " + 
ctx.getCurrentKey() + " count: " + userCount.value());
}


out.collect(value);
} catch (IOException e) {
e.printStackTrace();
}
}
}


checkpoint配置:
env.setStateBackend(new 
MemoryStateBackend(MemoryStateBackend.DEFAULT_MAX_STATE_SIZE, false));
env.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setCheckpointTimeout(60 * 1000);

env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);


万分感谢!