Hi
从代码暂时没有看出问题,不确定 迭代 作业的 checkpoint 是否有特殊的地方,我抄送了一个对迭代这块更了解的人(Yun
Gao),或许他在这块有一些建议
Best,
Congxian
Yun Tang 于2020年8月27日周四 下午5:10写道:
> Hi Robert
>
> 你的两个source
> firstSource和secondSource是自己实现的么,一种怀疑是source在没有数据时持锁[1][2]导致checkpoint
> barrier并没有下发。
> 建议使用jstack查看在没有数据下发时,so
Hi Robert
你的两个source
firstSource和secondSource是自己实现的么,一种怀疑是source在没有数据时持锁[1][2]导致checkpoint
barrier并没有下发。
建议使用jstack查看在没有数据下发时,source相关task的java调用栈,观察是否存在等待锁释放
[1]
https://github.com/apache/flink/blob/162e3ead63e5766cf2116af09922a88537889e53/flink-streaming-java/src/main/java/org/apache/flink
UV??MapStateBloomFilter??,checkpoint??bloomMapState
hihu.com/p/87131964
Best,
Congxian
Robert.Zhang <173603...@qq.comgt; ??2020??8??21??
6:31??
gt; Hello all,
gt; iterative stream job
gt;
????checkpoint??????checkpoint
gt; state
Hi
对于开启 Checkpoint
之后导致反压的情况,如果希望在现在的基础上进行优化的话,则需要找到反压的原因是什么,可以尝试从最后一个被反压的算子开始排查,到底什么原因导致的,排查过程中,或许
Arthas[1] 可以有一些帮助
另外比较好奇的是,为什么反压会导致你的作业挂掉呢?作业挂掉的原因是啥呢
[1] https://github.com/alibaba/arthas
Best,
Congxian
Yun Tang 于2020年8月26日周三 上午11:25写道:
> Hi
>
> 对于已经改为at least once的chec
Hi
按理说,数据和 barrier 没有依赖关系的,但从你的描述看,没有数据的时候,无法接受到
barrier,或许你可以分享一下你的代码(可以把业务逻辑进行简化),或许大家可以帮你看看
Best,
Congxian
Robert.Zhang <173603...@qq.com> 于2020年8月26日周三 上午11:43写道:
> Hi Congxian,
>
> 开了更多日志观察了下,是由于iteration source的barrier无法接收到导致的。
> 该barrier无法接收到,导致下游也无法拿到由该operator传递的ba
Hi Congxian,
iteration source??barrier??
??barrier??operator??barrier??checkpoint??
??operator
Hi
对于已经改为at least once的checkpoint,其在checkpoint时对于作业吞吐的影响只有task
同步阶段的snapshot,这个时间段的snapshot由于与task的主线程的数据访问持有同一把锁,会影响主线程的数据处理。但是就算这样,我也很怀疑checkpoint本身并不是导致早上10点高峰期无法运行的罪魁祸首。
使用异步的,支持增量的state backend (例如RocksDBStateBackend)会大大缓解该问题。
建议排查思路:
1. 检查使用的state backend类型
2. 检查是否存在sync阶段checkpoint
Hi zhanglachun,
你们使用 checkpoint 的方式是什么?是否有 full gc 的情况呢
Best,
LakeShen
徐骁 于2020年8月26日周三 上午2:10写道:
> input
> .keyBy()
> .timeWindow()
> .reduce(new MyReduceFunction(), new MyProcessWindowFunction());
>
> 可以看一下官网的 reduce 用法, 可以在数据进入窗口的时候进行处理(排序), 而没必要等到 window 结束后再进行 这样可
input
.keyBy()
.timeWindow()
.reduce(new MyReduceFunction(), new MyProcessWindowFunction());
可以看一下官网的 reduce 用法, 可以在数据进入窗口的时候进行处理(排序), 而没必要等到 window 结束后再进行 这样可以把时间分散到
window 里面
大佬们好,我一个flink任务,计算一分钟内的某项几项指标的中位数,总共5个指标,因为中位数计算需要全窗口数据排序,所以计算比较复杂,现在遇到的问题的是一旦开启checkpoint任务就从source端开始反压严重,但关闭checkpoint就正常运行.
目前优化的步骤有:
1.语义放弃exactlyonce 改到atleast
2.分析发现keyby过程中有数据倾斜,已改成分布聚合,在第一轮聚合中key后添加随机数,在去除key后缀进行第二轮聚合
3.计算过程中使用RoaringBitmap作为中间数据缓存容器,最大限度减少内存损耗
4.增大并行度,提交时增大-yjm -ytm 内存配
Hi Congxian,
测试的时候数据量是很小的,cpu使用比较低的,让我比较奇怪的一点是我杀掉任务重启的话,有时候能成功进行checkpoint,看了下日志,就是这个iteration
source成功执行了snapshot,发起了barrier,进而影响到后续operator的checkpoint。失败的时候是该source无法snapshot,直至超时。
因为flink这一块,iteration是由head tail组成,是一个比较特殊的stream
task,目前还没有看到jm这边是如何对此进行处理的。这个iteration source其实是由其他source
Hi
对于 checkpoint 超时失败的情况,需要看一下具体的原因,对于 source 没有完成的话,或许看一下相应并发(没完成 snapshot
的 source)的 CPU 占用情况,以及相应逻辑是否卡在哪里或许能看到一些线索。source 是收到 JM 的 rpc 后触发的
snapshot,所以这里相比其他的算子,不需要考虑 barrier 对齐的事情。
Best,
Congxian
Robert.Zhang <173603...@qq.com> 于2020年8月25日周二 上午12:58写道:
> 看了日志,是由于部分checkpoint 超时未
看了日志,是由于部分checkpoint 超时未完成,web界面上 iteration source的checkpoint始终无法完成。
官方文档对于在iterative
stream的checkpoint没有更详细的说明。对于loop中的数据丢失可以理解。但是checkpoint无法成功不是特别能理解。
按照我对于chandylamport算法的理解,上游operator的barrier应该是直接给到了下游
,不应该存在无法拿到barrier的情况才对。不知道这是什么原因导致的
---原始邮件---
发件人: "Congxian Qiu&
Hi
从报错 ”Exceeded checkpoint tolerable failure threshold“ 看,你的 checkpoint
有持续报错,导致了作业失败,你需要找一下为什么 checkpoint 失败,或许这篇文章[1] 可以有一些帮助
另外从配置看,你开启了 unalign checkpoint,这个是上述文章中暂时没有设计的地方。
[1] https://zhuanlan.zhihu.com/p/87131964
Best,
Congxian
Robert.Zhang <173603...@qq.com> 于2020年8月21日周五 下
Hello all,
iterative stream job
checkpoint??checkpoint
state
k??org.apache.flink.util.FlinkRuntimeException:
Exceeded checkpoint tolerable failure threshold
Hi
如果你的算子有改变的话,想从 checkpoint/savepoint 恢复,需要添加
`--allowNonRestoredState`,这样可以忽略掉那些不在新
job 中的算子(就算逻辑一样,uid 不一样也会被忽略掉的),具体的可以参考文档[1]
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#what-happens-if-i-delete-an-operator-that-has-state-from-my-job
Best
hi
可以参考这篇文章https://mp.weixin.qq.com/s/Vl6_GsGeG0dK84p9H2Ld0Q
在cancel的时候触发一个savepoint 修改完SQL从savepoint恢复任务
--
Sent from: http://apache-flink.147419.n8.nabble.com/
hi,大家好,我遇到个问题,执行一个sql,作了checkpoint,现在sql中加了where条件,但是insert select
的值都没变,想指定之前的checkpoint继续执行,但是会报错,cannot map checkpoint state for operator ***
to the new programe 是不是因为sql改变了,算子id就变了找不到了,这个有什么办法吗
这个问题和下面这个问题[1] 重复了,在另外的邮件列表中已经有相关讨论
[1]
http://apache-flink.147419.n8.nabble.com/Flink-FINISHED-Checkpoint-td6008.html
Best,
Congxian
yulu yang 于2020年8月14日周五 下午1:05写道:
> 对了,我这个flink作业和和分组都是新创建,不存在抽取历史。
>
> 杨豫鲁 于2020年8月13日周四 下午3:33写道:
>
> > 请教大家一个我最近在配置Flink流的过程中遇到问题,
> &
---
> 发件人:
> "user-zh"
> <
> yj5...@gmail.com;
> 发送时间:2020年8月13日(星期四) 中午1:49
> 收件人:"user-zh"
> 主题:请教关于Flink算子FINISHED状态时无法保存Checkpoint的问
??
redisredis??State
----
??:
对了,我这个flink作业和和分组都是新创建,不存在抽取历史。
杨豫鲁 于2020年8月13日周四 下午3:33写道:
> 请教大家一个我最近在配置Flink流的过程中遇到问题,
>
> flink作业中关联使用了物理表(字典表),在flinkjob启动后,会对字典表进行一次读取,然后该算子会变成FINISHED状态,导致该flinkjob无法保存checkpoint和savepoint。一般大家遇到这种问题都是怎么处理的,我这个作业在数据加工过程中必须用到字典表赋值。
>
>
>
>
>
请教大家一个我最近在配置Flink流的过程中遇到问题,
flink作业中关联使用了物理表(字典表),在flinkjob启动后,会对字典表进行一次读取,然后该算子会变成FINISHED状态,导致该flinkjob无法保存checkpoint和savepoint。一般大家遇到这种问题都是怎么处理的,我这个作业在数据加工过程中必须用到字典表赋值。
Congxian
>
>
> Congxian Qiu 于2020年8月13日周四 下午2:00写道:
>
> > Hi
> > 现在 checkpoint/savepoint 需要所有算子都处于 RUNNING 状态,不过社区也有一些 issue
> > 希望能够优化这个问题[1][2]
> >
> > [1] https://issues.apache.org/jira/browse/FLINK-2491
> > [2] https://issues.apache.org/jira/b
Hi
不好意思,上一份邮件没有说完就发送出去了。
如果你希望把从其他地方读入 字典表,然后在 flink 中使用,或许可以看看 broadcast state[1]
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/state/broadcast_state.html
Best,
Congxian
Congxian Qiu 于2020年8月13日周四 下午2:00写道:
> Hi
> 现在 checkpoint/savepoint 需要所
Hi
现在 checkpoint/savepoint 需要所有算子都处于 RUNNING 状态,不过社区也有一些 issue
希望能够优化这个问题[1][2]
[1] https://issues.apache.org/jira/browse/FLINK-2491
[2] https://issues.apache.org/jira/browse/FLINK-18263
Best,
Congxian
yulu yang 于2020年8月13日周四 下午1:49写道:
> 请教大佬一个我最近在配置Flink流的过程中遇到问题,
>
> flink作业中关联
请教大佬一个我最近在配置Flink流的过程中遇到问题,
flink作业中关联使用了物理表(字典表),在flinkjob启动后,会对字典表进行一次读取,然后该算子会变成FINISHED状态,导致该flinkjob无法保存checkpoint和savepoint。一般大家遇到这种问题都是怎么处理的,我这个作业在数据加工过程中必须用到字典表赋值。
ate */ @Override
> protected void commit(ConnectionState connectionState) {
>log.info("start commit..." + connectionState);
> Connection connection =
> connectionState.connection; try {
> connection.commit();
> connection.close();
> } catch
:37:38,391 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator []
- Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr,
hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false,
PartitionNums: null - SinkConversionToRow (1
您好,这里有个问题反馈下!
读取hive表的数据转化为流;按照jdbc两阶段提交方式写入oracle数据库,
没有抛任何异常但是checkpoint失败:
job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED
instead. Aborting checkpoint.
附件
1.flink.log是yarn jobmanager打印的伪日志
2.Job.txt是job的伪代码
3.jdbc两阶段提交的伪代码附件
发自我的iPhone
Checkpoint只生成了shared和taskowned目录,没有chk,望解答,谢谢
| |
king
|
|
邮箱:kingjinhe2...@163.com
|
Signature is customized by Netease Mail Master
- 转发的邮件 -
发件人: king
发送日期: 2020年08月07日 09:05
收件人: user-zh
主题: 转发:Sql-client的checkpoint问题
抱歉,不是flink-site.yaml是flink-conf.yaml
| |
king
|
|
邮箱
抱歉,不是flink-site.yaml是flink-conf.yaml
| |
king
|
|
邮箱:kingjinhe2...@163.com
|
Signature is customized by Netease Mail Master
- 转发的邮件 -
发件人: king
发送日期: 2020年08月07日 08:23
收件人: user-zh
主题: Sql-client的checkpoint问题
您好,flink1.11.0,请问,
1.sql-client 如何设置checkpoint时间(生成周期),在做file
您好,flink1.11.0,请问,
1.sql-client 如何设置checkpoint时间(生成周期),在做file
streaming时候hdfs文件一直In-progress处状态,不能Finalized
2.之前在flink-site.yaml文件中设置了checkpoint,systemfile类型,只生成了另外两个目录,没有生成chk,望解答,谢谢
以上问题在编程方式下无问题。
| |
king
|
|
邮箱:kingjinhe2...@163.com
|
Signature is customized by Netease Mail Master
HI
目前sql-client的方式应该还不支持从指定的checkpoint恢复任务 不过Flink on zeppelin目前已经支持了 有兴趣可以用下
| |
JasonLee
|
|
邮箱:17610775...@163.com
|
Signature is customized by Netease Mail Master
在2020年08月04日 16:28,mispower 写道:
通过sql_client 启动的streaming的任务,在维护或者异常之后,如何像flink straming 一样通过指定checkpoint
恢复到上一次的消费节点。
在邮件列表里搜
通过sql_client 启动的streaming的任务,在维护或者异常之后,如何像flink straming 一样通过指定checkpoint
恢复到上一次的消费节点。
在邮件列表里搜索了相关的问题,好像都没有明确回答。
似乎楼主一开始说的checkpoint成功是指source
算子的checkpoint成功?但notifyCheckpointComplete函数要求的是整个链路的chk成功。
这个时候offset为100的消息必然已经被sink算子处理完成了,因为触发chk的屏障消息必然在offset100的消息之后到达sink算子。
hk__lrzy 于2020年7月29日周三 下午5:53写道:
> 你是说emit之后的offset commit么?可以看下
> `Kafka09Fetcher`的runFetchLoop方法
>
>
> 在2020年07月2
你是说emit之后的offset commit么?可以看下
`Kafka09Fetcher`的runFetchLoop方法
在2020年07月29日 15:09,shuwen zhou 写道:
比如读到一条offset值为100的消息,有Kafka Consumer source,一个Process算子和一个Sink算子
那么在这条消息到达Process算子,还未到达Sink算子时,提交offset是100吗?
On Wed, 29 Jul 2020 at 14:51, venn wrote:
checkpoint成功时就会把它的offset提交,可以看下这个类
可以这样理解,实际上souce 算子只知道这条数据发出去了,不知道这条数据执行到哪里的
-Original Message-
From: user-zh-return-5981-wxchunjhyy=163@flink.apache.org
On Behalf Of shuwen
zhou
Sent: 2020年7月29日 15:10
To: user-zh@flink.apache.org
Subject: Re: Flink使用Kafka作为source时checkpoint成功提交offset的机制
比如读到一条offset值为100的消息,有
比如读到一条offset值为100的消息,有Kafka Consumer source,一个Process算子和一个Sink算子
那么在这条消息到达Process算子,还未到达Sink算子时,提交offset是100吗?
On Wed, 29 Jul 2020 at 14:51, venn wrote:
> checkpoint成功时就会把它的offset提交,可以看下这个类: FlinkKafkaConsumerBase 的 这个方法:
> notifyCheckpointComplete
>
> -Original Message-
checkpoint成功时就会把它的offset提交,可以看下这个类: FlinkKafkaConsumerBase 的 这个方法:
notifyCheckpointComplete
-Original Message-
From: user-zh-return-5976-wxchunjhyy=163@flink.apache.org
On Behalf Of shuwen
zhou
Sent: 2020年7月29日 14:24
To: user-zh@flink.apache.org
Subject: Flink使用Kafka作为source时
大家好,请教一个问题,
当Flink在使用Kafka作为source,并且checkpoint开启时,提交的offset是在消息被链路最后一个算子处理之后才会把这条offset提交吗?
还是说这个消息只要从Kafka source读取后,不论链路处理到哪个算子, checkpoint成功时就会把它的offset提交呢?
另外有大神指路这段代码具体在哪个文件吗?
谢谢!
--
Best Wishes,
Shuwen Zhou
gt;
>>> >> > Peihui He 于2020年7月16日周四 下午5:26写道:
>>> >> >
>>> >> >> Hi Yun,
>>> >> >>
>>> >> >> 作业没有开启local recovery, 我这边测试1.10.0是必现的。
>>> >> >>
>>> >> >> Best wishes.
>>> >> >>
>>
t; >
>> >> >> Hi Yun,
>> >> >>
>> >> >> 作业没有开启local recovery, 我这边测试1.10.0是必现的。
>> >> >>
>> >> >> Best wishes.
>> >> >>
>> >> >> Yun Tang 于2020年7月16日周四 下午5:04写道:
>> >> &g
Hello,
flink 1.10.1
kafka 2.12-1.1.0
运行一段时间后会出现一下错误,不知道有遇到过没?
java.lang.RuntimeException: Error while confirming checkpoint
at
org.apache.flink.streaming.runtime.tasks.StreamTask.notifyCheckpointComplete(StreamTask.java:935
Hi
Checkpoint 包括两部分:1)meta 文件;2)具体的数据。如果是 Meta 部分可以参考
CheckpointMetadataLoadingTest[1] 自己写一个测试,如果你知道具体的内容,或许也可以看一下
StatePorcessAPI[2]
[1]
https://github.com/apache/flink/blob/master/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointMetadataLoadingTest.java
[2]
https
请问怎么反编译checkpoint文件啊,我想知道state写到checkpoint文件没有
_default_
OPERATOR_STATE_DISTRIBUTION_MODE SPLIT_DISTRIBUTE
VALUE_SERIALIZER
Gorg.apache.flink.api.common.typeutils.ParameterlessTypeSerializerConfigzS酿
脂?sr -org.apache.flink.runtime.state.JavaSerializerFSX韦4
? xr
??
??Flink
??kafka??checkpointEXACTLY_ONCE
??
Producer attempted an operation with an old epoch.Either there is a newer
producer with the same transactionalId, or the producer's transaction has been
expired by the broker
??
??
JM checkpoint ??
18:08:07.615 [Checkpoint Timer] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
checkpoint 116 @ 1595239687615 for job acd456ff6f2f9f59ee89b126503c20f0.
18:08:07.628 [flink-akka.actor.default-dispatcher-420] INFO
;> >>
>> >> --
>> >> Best, yuchuan
>> >>
>> >>
>> >>
>> >> 在 2020-07-06 14:04:58,"Congxian Qiu" 写道:
>> >> >@陈凯 感谢你分享的这个方法,比较好奇这两个的区别是什么?修改后的 patch 在 closure 中一开始 copy 了一份
>> >> &g
hi
UI??checkpoint??checkpoint??
??
| |
JasonLee
|
|
??17610775...@163.com
|
Signature is customized by Netease Mail Master
??2020??07??17?? 17:21??sun ??
??counts ?? Listhttps://ci.apache.org/projects
Hi
1 你需要回复一下我之前问你的问题:你可以从 JM log 看一下是否从 checkpoint 恢复了
2. 这里没有打印只是表明当前处理的 key 没有 state 数据,并不能表示 state 没有恢复回来,state 值是绑定到某个 key
上的(keyby 的 key)
Best,
Congxian
sun <1392427...@qq.com> 于2020年7月17日周五 下午5:22写道:
> 你好:counts 的数据 我是在下面打印出来了 List Lists.newArrayList(c
Hi, 曹武
这是一个已知bug,这个在1.11.1和1.12.0里已经修复,
如果着急使用,可以自己编译下release-1.11分支。
祝好
Leonard Xu
https://issues.apache.org/jira/browse/FLINK-18461
<https://issues.apache.org/jira/browse/FLINK-18461>
> 在 2020年7月17日,17:12,曹武 <14701319...@163.com> 写道:
>
> 感觉好像是应为从checkpoint启动失败或者是chec
??counts ?? Listhttps://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html
Best,
Congxian
sun <1392427...@qq.com ??2020??7??16?? 6:16??
感觉好像是应为从checkpoint启动失败或者是checkpiont文件里面不包含groupby的中间结果,这个怎么排查呀!
godfrey he wrote
> 为什么要 GROUP BY id,name ,description, weight ?
> 直接 "INSERT INTO sink SELECT id,name ,description, weight FROM
> debezium_source" 不能满足需求?
>
> 曹武 <
> 14701319164@
>> 于2020年7月16
UP BY id,name ,description, weight ?
> 直接 "INSERT INTO sink SELECT id,name ,description, weight FROM
> debezium_source" 不能满足需求?
>
> 曹武 <
> 14701319164@
>> 于2020年7月16日周四 下午9:30写道:
>
>> 我在使用flink 1.11.0中得ddl 部分 采用debezium-json做cdc得时候
>> 从checkpoint恢复以后
rote:
>
>> 为什么要 GROUP BY id,name ,description, weight ?
>> 直接 "INSERT INTO sink SELECT id,name ,description, weight FROM
>> debezium_source" 不能满足需求?
>>
>> 曹武 <
> 14701319164@
>> 于2020年7月16日周四 下午9:30写道:
>>
>> > 我在使用flink 1.11.0
t; >>> Hi Peihui
> >> >>>
> >> >>> Flink-1.10.1
> >> >>> 里面涉及到相关代码的改动就是更改了restore时path的类[1],但是你们的操作系统并不是windows,按道理应该是没有关系的。
> >> >>> 另外,这个问题在你遇到failover时候是必现的么?从文件路径看,作业也没有开启local recovery是吧?
> >> >>
gt; >>> 里面涉及到相关代码的改动就是更改了restore时path的类[1],但是你们的操作系统并不是windows,按道理应该是没有关系的。
>> >>> 另外,这个问题在你遇到failover时候是必现的么?从文件路径看,作业也没有开启local recovery是吧?
>> >>>
>> >>>
>> >>> [1]
>> >>>
>> https://github.com/apache/flink/c
ght ?
> 直接 "INSERT INTO sink SELECT id,name ,description, weight FROM
> debezium_source" 不能满足需求?
>
> 曹武 <14701319...@163.com> 于2020年7月16日周四 下午9:30写道:
>
> > 我在使用flink 1.11.0中得ddl 部分 采用debezium-json做cdc得时候
> > 从checkpoint恢复以后,新来op=d的数据会删除失败
> > 重启命令:./bi
gt; >>> Flink-1.10.1
> >>> 里面涉及到相关代码的改动就是更改了restore时path的类[1],但是你们的操作系统并不是windows,按道理应该是没有关系的。
> >>> 另外,这个问题在你遇到failover时候是必现的么?从文件路径看,作业也没有开启local recovery是吧?
> >>>
> >>>
> >>> [1]
> >>>
> https://github.com/
为什么要 GROUP BY id,name ,description, weight ?
直接 "INSERT INTO sink SELECT id,name ,description, weight FROM
debezium_source" 不能满足需求?
曹武 <14701319...@163.com> 于2020年7月16日周四 下午9:30写道:
> 我在使用flink 1.11.0中得ddl 部分 采用debezium-json做cdc得时候
> 从checkpoint恢复以后,新来op=d的数据会删除失败
> 重启命
我在使用flink 1.11.0中得ddl 部分 采用debezium-json做cdc得时候
从checkpoint恢复以后,新来op=d的数据会删除失败
重启命令:./bin/flink run -m yarn-cluster /root/bigdata-flink-1.0.jar -s
hdfs://prehadoop01:8020/flink/checkpoints/4cc5df8b96e90c1c2a4d3719a77f51d1/chk-819/_metadata
代码: EnvironmentSettings settings
;> https://github.com/apache/flink/commit/399329275e5e2baca9ed9494cce97ff732ac077a
>>> 祝好
>>> 唐云
>>> ____
>>> From: Peihui He
>>> Sent: Thursday, July 16, 2020 16:15
>>> To: user-zh@flink.apache.org
>>>
Hi
1 counts 的数据丢失了能否详细描述一下呢?你预期是什么,看到什么现象
2 能否把你关于 counts 的其他代码也贴一下
3. 你的作业是否从 checkpoint 恢复了呢?这个可以从 JM log 来查看
4. 如果你确定是数据有丢失的话,或许你可以使用 state-process-api[1] 看一下是序列化出去有问题,还是 restore 回来有问题
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html
Best
env.enableCheckpointing(1000);env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//
env.setRestartStrategy(RestartStrategies.noRestart());
env.getCheckpointConfig().setCheckpointTimeout(500);
b.com/apache/flink/commit/399329275e5e2baca9ed9494cce97ff732ac077a
>> 祝好
>> 唐云
>>
>> From: Peihui He
>> Sent: Thursday, July 16, 2020 16:15
>> To: user-zh@flink.apache.org
>> Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint
; https://github.com/apache/flink/commit/399329275e5e2baca9ed9494cce97ff732ac077a
> 祝好
> 唐云
>
> From: Peihui He
> Sent: Thursday, July 16, 2020 16:15
> To: user-zh@flink.apache.org
> Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
>
: Thursday, July 16, 2020 16:15
To: user-zh@flink.apache.org
Subject: Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
Hi Yun,
不好意思这么久回复,是@Congxian 描述的第2种情况。异常就是我通过socket
输入的特定的word抛出runtimeexception 使task
失败,然后job会尝试从checkpoint中恢复,但是恢复的过程中就报
Caused by: java.nio.file.NoSuchFileException:
/data
Hi Yun,
不好意思这么久回复,是@Congxian 描述的第2种情况。异常就是我通过socket
输入的特定的word抛出runtimeexception 使task
失败,然后job会尝试从checkpoint中恢复,但是恢复的过程中就报
Caused by: java.nio.file.NoSuchFileException:
/data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3
Hi chenxyz,
我们遇到的问题应该是一样的,换了1.10.1 后就可以从checkpoint 中恢复了。珞
Best wishes.
chenxyz 于2020年7月15日周三 下午9:53写道:
>
>
>
> Hello,
> Peihui,可以参考下是不是和这个问题类似?之前我在1.10.0也遇到过。
>
> http://apache-flink.147419.n8.nabble.com/rocksdb-Could-not-restore-keyed-state-backend-for-KeyedProcessOpera
:
> Hi
>
> 我尝试理解一下:
> 1 你用 1.9 跑 wordcount 作业,然后执行了一些 checkpoint,然后停止作业,然后使用 1.10 从之前 1.9 的作业生成的
> checkpoint 恢复,发现恢复不了?
> 2 你用作业 1.10 跑 wordcount,然后遇到特定的 word 会抛异常,然后 failover,发现不能从 checkpoint 恢复?
>
> 你这里的问题是第 1 种还是第 2 种呢?
>
> 另外能否分享一下你的操作步骤以及出错时候的 taskmanag
道:
>Hi Yun,
>
>我这边用一个word count 例子,socket -> flatmap -> keyBy -> reduce ->
>print. 在flatmap 中当出现特定word的时候就抛出一个runtimeException。在1.9.2
>里面是可以从checkpoint中自动恢复上次做checkpoint的时候的状态,但是用1.10.0 就不能。环境是flink on
>yarn。
>
>Best wishes.
>
>Yun Tang 于2020年7月14日周二 上午11:57写道
据我所知,跨大版本的不能直接从checkoint恢复,只能放弃状态重新跑
Best
Robin Zhang
From: Peihui He <[hidden email]>
Sent: Tuesday, July 14, 2020 10:42
To: [hidden email] <[hidden email]>
Subject: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复
hello,
当升级到1.10.0 时候,程序出错后会尝试从che
Hi
我尝试理解一下:
1 你用 1.9 跑 wordcount 作业,然后执行了一些 checkpoint,然后停止作业,然后使用 1.10 从之前 1.9 的作业生成的
checkpoint 恢复,发现恢复不了?
2 你用作业 1.10 跑 wordcount,然后遇到特定的 word 会抛异常,然后 failover,发现不能从 checkpoint 恢复?
你这里的问题是第 1 种还是第 2 种呢?
另外能否分享一下你的操作步骤以及出错时候的 taskmanager log 呢?
Best,
Congxian
Peihui He 于2020年7月14日周二 下午2
Hi Congxian,
这个错误是从1.9 升级到1.10 遇到的问题。用简单的wordcount 测试,自己根据特定word
抛出runtimeException,就能够重现。flink on yarn 和 flink on k8s 都出现这个问题。1.10
都不能从上次的checkpoint状态中恢复。不知道是不是1.10需要其他配置呢?
Best wishes.
Congxian Qiu 于2020年7月14日周二 下午1:54写道:
> Hi
>
> 这个出错是从 1.9 升级到 1.10 遇到的问题,还是说 1.10 能正常跑了,然后跑着跑着 fai
Hi Yun,
我这边用一个word count 例子,socket -> flatmap -> keyBy -> reduce ->
print. 在flatmap 中当出现特定word的时候就抛出一个runtimeException。在1.9.2
里面是可以从checkpoint中自动恢复上次做checkpoint的时候的状态,但是用1.10.0 就不能。环境是flink on
yarn。
Best wishes.
Yun Tang 于2020年7月14日周二 上午11:57写道:
> Hi Peihui
>
> 你的异常应该是从增量
Hi
这个出错是从 1.9 升级到 1.10 遇到的问题,还是说 1.10 能正常跑了,然后跑着跑着 failover 了再次恢复的时候出错了呢?
另外你可以看下 tm log 看看有没有其他异常
Best,
Congxian
Yun Tang 于2020年7月14日周二 上午11:57写道:
> Hi Peihui
>
> 你的异常应该是从增量Checkpoint恢复时,文件已经下载到本地了,做硬链时[1],发现源文件不见了,有很大的可能是当时发生了异常,导致restore流程退出了,所以这个问题应该不是root
> cause。
>
Hi Peihui
你的异常应该是从增量Checkpoint恢复时,文件已经下载到本地了,做硬链时[1],发现源文件不见了,有很大的可能是当时发生了异常,导致restore流程退出了,所以这个问题应该不是root
cause。
[1]
https://github.com/apache/flink/blob/2a3b642b1efb957f3d4f20502c40398786ab1469/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming
hello,
当升级到1.10.0 时候,程序出错后会尝试从checkpoint恢复,但是总是失败,提示
Caused by: java.nio.file.NoSuchFileException:
/data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3
会变为 null,这个比较奇怪。
> >> >
> >> >Best,
> >> >Congxian
> >> >
> >> >
> >> >陈凯 于2020年7月6日周一 上午9:53写道:
> >> >
> >> >>
> >> >> Hi,zhisheng 程龙.我们也遇到这个问题了,jdk版本jdk8_40,低版本 jdk 确实有大概率会NPE。
> >>
目前来看,平稳运行,没有再报出类似错误。
>
> 反馈一下,如果有朋友遇到类似的问题,可以参考,给这个问题做一个闭环。谢谢各位的关注和帮忙。
>
> Best,
> Zhefu
>
> LakeShen 于2020年6月12日周五 上午9:49写道:
>
> > Hi ZheFu,
> >
> > 可以把你的 Flink 版本说一下,我大致理解是这样的,每次 sink 端 在 snapshotState 的时候,会检查该次 Sink
> > 的数据是否都已经 Sink 到了 kafka.
&g
时候,会检查该次 Sink
>> 的数据是否都已经 Sink 到了 kafka.
>>
>> 也就是说,你这次 Checkpoint 的时候,由于你的 Checkpoint 间隔较短,Kafka 那边给回的消息记录 Ack
>> 还没有弄完,所以有这个问题。建议 Checkpoint 间隔弄长点。
>>
>> 具体代码查看:FlinkKafkaProducerBase.snapshotState 这个方法。
>>
>> Best,
>> LakeShen
>>
>
eFu,
>
> 可以把你的 Flink 版本说一下,我大致理解是这样的,每次 sink 端 在 snapshotState 的时候,会检查该次 Sink
> 的数据是否都已经 Sink 到了 kafka.
>
> 也就是说,你这次 Checkpoint 的时候,由于你的 Checkpoint 间隔较短,Kafka 那边给回的消息记录 Ack
> 还没有弄完,所以有这个问题。建议 Checkpoint 间隔弄长点。
>
> 具体代码查看:FlinkKafkaProducerBase.snapshotState 这个方法。
>
> Bes
,这个比较奇怪。
>> >
>> >Best,
>> >Congxian
>> >
>> >
>> >陈凯 于2020年7月6日周一 上午9:53写道:
>> >
>> >>
>> >> Hi,zhisheng 程龙.我们也遇到这个问题了,jdk版本jdk8_40,低版本 jdk 确实有大概率会NPE。
>> >> 我之前提了个jira 描述了这个问题
>> >> h
>CheckpointMeta,也就是说 845 - 867 行之间,之前的 checkpointMeta 会变为 null,这个比较奇怪。
> >
> >Best,
> >Congxian
> >
> >
> >陈凯 于2020年7月6日周一 上午9:53写道:
> >
> >>
> >> Hi,zhisheng 程龙.我们也遇到这个问题了,jdk版本jdk8_40,低版本 jdk 确实有大概率会NPE。
> >> 我之前提了个jira 描
:53写道:
>
>>
>> Hi,zhisheng 程龙.我们也遇到这个问题了,jdk版本jdk8_40,低版本 jdk 确实有大概率会NPE。
>> 我之前提了个jira 描述了这个问题
>> https://issues.apache.org/jira/browse/FLINK-18196
>>
>> 修改了Checkpoint 相关代码后,在低版本 jdk 上也没有再发现过过NPE。如果实在不能升级 jdk 版本,可以参考下面的patch:
>>
>> https://g
che.org/jira/browse/FLINK-18196
>
> 修改了Checkpoint 相关代码后,在低版本 jdk 上也没有再发现过过NPE。如果实在不能升级 jdk 版本,可以参考下面的patch:
>
> https://github.com/yuchuanchen/flink/commit/e5122d9787be1fee9bce141887e0d70c9b0a4f19
>
>
>
> -邮件原件-
> 发件人: zhisheng
> 发送时间: 2020年7月5日 15:01
> 收件人: u
Hi,zhisheng 程龙.我们也遇到这个问题了,jdk版本jdk8_40,低版本 jdk 确实有大概率会NPE。
我之前提了个jira 描述了这个问题
https://issues.apache.org/jira/browse/FLINK-18196
修改了Checkpoint 相关代码后,在低版本 jdk 上也没有再发现过过NPE。如果实在不能升级 jdk 版本,可以参考下面的patch:
https://github.com/yuchuanchen/flink/commit/e5122d9787be1fee9bce141887e0d70c9b0a4f19
-邮件
> > Congxian
> > >
> > >
> > > 程龙 <13162790...@163.com> 于2020年7月1日周三 下午9:09写道:
> > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
> > > >
>
你好
对于 Checkpoint Async 阶段比较慢的情况,你可以看一下 网络的情况,以及 HDFS 的读写情况,-- 包括 NN 的压力等。
Best,
Congxian
zhisheng 于2020年6月28日周日 上午10:31写道:
> hi,立志:
>
> 从你的描述(能跑 10 几天且使用的是 FsStateBackend),可以提供一下 JobManager 和 TaskManager 的 GC
> 时间和次数的监控信息吗?怀疑是不是因为 Full GC 导致的问题。
>
> Best!
> zhisheng
>
从现象看,应该是有内存泄漏,你需要看一下这些内存都是啥,然后才好定位是哪里的问题
checkpoint 是指 state 的一个快照,rocksdb 中存的是 state。理论上来说,作业 fail 了,之前 rocksdb
中的数据就没有了。新的作业是会使用新的 RocksDB
Best,
Congxian
SmileSmile 于2020年7月3日周五 下午2:15写道:
> 作业运行在k8s上,这个现象可以重现,目前我这边有多份数据join的作业基本都会有这个问题。步骤如下:
> 1. 使用eventtime,水位线设置为数据时间-3分钟,状态使用rocks
wse/FLINK-17479
> > Best,
> > Congxian
> >
> >
> > 程龙 <13162790...@163.com> 于2020年7月1日周三 下午9:09写道:
> >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
> > >
>
m> 于2020年7月1日周三 下午9:09写道:
>
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > 都是分配不到资源(slot)的错误,应该还是checkpoint 为空导致的,不知道为啥为空
> >
> >
> >
> >
> >
> > 在 2020-07-01 20:51:34,"JasonLee" <17610775...@163.com> 写道:
> &
作业运行在k8s上,这个现象可以重现,目前我这边有多份数据join的作业基本都会有这个问题。步骤如下:
1. 使用eventtime,水位线设置为数据时间-3分钟,状态使用rocksdb,不开启checkpoint,设置内存limit
2. 作业运行一段时间。
3. kill 其中一个pod,作业fail
4. k8s自动拉起该pod,观察其他pod的内存使用,会上涨。运行一段时间然后很容易超过limit被os kill
5. 陷入被重复kill的死循环。
解决方法:销毁集群,重构即可。
观察过heap的内存,没有问题。 被os kill怀疑是offheap超用,offheap没有正常
你可以看看是否 FLINK-17479[1] 和你的问题一样,是的话,可以尝试修改一下 jdk 版本试试
[1] https://issues.apache.org/jira/browse/FLINK-17479
Best,
Congxian
程龙 <13162790...@163.com> 于2020年7月1日周三 下午9:09写道:
>
>
>
>
>
>
>
>
>
>
>
>
> 都是分配不到资源(slot)的错误,应该还是checkpoint 为空导致的,不知道为啥为
2020年07月03日 11:21,SmileSmile 写道:
>
> Hi
>
> 我的作业是运行在1.10.1, 使用的是event time ,没有开启checkPoint。每当作业重启一次,container memory
> usage会上涨2G,每重启一次就会上涨一些内存直到被OS kill。
>
>
> 历史数据的清理是在新event time到达之后调用 WindowOperator#onEventTime()
> 的clearAllState实现清理,如果作业重启,又没有开启checkpoint,尚未被处理的历史
这种现象只会出现在on rocksdb中。
| |
a511955993
|
|
邮箱:a511955...@163.com
|
签名由 网易邮箱大师 定制
在2020年07月03日 11:21,SmileSmile 写道:
Hi
我的作业是运行在1.10.1, 使用的是event time ,没有开启checkPoint。每当作业重启一次,container memory
usage会上涨2G,每重启一次就会上涨一些内存直到被OS kill。
历史数据的清理是在新event time到达之后调用 WindowOperator#onEventTime()
的
Hi
我的作业是运行在1.10.1, 使用的是event time ,没有开启checkPoint。每当作业重启一次,container memory
usage会上涨2G,每重启一次就会上涨一些内存直到被OS kill。
历史数据的清理是在新event time到达之后调用 WindowOperator#onEventTime()
的clearAllState实现清理,如果作业重启,又没有开启checkpoint,尚未被处理的历史数据是否一直残留在内存中无法清理?
是否有哪位大佬可以帮忙解惑?
| |
a511955993
|
|
邮箱:a511955...@163
都是分配不到资源(slot)的错误,应该还是checkpoint 为空导致的,不知道为啥为空
在 2020-07-01 20:51:34,"JasonLee" <17610775...@163.com> 写道:
>你到具体的tm上找到相关的operator看看是不是有异常信息
>
>
>| |
>JasonLee
>|
>|
>邮箱:17610775...@163.com
>|
>
>Signature is customized by Netease Mai
共有 698 项搜索結果,以下是第 401 - 500 matches
Mail list logo