各位大佬,
背景:
实际测试flink读Kafka 数据写入hudi, checkpoint的间隔时间是1min,
state.backend分别为filesystem,测试结果如下:
写hudi的checkpoint 的延迟
写iceberg得延迟:
疑问: hudi的checkpoint的文件数据比iceberg要大很多,如何降低flink写hudi的checkpoint的延迟?
| |
博星
|
|
15868861...@163.com
|
rocksdb文件正在看~
>
> 在 2024-01-18 10:56:51,"Zakelly Lan" 写道:
>
>> 你好,能提供一些详细的信息吗,比如:是datastream作业吧?是否设置了State
>> TTL?观测到逐渐变大是通过checkpoint监控吗,总量是什么级别。cp文件或者本地rocksdb目录下哪些文件最大
>>
>> On Wed, Jan 17, 2024 at 4:09 PM fufu wrote:
>>
>>>
>>&
HI All
flink 1.18.0 jdk 17 使用异步IO 失败后无法恢复,一直报序列化问题;
我调整使用 string 类型和bytes 都不能够恢复
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed
when trying to restore operator state backend
at org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(
DefaultOperatorStateB
ize不断增大,一天能增加个600~800M,持续不断的增大。以下图为例:ID为313的cp比ID为304的大了将近10M,一直运行,会一直这么增加下去。cp文件和rocksdb文件正在看~
>
> 在 2024-01-18 10:56:51,"Zakelly Lan" 写道:
>
> >你好,能提供一些详细的信息吗,比如:是datastream作业吧?是否设置了State
> >TTL?观测到逐渐变大是通过checkpoint监控吗,总量是什么级别。cp文件或者本地rocksdb目录下哪些文件最大
> >
>
信息吗,比如:是datastream作业吧?是否设置了State
>TTL?观测到逐渐变大是通过checkpoint监控吗,总量是什么级别。cp文件或者本地rocksdb目录下哪些文件最大
>
>On Wed, Jan 17, 2024 at 4:09 PM fufu wrote:
>
>>
>> 我有一个Flink任务,使用的是flink1.14.6版本,任务中有一个增量(AggregateFunction)+全量(ProcessWindowFunction)的窗口,任务运行的时候这个算子的状态在不断增大,每天能增大个几百M这种,这个问题怎么
是datastream作业,窗口算子本身没有设置TTL,其余算子设置了TTL,是在Flink
UI上看到窗口算子的size不断增大,一天能增加个600~800M,持续不断的增大。以下图为例:ID为313的cp比ID为304的大了将近10M,一直运行,会一直这么增加下去。cp文件和rocksdb文件正在看~
在 2024-01-18 10:56:51,"Zakelly Lan" 写道:
>你好,能提供一些详细的信息吗,比如:是datastream作业吧?是否设置了State
>TTL?观测到逐渐变大是通过checkpoint监控吗,总量是什么级别。cp
你好,能提供一些详细的信息吗,比如:是datastream作业吧?是否设置了State
TTL?观测到逐渐变大是通过checkpoint监控吗,总量是什么级别。cp文件或者本地rocksdb目录下哪些文件最大
On Wed, Jan 17, 2024 at 4:09 PM fufu wrote:
>
> 我有一个Flink任务,使用的是flink1.14.6版本,任务中有一个增量(AggregateFunction)+全量(ProcessWindowFunction)的窗口,任务运行的时候这个算子的状态在不断增大,每天能增大个几百M这种,这个问题怎么排查?使用的事件时间,
我有一个Flink任务,使用的是flink1.14.6版本,任务中有一个增量(AggregateFunction)+全量(ProcessWindowFunction)的窗口,任务运行的时候这个算子的状态在不断增大,每天能增大个几百M这种,这个问题怎么排查?使用的事件时间,水位线下发正常,其余的算子都正常,就这个算子在不断增长,非常诡异。在网上搜到一个类似的文章:https://blog.csdn.net/RL_LEEE/article/details/123864487,想尝试下,但不知道manifest大小如何设置,没有找到对应的参数,
请社区指导下,或者有没有别的解决方案?感谢社区!
看现象是这样,谢了,我抽空看下这块源码
| |
吴先生
|
|
15951914...@163.com
|
回复的原邮件
| 发件人 | Zakelly Lan |
| 发送日期 | 2024年1月11日 16:33 |
| 收件人 | |
| 主题 | Re: flink-checkpoint 问题 |
看了下代码,这个问题有可能的原因是:
1. flink是先创建chk目录,然后再打 Triggering checkpoint 的 log
的,所以有概率是目录创建了,但是log没输出trigger
2. 作业失败,和触发下一个cp,这是两个异步线程,所以有可能
看了下代码,这个问题有可能的原因是:
1. flink是先创建chk目录,然后再打 Triggering checkpoint 的 log
的,所以有概率是目录创建了,但是log没输出trigger
2. 作业失败,和触发下一个cp,这是两个异步线程,所以有可能是先执行了创建25548目录的操作然后作业再失败,然后trigger
25548还没输出就退了。
版本1.14.5之后代码已经把上述1行为改了,先打log再创建目录,就不会有这样奇怪的问题了。
On Thu, Jan 11, 2024 at 3:03 PM 吴先生 <15951914...@163.com>
NFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
checkpoint 25546 for job d12f3c6e836f56fb23d96e31737ff0b3 (411347921 bytes in
50128 ms).
2023-12-31 18:40:10.681 [Checkpoint Timer] INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
checkpoint 25547 (type=CHECKPOINT) @ 1704019210665 for
Hi, 你的图挂了,可以用图床处理一下,或者直接贴log。
--
Best!
Xuyang
在 2024-01-11 13:40:43,"吴先生" <15951914...@163.com> 写道:
JM中chk失败时间点日志,没有25548的触发记录:
自动recovery失败:
TM日志:
checkpoint文件路径,25548里面空的:
| |
吴先生
|
|
15951914...@163.com
|
回复的原邮件
| 发件人 | Zakelly Lan |
| 发送日期 |
JM中chk失败时间点日志,没有25548的触发记录:
自动recovery失败:
TM日志:
checkpoint文件路径,25548里面空的:
| |
吴先生
|
|
15951914...@163.com
|
回复的原邮件
| 发件人 | Zakelly Lan |
| 发送日期 | 2024年1月10日 18:20 |
| 收件人 | |
| 主题 | Re: flink-checkpoint 问题 |
你好,
方便的话贴一下jobmanager的log吧,应该有一些线索
On Wed, Jan 10, 2024 at 5:55 PM 吴先
我记得flink低版本有这个bug,会错误的删除某一个checkpoint的,你这个版本太老了,可以升级到新版本。
The following is the content of the forwarded email
From:"吴先生" <15951914...@163.com>
To:user-zh
Date:2024-01-10 17:54:42
Subject:fli
你好,
方便的话贴一下jobmanager的log吧,应该有一些线索
On Wed, Jan 10, 2024 at 5:55 PM 吴先生 <15951914...@163.com> wrote:
> Flink版本: 1.12
> checkpoint配置:hdfs
>
> 现象:作业由于一些因素第N个checkpoint失败,导致任务重试,任务重试失败,hdfs中不存在第N个chk路径,但是为什么会出现一个第N+1的chk路径,且这个路径下是空的
>
>
Flink版本: 1.12
checkpoint配置:hdfs
现象:作业由于一些因素第N个checkpoint失败,导致任务重试,任务重试失败,hdfs中不存在第N个chk路径,但是为什么会出现一个第N+1的chk路径,且这个路径下是空的
在使用使用jemalloc内存分配器一段时间后,出现checkpoint
超时,任务卡住的情况,哪位遇到过呢?flink版本:flink-1.13.2,jiemalloc版本:5.3.0
After using the jemalloc memory allocator for a period of time, checkpoint
timeout occurs and tasks are stuck. Who has encountered this? flink
version:1.13.2, jiemalloc version: 5.3.0
问题一:
做checkpoint时,是每个算子收到barriers之后,将状态和offset写到状态后端,并返回ack给jm之后。再做一次全量快照到jm内存或者自己设置的hdfs文件路径下啊。不理解在hdfs生成的checkpoint文件到底是2pc提交事务成功之后的checkpoint还是每个算子做完checkpoint。
是图1:
还是图二:
问题二:
做完2pc之后。出现了故障。做故障恢复。恢复的状态是上一次提交事务成功的地方的状态。还是上一个barriers所在算子做的checkpoint成功的地方开始恢复。
| |
zyzandmz
|
|
zyzan
4. https://issues.apache.org/jira/browse/FLINK-28060
--
Best,
Matt Wang
Replied Message
| From | zhan...@eastcom-sw.com |
| Date | 05/6/2023 09:19 |
| To | user-zh |
| Subject | Re: Re: checkpoint Kafka Offset commit failed |
hi, 感谢解答~
flink 集群跟kafka集群都在同个网段,检查过网络情况是正常的
在
> >
> >
> >
> >
> > 回复的原邮件
> > | 发件人 | zhan...@eastcom-sw.com |
> > | 日期 | 2023年05月04日 14:54 |
> > | 收件人 | user-zh |
> > | 抄送至 | |
> > | 主题 | checkpoint Kafka Offset commit failed |
> > hi,请问在flink(1.14、1.16) checkp
...@163.com
> |
> |
> 邮箱:go574...@163.com
> |
>
>
>
>
> 回复的原邮件
> | 发件人 | zhan...@eastcom-sw.com |
> | 日期 | 2023年05月04日 14:54 |
> | 收件人 | user-zh |
> | 抄送至 | |
> | 主题 | checkpoint Kafka Offset commit failed |
> hi,请问在flink(1.14、1.16) checkpoint
退订
| |
go574...@163.com
|
|
邮箱:go574...@163.com
|
回复的原邮件
| 发件人 | zhan...@eastcom-sw.com |
| 日期 | 2023年05月04日 14:54 |
| 收件人 | user-zh |
| 抄送至 | |
| 主题 | checkpoint Kafka Offset commit failed |
hi,请问在flink(1.14、1.16) checkpoint(10s)提交 kafka偏移量提示 The coordinator is not
available
ager
> 扩容/缩容的时候,Flink会在日志中报错:因为扩缩容之前的TaskManager没有在运行导致checkpoint失败,同时也有checkpoint失败的警报。
> 但实际上checkpoint 还能顺利进行, job也没有运行错误。 重启job后这个错误就会消失。想请教一下如何修复这个问题?
>
> 详细的日志如下
> 2022-12-13 05:08:22.339 [jobmanager-io-thread-1] INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
您好,
我的 Flink job是以 reactive 模式运行,然后用了 Kubernetes HPA 来自动扩容/缩容
TaskManager。每当TaskManager
扩容/缩容的时候,Flink会在日志中报错:因为扩缩容之前的TaskManager没有在运行导致checkpoint失败,同时也有checkpoint失败的警报。
但实际上checkpoint 还能顺利进行, job也没有运行错误。 重启job后这个错误就会消失。想请教一下如何修复这个问题?
详细的日志如下
2022-12-13 05:08:22.339 [jobmanager-io-thread-1] INFO
文件问题,采用了一小时checkpoint,具体参数为:
>
> bsEnv.enableCheckpointing(360);
> bsEnv.setStateBackend(new HashMapStateBackend());
>
> bsEnv.getCheckpointConfig().setCheckpointStorage("hdfs://xxx/user/flink/checkpoint/serverlog/hc");
>
> 但在执行时发现首次checkpoint并没有按照一小时触发.
> 作
flink-oss-fs-hadoop-1.13.6.jar 这个 jar 需要放到 flink 的 lib 目录下
Best,
Lijie
highfei2011 于2022年11月1日周二 16:23写道:
> 包冲突了。
>
>
> 在 2022年11月1日 15:39,highfei2011 写道:
>
>
> flink 版本:apache flink 1.13.6 flink operator 版本: 1.2.0
> 提交命令:kubernetes-jobmanager.sh kubernetes-application 异常: Caused by:
> org.ap
你好. 但是下一次cp发起之时, kafka transaction 已经超时失败了, sink端precommit之前,写入到kafka的数据,
是不是就丢失了?
发件人: Xuyang
发送时间: 2022年11月1日 23:08
收件人: user-zh@flink.apache.org
主题: Re:flink exactly once
写kafka,如果checkpoint超时了导致kafka的transaction在commit之前也超时了, flink会怎么处理呢?
Hi, 应该会等到下一次做
Hi, 应该会等到下一次做cp的时候再提交
在 2022-11-01 17:13:22,"郑 致远" 写道:
>大佬们好.
>flink exactly once 写kafka,如果flink
>checkpoint超时了导致kafka的transaction在commit之前也超时了, flink会怎么处理呢?
>kafka的transaction因为超时,abort后, 会导致abort之前写kafka的数据,丢失吗?
大佬们好.
flink exactly once 写kafka,如果flink
checkpoint超时了导致kafka的transaction在commit之前也超时了, flink会怎么处理呢?
kafka的transaction因为超时,abort后, 会导致abort之前写kafka的数据,丢失吗?
那么生成了 success 文件了吗?
另外你的 sink.partition-commit.trigger 用的是 process-time(默认是 process-time) 还是
partition-time。
Best regards,
Yuxia
- 原始邮件 -
发件人: "junjie miao"
收件人: "user-zh"
发送时间: 星期四, 2022年 9 月 22日 下午 2:27:46
主题: Re: Re: 并行度>1时实时写入hive partition table且开启了checkpoi
你用 hdfs dfs -ls 看一下对应表的路径下,是不是有文件生成。
Best regards,
Yuxia
- 原始邮件 -
发件人: "junjie miao"
收件人: "user-zh"
发送时间: 星期四, 2022年 9 月 22日 下午 1:59:55
主题: 并行度>1时实时写入hive partition table且开启了checkpoint没有同步信息到metastore
flink 1.14.5中消费kafka数据实时写入hive partition text table且开启了chec
022年6月24日周五 12:00写道:
> flink版本:1.13.1
> hdfs:3+版本
> 异常日志:
>
> 2022-06-24 10:58:19,839 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Decline
> checkpoint 1101 by task b3d88f9ef72bda003056856c4422742d of job
> 6bd7dc46451f01e008762c9b556cb08
flink??1.13.1
hdfs??3+
??
2022-06-24 10:58:19,839 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator[] - Decline
checkpoint 1101 by task b3d88f9ef72bda003056856c4422742d of job
6bd7dc46451f01e008762c9b556cb08f at zhaohy4-test-taskmanager-1-1 @ 10.42.5.55
kpoints-after-tasks-finish.enabled相关配置,完美解决了问题。
> 我使用了lookup join + 外部mysql维表,任务开始时,全量加载了一次维表数据,对应task状态就变成了finished。
>
> best wishes!
>
>
> amber_...@qq.com
>
> 发件人: Lincoln Lee
> 发送时间: 2022-06-21 11:18
> 收件人: user-zh
> 主题: Re: Re: 使用join+聚合时,checkpoint异常
> Hi,
非常感谢!你的建议很有用。
我在代码中添加execution.checkpointing.checkpoints-after-tasks-finish.enabled相关配置,完美解决了问题。
我使用了lookup join + 外部mysql维表,任务开始时,全量加载了一次维表数据,对应task状态就变成了finished。
best wishes!
amber_...@qq.com
发件人: Lincoln Lee
发送时间: 2022-06-21 11:18
收件人: user-zh
主题: Re: Re: 使用join+聚合时,checkpoint异常
ith finished tasks is not enabled
Best,
Lincoln Lee
amber_...@qq.com.INVALID 于2022年6月21日周二 10:27写道:
> 感谢!
> 未发生背压,但我在日志中发现了一些异常信息,如下:
> Failed to trigger checkpoint for job 297c5a840f8fd3a1cbcb63825200e8d4
> because Some tasks of the job have already finished and checkpointing with
>
感谢!
未发生背压,但我在日志中发现了一些异常信息,如下:
Failed to trigger checkpoint for job 297c5a840f8fd3a1cbcb63825200e8d4 because
Some tasks of the job have already finished and checkpointing with finished
tasks is not enabled. Failure reason: Not all required tasks are currently
running.
通过web ui可以看到,确实有一部分任务是
你好,图片挂了,可以尝试使用图床工具上传图片。
在 2022-06-21 09:42:54,"amber_...@qq.com.INVALID" 写道:
您好!
我使用flink1.14.4,sqlserver-cdc-2.2.1,yarn-per-job模式提交任务;
当我提交普通数据同步任务时,一切正常;
当我提交JOIN+聚合任务时,checkpoint无法正常工作,具体表现为无任何checkpoint记录,且Task Managed
Memory使用率始终是100%;
以下是我的ch
hi.
这种情况下,最好查看一下是否发生了反压,同时看看日志之中是否有相关的异常信息。
Best,
Shengkai
amber_...@qq.com.INVALID 于2022年6月21日周二 09:43写道:
> 您好!
> 我使用flink1.14.4,sqlserver-cdc-2.2.1,yarn-per-job模式提交任务;
> 当我提交普通数据同步任务时,一切正常;
> 当我提交JOIN+聚合任务时,checkpoint无法正常工作,具体表现为无任何checkpoint记录,且Task Managed
>
您好!
我使用flink1.14.4,sqlserver-cdc-2.2.1,yarn-per-job模式提交任务;
当我提交普通数据同步任务时,一切正常;
当我提交JOIN+聚合任务时,checkpoint无法正常工作,具体表现为无任何checkpoint记录,且Task Managed
Memory使用率始终是100%;
以下是我的checkpoint配置:
我尝试增加Task Managed内存,但使用率总是100%;
当我关闭增量检查点时,无任何变化;
当我将State Backend切换为hashmap时
你好,
Unaligned checkpoint 是个底层特性,要使用的话只要设置 Flink 参数
execution.checkpointing.unaligned = true 就行,在 SQL client 中,可以使用 SET "key" =
"value" 的语法设置 Flink 参数的值。
Unaligned checkpoint 较之 aligned checkpoint 主要的改变在于
* unaligned cp 在输入缓冲区收到第一个 cp barrier
的时候立即触发快照并直接输出至下游;代价是快照需要
大佬们可以说说Unaligned Checkpoint的实现吗 看了不少文档 没有太看懂 我如果想在sql里面实现 这个该怎么设置啊 请大佬们指教
| |
小昌同学
|
|
ccc0606fight...@163.com
|
使用oss 存储checkpoint,做几次checkpoint就会出现下面报错,导致checkpoint失败
Caused by: org.apache.flink.fs.osshadoop.shaded.com.aliyun.oss.ClientException:
Connection error due to: Trying to access closed classloader. Please check if
you store classloaders directly or indirectly in static fields. If the
打扰了,解决了,原因是因为启动时没有配置savepoint路径。
> 2022年4月30日 12:09,Arthur Li 写道:
>
> 大家好,
>
>
> 我在学习Flink checkpoint时,做了一个示例没有得到期望结果,麻烦帮忙看看是哪里设置有问题。谢谢
> 1. 启动checkpoint
> 2. 设置statebackend为FsStateBackend
> 3. 从socketTextStream读取数据,统计单词个数
>(“hello”, 5), (“world”, 1)
> 4.
大家好,
我在学习Flink checkpoint时,做了一个示例没有得到期望结果,麻烦帮忙看看是哪里设置有问题。谢谢
1. 启动checkpoint
2. 设置statebackend为FsStateBackend
3. 从socketTextStream读取数据,统计单词个数
(“hello”, 5), (“world”, 1)
4. 通过触发异常,来模拟终止程序
5. 重新启动程序,那么启动之后的统计数据的初始值应该是上一次checkpoint 成功存储的值
(“hello”, 5), (“world”, 1) , 那么再次输入hello, 应该输出(“hello
Hi
一般是卡在最后一步从JM写checkpoint meta上面了,建议使用jstack等工具检查一下JM的cpu栈,看问题出在哪里。
祝好
唐云
From: Sun.Zhu <17626017...@163.com>
Sent: Tuesday, March 8, 2022 14:12
To: user-zh@flink.apache.org
Subject: Re:flink s3 checkpoint 一直IN_PROGRESS(100%)直到失败
图挂了
https://post
图挂了
https://postimg.cc/Z9XdxwSk
在 2022-03-08 14:05:39,"Sun.Zhu" <17626017...@163.com> 写道:
hi all,
flink 1.13.2,将checkpoint 写到S3但是一直成功不了,一直显示IN_PROGRESS,直到超时失败,有大佬遇到过吗?
hi all,
flink 1.13.2,将checkpoint 写到S3但是一直成功不了,一直显示IN_PROGRESS,直到超时失败,有大佬遇到过吗?
Hi, kong:
对于Checkpoint超时,建议调大反压处下游算子的并发度,避免出现长时间反压。
在配置方面,如果希望调大checkpoint超时时间,可以将execution.checkpointing.timeout配置项 [1]
的值调大,目前默认值为"10
min",即10分钟会超时。此外还可以调大execution.checkpointing.tolerable-failed-checkpoints配置项
[2] 的值,容忍更多的checkpoint失败。
这些参数项只能减少checkpoint失败导致failover的次数,最佳方案是从作业入手,
你好,我检查了下关于checkpoint的文档:https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/checkpointing/
<https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/checkpointing/>
tolerable checkpoint failure number: This defines ho
你好,checkpoint超时默认不会导致作业重启,可以提供下JM log看看作业为什么会重启吗?
> On 3 Mar 2022, at 9:15 PM, kong <62...@163.com> wrote:
>
> hello,我最近遇到一个问题:
> 我通过flink消费kafka数据,job 图大概是这样的:Source -> map -> filter -> flatMap -> Map ->
> Sink
> 在一瞬间kafka的producer端会产生大量的数据,导致flink无法消费完,
hello,我最近遇到一个问题:
我通过flink消费kafka数据,job 图大概是这样的:Source -> map -> filter -> flatMap -> Map -> Sink
在一瞬间kafka的producer端会产生大量的数据,导致flink无法消费完,我的checkpoint设置的是10分钟;
最后会产生Checkpoint expired before
completing.的错误,导致job重启,从而导致从上一个checkpoint恢复,然后重复消费数据,又导致checkpoint超时,死循环了。
不知道有什么好办法解决该问题。
多谢~
Hi,
有考慮升級 1.14 嗎?Flink 1.14 支持了 FLIP-147,讓 Flink 在 task 為 finished 狀態時仍能觸發
checkpoint [1, 2]。
[1]
https://flink.apache.org/news/2021/09/29/release-1.14.0.html#checkpointing-and-bounded-streams
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks
1. 图片挂了,看不到,尽量用文字,或者用图床等工具
2. 启动任务有配置checkpoint吗?
在 2022-02-17 11:40:04,"董少杰" 写道:
flink读取csv文件建表,同时消费kafka数据建表,两张表join之后写入hdfs(hudi),读取csv数据的任务已经是finished状态,就会触发不了checkpoint,看有什么办法能让它正常触发checkpoint?
flink版本1.12.2。
谢谢!
| |
董少杰
|
|
eric21...@163.com
|
flink读取csv文件建表,同时消费kafka数据建表,两张表join之后写入hdfs(hudi),读取csv数据的任务已经是finished状态,就会触发不了checkpoint,看有什么办法能让它正常触发checkpoint?
flink版本1.12.2。
谢谢!
| |
董少杰
|
|
eric21...@163.com
|
直接用的开源版本吗?还是公司内部有改动,原生的cp是固定频率,而很多公司离线计算都是整点触发的,为了减少延迟,会自定义在整点触发一次cp,开源目前没有这个feature
黄志高 于2021年12月1日周三 21:53写道:
> hi,各位大佬,咨询个问题
>
>
> 我的Flink版本是1.11.0,我的程序是从kafka->s3,checkpoint的时间间隔是10分钟,程序中间不做任何操作,直接消费数据落到文件系统,使用的是streamingFileSink,用的是内部的bulkFormatbuilder
; COMPLETED
> | 8/8 | 13:02:36 | 13:02:41 | 4s | 214 KB | 0 B |
> 上图是checkpoint
>
>
> 这个是在11月30号0时段生成的文件
> 2021-11-30 00:00:011080827 athena_other-0-217891.gz
> 2021-11-30 00:02:424309209 athena_other-0-217892.gz
> 2021-11-30 00:12:403902474 athena_other-0-217893.
Hi!
图片无法显示,建议使用外部图床上传。
checkpoint 慢的原因可能有很多,最可能的原因是由于算子处理数据太慢导致反压(可以通过 Flink web UI 每个节点的 busy
百分比大致看出来)。建议检查资源是否充足,数据是否倾斜,gc 是否过于频繁等。
紫月幽魔灵 于2021年12月28日周二 10:38写道:
> 版本:flink版本1.14.0
> 问题: 使用flink 1.14.0版本提交到jdk1.7版本的yarn集群上checkpoint无法生成,一直处于IN_PROGRESS状态
> 提交命令如下:
> ./
:flink1.14.0
: flink
1.14.0??jdk1.7??yarn??checkpoint,IN_PROGRESS
:
./bin/flinksql-submit.sh \
--sql sqlserver-cdc-to-kafka.sql \
-m yarn-cluster \
-ynm sqlserverTOkafka \
-ys 2 \
-yjm 1024 \
-ytm 1024 \
-yid
|
| | 32680 |
COMPLETED
| 8/8 | 13:12:36 | 13:12:39 | 2s | 125 KB | 0 B |
| | 32679 |
COMPLETED
| 8/8 | 13:02:36 | 13:02:41 | 4s | 214 KB | 0 B |
上图是checkpoint
这个是在11月30号0时段生成的文件
2021-11-30 00:00:011080827 athena_other-0-217891.gz
2021-11-30 00:02:424309209 athena_other-0-217892.gz
2021-11-30 00:12
Hi,我把文件放到下面的,文件在checkpoint可见我是理解的,但是文件的生成时间应该是在checkpoint以后是正常的,但是我却在每个整点时段看见数据文件,如下图所示,按理说文件的生成都是在checkpoint之后的,也就是2分,12,22,32,42,52分后,而每个00分都会生成一个数据文件,不理解这个文件怎么生成的,内部的滚动策略是OnCheckpointRollingPolicy
在 2021-12-02 11:37:31,"Caizhi Weng" 写道:
>Hi!
>
>邮
Hi!
邮件里看不到图片和附件,建议使用外部图床。
partFile 文件是不是以英文句点开头的?这是因为 streamingFileSink 写文件的时候还没做 checkpoint,为了保证
exactly once,这些临时写下的 .partFile 文件都是不可见的,需要等 checkpoint 之后才会重命名成可见的文件。
黄志高 于2021年12月1日周三 下午9:53写道:
> hi,各位大佬,咨询个问题
>
>
> 我的Flink版本是1.11.0,我的程序是从kafka->s3,checkpoint的时间间隔是10分钟,程序中间
hi,各位大佬,咨询个问题
我的Flink版本是1.11.0,我的程序是从kafka->s3,checkpoint的时间间隔是10分钟,程序中间不做任何操作,直接消费数据落到文件系统,使用的是streamingFileSink,用的是内部的bulkFormatbuilder,通过源码分析采用的滚动策略是onCheckpointRollingPolicy,但是我发现在每个小时间生成一个bucket,都会在整点的时间生成一个partFile文件,而我的checkpoint触发的时间点都是02分,12分,22分,32分,42分,52分,对应的文件生成时间也是这个时候,但是总是会在整
Hi,All~!
麻烦大家一个问题,有大佬了解过checkpoint文件大小与实际内存对应的状态数据大小的映射关系吗?
比如Fs状态后端checkpoint后文件大小是1MB,对应的状态数据在内存中占用大概是多少呢?
感谢答复~!
Hi!
checkpoint 超时有很多可能性。最常见的原因是超时的节点太忙阻塞了 checkpoint(包括计算资源不足,或者数据有倾斜等),这可以通过看
Flink web UI 上的 busy 以及反压信息判断;另外一个常见原因是 gc 太频繁,可以通过设置 jvm 参数打印出 gc log 观察。
yu...@kiscloud.net 于2021年11月18日周四 下午2:54写道:
> flink的job运行一段时间后, checkpoint就一直失败,信息如下:
> ID
> Status
> Acknowledged
> Trigge
flink的job运行一段时间后, checkpoint就一直失败,信息如下:
ID
Status
Acknowledged
Trigger Time
Latest Acknowledgement
End to End Duration
State Size
Buffered During Alignment
295
FAILED
30/5011:55:3811:55:391h 0m 0s205 KB0 B
Checkpoint Detail:
Path: - Discarded: - Failure Message: Checkpoint expired before
:
flink on yarn ??flink
hdfs,ark1??hdfs??active??standby
ark2standbyactive
:??flink??checkpoint??hdfs??url??hdfs:ark:8082
,standby??,
hdfs??mycluster
:
flink on yarn ??flink
hdfs,ark1??hdfs??active??standby
ark2standbyactive
:??flink??checkpoint??hdfs??url??hdfs:ark:8082
,standby??,
??
的 KafkaSource,可以配置 enable.auto.commit = true 和
> auto.commit.interval.ms = {commit_interval} 使 KafkaSource 按照指定的时间间隔自动提交
> offset。基于 SourceFunction 的 FlinkKafkaConsumer 在 checkpoint 开启时不支持自动提交,只能在
> checkpoint 时提交位点。
> >
> >--
> >Best Regards,
> >
> >Qingsheng Re
--
> 发件人:
> "user-zh"
> <
> hilili...@gmail.com>;
> 发送时间: 2021年10月30日(星期六) 晚上10:58
> 收件人: "user-zh"
> 主题: Re: 关于作业失败从checkpoint重启,触发了过期的窗口计算
>
continue trigger
-- --
??:
"user-zh"
可以试试添加使用Continuou Trigger
Yun Tang 于2021年10月29日周五 下午5:56写道:
> Hi,
>
> 先问个版本问题,你的Flink版本是1.3 而不是1.13?
>
>
> Checkpoint里面会存储timer,所以重启之后会触发窗口的计算,但确实这种一天的窗口累计有点太多了,除非你的作业存在比较严重的反压,导致checkpoint内积攒了大量没有触发的timer。
>
> 祝好
> 唐云
>
>
增量checkpoint是可以恢复作业的。
Flink 的增量 checkpoint 以 RocksDB 的 checkpoint 为基础。RocksDB 是一个 LSM 结构的 KV
> 数据库,把所有的修改保存在内存的可变缓存中(称为 memtable),所有对 memtable 中 key 的修改,会覆盖之前的 value,当前
> memtable 满了之后,RocksDB 会将所有数据以有序的写到磁盘。当 RocksDB 将 memtable
> 写到磁盘后,整个文件就不再可变,称为有序字符串表(sstable)。
> RocksDB 的后台压缩线程会将
Hi
可以使用jstack,async profiler [1]
等工具勘察一下checkpoint期间的CPU栈。oss需要先写本地再上传,确实可能CPU消耗多一些,但是明显高很多有一些超出预期。
[1] https://github.com/jvm-profiling-tools/async-profiler
祝好
唐云
From: Lei Wang
Sent: Tuesday, October 19, 2021 14:01
To: user-zh@flink.apache.org
Subject: Re
Hi,
先问个版本问题,你的Flink版本是1.3 而不是1.13?
Checkpoint里面会存储timer,所以重启之后会触发窗口的计算,但确实这种一天的窗口累计有点太多了,除非你的作业存在比较严重的反压,导致checkpoint内积攒了大量没有触发的timer。
祝好
唐云
From: claylin <1012539...@qq.com.INVALID>
Sent: Friday, October 29, 2021 11:33
To: user-zh
Subject: 关于作业失败从checkp
checkpointflink1.3/1.4+sql??checkpoint1??rocksdb15??30
增量checkpoint是否可以用来恢复flink作业?
增量checkpoint我理解是有一个base checkpoint + 若干个delta checkpoint
(中间会做一次全量checkpoint以截断过长的血缘吗?),恢复的时候需要从base checkpoint开始一个个按时间顺序应用delta
checkpoint。
按这样的话,每个delta
checkpoint都需要保留才可以恢复状态,但现实并不是所有checkpoint都保留,所以我觉得增量checkpoint是不能用来恢复flink作业的,这样理解对吗?
= {commit_interval} 使 KafkaSource 按照指定的时间间隔自动提交
>offset。基于 SourceFunction 的 FlinkKafkaConsumer 在 checkpoint 开启时不支持自动提交,只能在
>checkpoint 时提交位点。
>
>--
>Best Regards,
>
>Qingsheng Ren
>Email: renqs...@gmail.com
>On Oct 27, 2021, 4:59 PM +0800, 杨浩 , wrote:
>> 请问有办法和现有监控兼容么?开启c
你好!
如果使用的是基于 FLIP-27 实现的 KafkaSource,可以配置 enable.auto.commit = true 和
auto.commit.interval.ms = {commit_interval} 使 KafkaSource 按照指定的时间间隔自动提交
offset。基于 SourceFunction 的 FlinkKafkaConsumer 在 checkpoint 开启时不支持自动提交,只能在
checkpoint 时提交位点。
--
Best Regards,
Qingsheng Ren
Email: renqs...@gmail.com
请问有办法和现有监控兼容么?开启checkpoint时,让消费组的offset实时更新
在 2021-10-25 21:58:28,"杨浩" 写道:
>currentOffsets理论上OK,但是这边云上监控系统中的kafka未消费量使用的是committedOffsets
>在 2021-10-25 10:31:12,"Caizhi Weng" 写道:
>>Hi!
>>
>>这里的 offset 是 kafka source 的 offset 吗?其实没必要通过 checkpoin
currentOffsets理论上OK,但是这边云上监控系统中的kafka未消费量使用的是committedOffsets
在 2021-10-25 10:31:12,"Caizhi Weng" 写道:
>Hi!
>
>这里的 offset 是 kafka source 的 offset 吗?其实没必要通过 checkpoint 读取 offset,可以通过
>metrics 读取,见 [1]。
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-master/doc
Hi!
这里的 offset 是 kafka source 的 offset 吗?其实没必要通过 checkpoint 读取 offset,可以通过
metrics 读取,见 [1]。
[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/metrics/#kafka-connectors
杨浩 于2021年10月25日周一 上午10:20写道:
> 请问下,如果启用checkpoint,因为状态比较大,checkpoint间隔设置比较大,如何让offset提交的比较快,这样方便监控程序进度
请问下,如果启用checkpoint,因为状态比较大,checkpoint间隔设置比较大,如何让offset提交的比较快,这样方便监控程序进度
hello 这个报错看上去并不是状态不兼容的报错。 我看代码 Sink 算子设置了uid 理论上是可以正确恢复的。
kong <62...@163.com> 于2021年10月21日周四 上午10:26写道:
> hi,我遇到flink修改sink并行度后,无法从checkpoint restore问题
>
>
> flink 版本: 1.13.1
> flink on yarn
> DataStream api方式写的java job
>
>
> 试验1:不修改任何代码,cancel job后,能从指定的checkpo
hi,我遇到flink修改sink并行度后,无法从checkpoint restore问题
flink 版本: 1.13.1
flink on yarn
DataStream api方式写的java job
试验1:不修改任何代码,cancel job后,能从指定的checkpoint恢复
dataStream.addSink(new Sink(config)).name("").uid("");
试验2:只修改sink端的并行度,job无法启动,一直是Initiating状态
dataStre
确实是跟 OSS 有关,我换成 HDFS 作为 checkpoint 后端就没有这种现象了,但我也不明白为什么会这样。
程序中设置了增量 checkpoit,但 flink web UI 中显示的 checkpoint data size 一直不断变高,三天就到了 1G
On Mon, Oct 18, 2021 at 10:44 AM Michael Ran wrote:
> 应该和OSS没关系吧,毕竟只是个存储。
> 我们CPU 你先看看消耗在哪个线程或者方法类呗
>
>
>
> 在 2021-10-08 16:34
应该和OSS没关系吧,毕竟只是个存储。
我们CPU 你先看看消耗在哪个线程或者方法类呗
在 2021-10-08 16:34:47,"Lei Wang" 写道:
flink 程序以 RocksDB 作为 stateBackend, aliyun OSS 作为 checkpoint 数据最终的物理位置。
我们的监控发现节点 cpu 间隔性地变高,这个间隔时间恰好就是程序的 checkpoint 时间间隔。
这个可能的原因是什么?会跟 OSS 有关吗?
谢谢,
王磊
图片上传到附件中了
在 2021-10-12 10:33:12,"李一飞" 写道:
异步IO算子无法完成checkpoint,帮忙看下是什么原因
Hi!
图片无法在邮件中显示,请检查。
李一飞 于2021年10月12日周二 上午10:33写道:
> 异步IO算子无法完成checkpoint,帮忙看下是什么原因
>
>
>
>
异步IO算子无法完成checkpoint,帮忙看下是什么原因
flink 程序以 RocksDB 作为 stateBackend, aliyun OSS 作为 checkpoint 数据最终的物理位置。
我们的监控发现节点 cpu 间隔性地变高,这个间隔时间恰好就是程序的 checkpoint 时间间隔。
[image: image.png]
这个可能的原因是什么?会跟 OSS 有关吗?
谢谢,
王磊
checkpoint的状态大约只有50M左右就会开始出现cp失败的问题。如果失败了,尝试停止任务生成savepoint基本也不能成功。但同时运行的其他任务,cp在300M左右,
save point 1G左右的就很顺利,基本不会出问题。
因为实际的数据压力并不是很大,如果单纯增加并行度,是否能在窗口多的情况下有比较明显的改善呢?
Caizhi Weng 于2021年9月22日周三 上午11:27写道:
> Hi!
>
> 24 小时且步长 1 分钟的 window 会由于数据不断累积而导致 cp 越来越大,越来越慢,最终超时。当然如果运算太慢导致
og (
> devid,
> ip,
> op_ts
> ) with (
> connector = kafka
> )
>
> sink: Hbase connect 2.2
>
> 目前用flink sql的hop
> window开发一个指标,统计近24小时的设备关联ip数。设置30min一次checkpoint,超时时间30min。
> 执行SQL如下
> insert into h_table
> select
> devid as rowkey
> row(hop_end, ip
FLink:1.12.1
源: kafka
create table dev_log (
devid,
ip,
op_ts
) with (
connector = kafka
)
sink: Hbase connect 2.2
目前用flink sql的hop window开发一个指标,统计近24小时的设备关联ip数。设置30min一次checkpoint,超时时间30min。
执行SQL如下
insert into h_table
select
devid as rowkey
row(hop_end, ip_cnt)
from (
select
devid
Hi
從代碼上來看是使用了 regular join 關聯了 kafka source 和 hbase source,hbase connector
目前是不支持流式數據源的
你可以從任務儀表板上確認下提交的任務,hbase source 的部分應該在執行一段時間後狀態會變更為 FINISHED,目前 flink
checkpoint 還不支持在 FINISHED task 上執行
你可以考慮改寫 sql 使用 processing time temporal join [1] 的方式來關聯 hbase table,從 kafka
消費的數據會實時的去查 hbase table 的當前
没有开启checkpoint
execEnv.enableCheckpointing(checkpointInterval);
On 2021/09/10 07:41:10, "xia_...@163.com" wrote:
> Hi:
> 有个问题想请教一下大佬们:正在研究流上join操作,使用FlinkKafkaConsume
> 消费kafka数据作为数据源,随后关联hbase维度表数据,可以成功关联,但是KafkaSource缺始终没有进行checkpoint,代码中是有设置checkpint的,我想请
没有开启checkpoint
execEnv.enableCheckpointing(checkpointInterval);
On 2021/09/10 07:41:10, "xia_...@163.com" wrote:
> Hi:
> 有个问题想请教一下大佬们:正在研究流上join操作,使用FlinkKafkaConsume
> 消费kafka数据作为数据源,随后关联hbase维度表数据,可以成功关联,但是KafkaSource缺始终没有进行checkpoint,代码中是有设置checkpint的,我想请
没有开启checkpoint
execEnv.enableCheckpointing(checkpointInterval);
On 2021/09/10 07:41:10, "xia_...@163.com" wrote:
> Hi:
> 有个问题想请教一下大佬们:正在研究流上join操作,使用FlinkKafkaConsume
> 消费kafka数据作为数据源,随后关联hbase维度表数据,可以成功关联,但是KafkaSource缺始终没有进行checkpoint,代码中是有设置checkpint的,我想请
没有开启checkpoint
execEnv.enableCheckpointing(checkpointInterval);
On 2021/09/10 07:41:10, "xia_...@163.com" wrote:
> Hi:
> 有个问题想请教一下大佬们:正在研究流上join操作,使用FlinkKafkaConsume
> 消费kafka数据作为数据源,随后关联hbase维度表数据,可以成功关联,但是KafkaSource缺始终没有进行checkpoint,代码中是有设置checkpint的,我想请
共有 742 项搜索結果,以下是第 1 - 100 matches
Mail list logo